From 13c503b73b4ecc8ce4a6e344f9ac15202985d686 Mon Sep 17 00:00:00 2001 From: liuxiaolong <liuxiaolong@aiotlink.com> Date: 星期二, 20 七月 2021 19:48:58 +0800 Subject: [PATCH] fix memory leak --- src/robust.h | 281 +++----------------------------------------------------- 1 files changed, 16 insertions(+), 265 deletions(-) diff --git a/src/robust.h b/src/robust.h index b3b99c4..1a3f430 100644 --- a/src/robust.h +++ b/src/robust.h @@ -23,232 +23,18 @@ #include "log.h" #include <atomic> #include <chrono> -#include <memory> -#include <mutex> -#include <string> -#include <sys/file.h> -#include <sys/ipc.h> -#include <sys/sem.h> -#include <sys/stat.h> -#include <sys/types.h> #include <unistd.h> namespace robust { -using namespace std::chrono; -using namespace std::chrono_literals; -void QuickSleep(); - -class CasMutex -{ - typedef uint64_t locker_t; - static inline locker_t this_locker() { return pthread_self(); } - static const uint64_t kLockerMask = MaskBits(63); - -public: - CasMutex() : - meta_(0) {} - int try_lock() - { - auto old = meta_.load(); - int r = 0; - if (!Locked(old)) { - r = MetaCas(old, Meta(1, this_locker())); - } - return r; - } - int lock() - { - int r = 0; - do { - r = try_lock(); - } while (r == 0); - return r; - } - void unlock() - { - auto old = meta_.load(); - if (Locked(old) && Locker(old) == this_locker()) { - MetaCas(old, Meta(0, this_locker())); - } - } - -private: - std::atomic<uint64_t> meta_; - bool Locked(uint64_t meta) { return (meta >> 63) == 1; } - locker_t Locker(uint64_t meta) { return meta & kLockerMask; } - uint64_t Meta(uint64_t lk, locker_t lid) { return (lk << 63) | lid; } - bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); } -}; - -class NullMutex +// atomic queue, length is 1. +// lowest bit is used for data flag, 63 bit for data. +class AtomicQ63 { public: - template <class... T> - explicit NullMutex(T &&...t) {} // easy test. - bool try_lock() { return true; } - void lock() {} - void unlock() {} -}; - -// flock + mutex -class FMutex -{ -public: - typedef uint64_t id_t; - FMutex(id_t id) : - id_(id), fd_(Open(id_)), count_(0) - { - if (fd_ == -1) { throw "error create mutex!"; } - } - ~FMutex() { Close(fd_); } - bool try_lock(); - void lock(); - void unlock(); - -private: - static std::string GetPath(id_t id) - { - const std::string dir("/tmp/.bhome_mtx"); - mkdir(dir.c_str(), 0777); - return dir + "/fm_" + std::to_string(id); - } - static int Open(id_t id) { return open(GetPath(id).c_str(), O_CREAT | O_RDONLY, 0666); } - static int Close(int fd) { return close(fd); } - id_t id_; - int fd_; - std::mutex mtx_; - std::atomic<int32_t> count_; -}; - -union semun { - int val; /* Value for SETVAL */ - struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */ - unsigned short *array; /* Array for GETALL, SETALL */ - struct seminfo *__buf; /* Buffer for IPC_INFO - (Linux-specific) */ -}; - -class SemMutex -{ -public: - SemMutex(key_t key) : - key_(key), sem_id_(semget(key, 1, 0666)) - { - if (sem_id_ == -1) { throw "error create semaphore."; } - } - ~SemMutex() {} - - bool try_lock() - { - sembuf op = {0, -1, SEM_UNDO | IPC_NOWAIT}; - return semop(sem_id_, &op, 1) == 0; - } - - void lock() - { - sembuf op = {0, -1, SEM_UNDO}; - semop(sem_id_, &op, 1); - } - - void unlock() - { - sembuf op = {0, 1, SEM_UNDO}; - semop(sem_id_, &op, 1); - } - -private: - key_t key_; - int sem_id_; -}; - -template <class Lock> -class Guard -{ -public: - Guard(Lock &l) : - l_(l) { l_.lock(); } - ~Guard() { l_.unlock(); } - -private: - Guard(const Guard &); - Guard(Guard &&); - Lock &l_; -}; - -template <unsigned PowerSize = 4, class Int = int64_t> -class AtomicQueue -{ -public: - typedef uint32_t size_type; - typedef Int Data; - typedef std::atomic<Data> AData; - static_assert(sizeof(Data) == sizeof(AData)); - enum { - power = PowerSize, - capacity = (1 << power), - mask = capacity - 1, - }; - - AtomicQueue() { memset(this, 0, sizeof(*this)); } - size_type head() const { return head_.load(); } - size_type tail() const { return tail_.load(); } - bool like_empty() const { return head() == tail() && Empty(buf[head()]); } - bool like_full() const { return head() == tail() && !Empty(buf[head()]); } - bool push(const Data d, bool try_more = false) - { - bool r = false; - size_type i = 0; - do { - auto pos = tail(); - if (tail_.compare_exchange_strong(pos, Next(pos))) { - auto cur = buf[pos].load(); - r = Empty(cur) && buf[pos].compare_exchange_strong(cur, Enc(d)); - } - } while (try_more && !r && ++i < capacity); - return r; - } - bool pop(Data &d, bool try_more = false) - { - bool r = false; - Data cur; - size_type i = 0; - do { - auto pos = head(); - if (head_.compare_exchange_strong(pos, Next(pos))) { - cur = buf[pos].load(); - r = !Empty(cur) && buf[pos].compare_exchange_strong(cur, 0); - } - } while (try_more && !r && ++i < capacity); - if (r) { d = Dec(cur); } - return r; - } - -private: - static_assert(std::is_integral<Data>::value, "Data must be integral type!"); - static_assert(std::is_signed<Data>::value, "Data must be signed type!"); - static_assert(PowerSize < 10, "RobustQ63 max size is 2^10!"); - - static inline bool Empty(const Data d) { return (d & 1) == 0; } // lowest bit 1 means data ok. - static inline Data Enc(const Data d) { return (d << 1) | 1; } // lowest bit 1 means data ok. - static inline Data Dec(const Data d) { return d >> 1; } // lowest bit 1 means data ok. - static size_type Next(const size_type index) { return (index + 1) & mask; } - - std::atomic<size_type> head_; - std::atomic<size_type> tail_; - AData buf[capacity]; -}; - -template <class Int> -class AtomicQueue<0, Int> -{ - typedef Int Data; - typedef std::atomic<Data> AData; - static_assert(sizeof(Data) == sizeof(AData)); - -public: - AtomicQueue() { memset(this, 0, sizeof(*this)); } + typedef int64_t Data; + AtomicQ63() { memset(this, 0, sizeof(*this)); } bool push(const Data d, bool try_more = false) { auto cur = buf.load(); @@ -261,63 +47,28 @@ if (r) { d = Dec(cur); } return r; } - uint32_t head() const { return 0; } - uint32_t tail() const { return 0; } private: static inline bool Empty(const Data d) { return (d & 1) == 0; } // lowest bit 1 means data ok. static inline Data Enc(const Data d) { return (d << 1) | 1; } // lowest bit 1 means data ok. static inline Data Dec(const Data d) { return d >> 1; } // lowest bit 1 means data ok. + + typedef std::atomic<Data> AData; + static_assert(sizeof(Data) == sizeof(AData)); + AData buf; }; +// atomic request-reply process, one cycle a time. class AtomicReqRep { public: typedef int64_t Data; typedef std::function<Data(const Data)> Handler; - bool ClientRequest(const Data request, Data &reply) - { - auto end_time = now() + 3s; - do { - Data cur = data_.load(); - if (GetState(cur) == eStateFree && - DataCas(cur, Encode(request, eStateRequest))) { - do { - yield(); - cur = data_.load(); - if (GetState(cur) == eStateReply) { - DataCas(cur, Encode(0, eStateFree)); - reply = Decode(cur); - return true; - } - } while (now() < end_time); - } - yield(); - } while (now() < end_time); - return false; - } - - bool ServerProcess(Handler onReq) - { - Data cur = data_.load(); - switch (GetState(cur)) { - case eStateRequest: - if (DataCas(cur, Encode(onReq(Decode(cur)), eStateReply))) { - timestamp_ = now(); - return true; - } - break; - case eStateReply: - if (timestamp_.load() + 3s < now()) { - DataCas(cur, Encode(0, eStateFree)); - } - break; - case eStateFree: - default: break; - } - return false; - } + bool ClientRequest(const Data request, Data &reply); + bool ServerProcess(Handler onReq); + AtomicReqRep() : + data_(0), timestamp_(now()) {} private: enum State { @@ -328,9 +79,9 @@ static int GetState(Data d) { return d & MaskBits(3); } static Data Encode(Data d, State st) { return (d << 3) | st; } static Data Decode(Data d) { return d >> 3; } - static void yield() { QuickSleep(); } + typedef std::chrono::steady_clock steady_clock; typedef steady_clock::duration Duration; - Duration now() { return steady_clock::now().time_since_epoch(); } + static Duration now() { return steady_clock::now().time_since_epoch(); } bool DataCas(Data expected, Data val) { return data_.compare_exchange_strong(expected, val); } std::atomic<Data> data_; -- Gitblit v1.8.0