From 3788226ee9332945e90066b58f2b85026c2a0460 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期二, 18 五月 2021 10:56:32 +0800 Subject: [PATCH] change node init, no shm lock any more. --- src/robust.h | 134 ++++++++++++++++++++++---------------------- 1 files changed, 66 insertions(+), 68 deletions(-) diff --git a/src/robust.h b/src/robust.h index c70e2fe..b3b99c4 100644 --- a/src/robust.h +++ b/src/robust.h @@ -177,74 +177,6 @@ Lock &l_; }; -template <class D, class Alloc = std::allocator<D>> -class CircularBuffer -{ - typedef uint32_t size_type; - typedef uint32_t count_type; - typedef uint64_t meta_type; - static size_type Pos(meta_type meta) { return meta & 0xFFFFFFFF; } - static count_type Count(meta_type meta) { return meta >> 32; } - static meta_type Meta(meta_type count, size_type pos) { return (count << 32) | pos; } - -public: - typedef D Data; - - CircularBuffer(const size_type cap) : - CircularBuffer(cap, Alloc()) {} - CircularBuffer(const size_type cap, Alloc const &al) : - capacity_(cap + 1), mhead_(0), mtail_(0), al_(al), buf(al_.allocate(capacity_)) - { - if (!buf) { - throw("robust CircularBuffer allocate error: alloc buffer failed, out of mem!"); - } else { - memset(&buf[0], 0, sizeof(D) * capacity_); - } - } - ~CircularBuffer() { al_.deallocate(buf, capacity_); } - - bool push_back(const Data d) - { - auto old = mtail(); - auto pos = Pos(old); - auto full = ((capacity_ + pos + 1 - head()) % capacity_ == 0); - if (!full) { - buf[pos] = d; - return mtail_.compare_exchange_strong(old, next(old)); - } - return false; - } - bool pop_front(Data &d) - { - auto old = mhead(); - auto pos = Pos(old); - if (!(pos == tail())) { - d = buf[pos]; - return mhead_.compare_exchange_strong(old, next(old)); - } else { - return false; - } - } - -private: - CircularBuffer(const CircularBuffer &); - CircularBuffer(CircularBuffer &&); - CircularBuffer &operator=(const CircularBuffer &) = delete; - CircularBuffer &operator=(CircularBuffer &&) = delete; - - meta_type next(meta_type meta) const { return Meta(Count(meta) + 1, (Pos(meta) + 1) % capacity_); } - size_type head() const { return Pos(mhead()); } - size_type tail() const { return Pos(mtail()); } - meta_type mhead() const { return mhead_.load(); } - meta_type mtail() const { return mtail_.load(); } - // data - const size_type capacity_; - std::atomic<meta_type> mhead_; - std::atomic<meta_type> mtail_; - Alloc al_; - typename Alloc::pointer buf = nullptr; -}; - template <unsigned PowerSize = 4, class Int = int64_t> class AtomicQueue { @@ -339,5 +271,71 @@ AData buf; }; +class AtomicReqRep +{ +public: + typedef int64_t Data; + typedef std::function<Data(const Data)> Handler; + bool ClientRequest(const Data request, Data &reply) + { + auto end_time = now() + 3s; + do { + Data cur = data_.load(); + if (GetState(cur) == eStateFree && + DataCas(cur, Encode(request, eStateRequest))) { + do { + yield(); + cur = data_.load(); + if (GetState(cur) == eStateReply) { + DataCas(cur, Encode(0, eStateFree)); + reply = Decode(cur); + return true; + } + } while (now() < end_time); + } + yield(); + } while (now() < end_time); + return false; + } + + bool ServerProcess(Handler onReq) + { + Data cur = data_.load(); + switch (GetState(cur)) { + case eStateRequest: + if (DataCas(cur, Encode(onReq(Decode(cur)), eStateReply))) { + timestamp_ = now(); + return true; + } + break; + case eStateReply: + if (timestamp_.load() + 3s < now()) { + DataCas(cur, Encode(0, eStateFree)); + } + break; + case eStateFree: + default: break; + } + return false; + } + +private: + enum State { + eStateFree, + eStateRequest, + eStateReply + }; + static int GetState(Data d) { return d & MaskBits(3); } + static Data Encode(Data d, State st) { return (d << 3) | st; } + static Data Decode(Data d) { return d >> 3; } + static void yield() { QuickSleep(); } + typedef steady_clock::duration Duration; + Duration now() { return steady_clock::now().time_since_epoch(); } + + bool DataCas(Data expected, Data val) { return data_.compare_exchange_strong(expected, val); } + std::atomic<Data> data_; + std::atomic<Duration> timestamp_; +}; + } // namespace robust #endif // end of include guard: ROBUST_Q31RCWYU -- Gitblit v1.8.0