From 13c503b73b4ecc8ce4a6e344f9ac15202985d686 Mon Sep 17 00:00:00 2001 From: liuxiaolong <liuxiaolong@aiotlink.com> Date: 星期二, 20 七月 2021 19:48:58 +0800 Subject: [PATCH] fix memory leak --- src/robust.h | 98 +++++++------------------------------------------ 1 files changed, 14 insertions(+), 84 deletions(-) diff --git a/src/robust.h b/src/robust.h index 1bbe8fc..1a3f430 100644 --- a/src/robust.h +++ b/src/robust.h @@ -23,93 +23,18 @@ #include "log.h" #include <atomic> #include <chrono> -#include <memory> -#include <mutex> -#include <string> -#include <sys/file.h> -#include <sys/ipc.h> -#include <sys/sem.h> -#include <sys/stat.h> -#include <sys/types.h> -#include <thread> #include <unistd.h> namespace robust { -using namespace std::chrono; -using namespace std::chrono_literals; - -template <unsigned PowerSize = 4, class Int = int64_t> -class AtomicQueue +// atomic queue, length is 1. +// lowest bit is used for data flag, 63 bit for data. +class AtomicQ63 { public: - typedef uint32_t size_type; - typedef Int Data; - typedef std::atomic<Data> AData; - static_assert(sizeof(Data) == sizeof(AData)); - enum { - power = PowerSize, - capacity = (1 << power), - mask = capacity - 1, - }; - - AtomicQueue() { memset(this, 0, sizeof(*this)); } - size_type head() const { return head_.load(); } - size_type tail() const { return tail_.load(); } - bool push(const Data d, bool try_more = false) - { - bool r = false; - size_type i = 0; - do { - auto pos = tail(); - if (tail_.compare_exchange_strong(pos, Next(pos))) { - auto cur = buf[pos].load(); - r = Empty(cur) && buf[pos].compare_exchange_strong(cur, Enc(d)); - } - } while (try_more && !r && ++i < capacity); - return r; - } - bool pop(Data &d, bool try_more = false) - { - bool r = false; - Data cur; - size_type i = 0; - do { - auto pos = head(); - if (head_.compare_exchange_strong(pos, Next(pos))) { - cur = buf[pos].load(); - r = !Empty(cur) && buf[pos].compare_exchange_strong(cur, 0); - } - } while (try_more && !r && ++i < capacity); - if (r) { d = Dec(cur); } - return r; - } - -private: - static_assert(std::is_integral<Data>::value, "Data must be integral type!"); - static_assert(std::is_signed<Data>::value, "Data must be signed type!"); - static_assert(PowerSize < 10, "RobustQ63 max size is 2^10!"); - - static inline bool Empty(const Data d) { return (d & 1) == 0; } // lowest bit 1 means data ok. - static inline Data Enc(const Data d) { return (d << 1) | 1; } // lowest bit 1 means data ok. - static inline Data Dec(const Data d) { return d >> 1; } // lowest bit 1 means data ok. - static size_type Next(const size_type index) { return (index + 1) & mask; } - - std::atomic<size_type> head_; - std::atomic<size_type> tail_; - AData buf[capacity]; -}; - -template <class Int> -class AtomicQueue<0, Int> -{ - typedef Int Data; - typedef std::atomic<Data> AData; - static_assert(sizeof(Data) == sizeof(AData)); - -public: - AtomicQueue() { memset(this, 0, sizeof(*this)); } + typedef int64_t Data; + AtomicQ63() { memset(this, 0, sizeof(*this)); } bool push(const Data d, bool try_more = false) { auto cur = buf.load(); @@ -122,16 +47,19 @@ if (r) { d = Dec(cur); } return r; } - uint32_t head() const { return 0; } - uint32_t tail() const { return 0; } private: static inline bool Empty(const Data d) { return (d & 1) == 0; } // lowest bit 1 means data ok. static inline Data Enc(const Data d) { return (d << 1) | 1; } // lowest bit 1 means data ok. static inline Data Dec(const Data d) { return d >> 1; } // lowest bit 1 means data ok. + + typedef std::atomic<Data> AData; + static_assert(sizeof(Data) == sizeof(AData)); + AData buf; }; +// atomic request-reply process, one cycle a time. class AtomicReqRep { public: @@ -139,6 +67,8 @@ typedef std::function<Data(const Data)> Handler; bool ClientRequest(const Data request, Data &reply); bool ServerProcess(Handler onReq); + AtomicReqRep() : + data_(0), timestamp_(now()) {} private: enum State { @@ -149,9 +79,9 @@ static int GetState(Data d) { return d & MaskBits(3); } static Data Encode(Data d, State st) { return (d << 3) | st; } static Data Decode(Data d) { return d >> 3; } - static void yield() { std::this_thread::sleep_for(10us); } + typedef std::chrono::steady_clock steady_clock; typedef steady_clock::duration Duration; - Duration now() { return steady_clock::now().time_since_epoch(); } + static Duration now() { return steady_clock::now().time_since_epoch(); } bool DataCas(Data expected, Data val) { return data_.compare_exchange_strong(expected, val); } std::atomic<Data> data_; -- Gitblit v1.8.0