/* * ===================================================================================== * * Filename: robust.h * * Description: * * Version: 1.0 * Created: 2021年04月27日 10时04分29秒 * Revision: none * Compiler: gcc * * Author: Li Chao (), lichao@aiotlink.com * Organization: * * ===================================================================================== */ #ifndef ROBUST_Q31RCWYU #define ROBUST_Q31RCWYU #include "bh_util.h" #include "log.h" #include #include #include namespace robust { /* template class AtomicQueue { public: typedef uint32_t size_type; typedef Int Data; typedef std::atomic AData; static_assert(sizeof(Data) == sizeof(AData)); enum { power = PowerSize, capacity = (1 << power), mask = capacity - 1, }; AtomicQueue() { memset(this, 0, sizeof(*this)); } size_type head() const { return head_.load(); } size_type tail() const { return tail_.load(); } bool push(const Data d, bool try_more = false) { bool r = false; size_type i = 0; do { auto pos = tail(); if (tail_.compare_exchange_strong(pos, Next(pos))) { auto cur = buf[pos].load(); r = Empty(cur) && buf[pos].compare_exchange_strong(cur, Enc(d)); } } while (try_more && !r && ++i < capacity); return r; } bool pop(Data &d, bool try_more = false) { bool r = false; Data cur; size_type i = 0; do { auto pos = head(); if (head_.compare_exchange_strong(pos, Next(pos))) { cur = buf[pos].load(); r = !Empty(cur) && buf[pos].compare_exchange_strong(cur, 0); } } while (try_more && !r && ++i < capacity); if (r) { d = Dec(cur); } return r; } private: static_assert(std::is_integral::value, "Data must be integral type!"); static_assert(std::is_signed::value, "Data must be signed type!"); static_assert(PowerSize < 10, "RobustQ63 max size is 2^10!"); static inline bool Empty(const Data d) { return (d & 1) == 0; } // lowest bit 1 means data ok. static inline Data Enc(const Data d) { return (d << 1) | 1; } // lowest bit 1 means data ok. static inline Data Dec(const Data d) { return d >> 1; } // lowest bit 1 means data ok. static size_type Next(const size_type index) { return (index + 1) & mask; } std::atomic head_; std::atomic tail_; AData buf[capacity]; }; //*/ class AtomicQ63 { public: typedef int64_t Data; AtomicQ63() { memset(this, 0, sizeof(*this)); } bool push(const Data d, bool try_more = false) { auto cur = buf.load(); return Empty(cur) && buf.compare_exchange_strong(cur, Enc(d)); } bool pop(Data &d, bool try_more = false) { Data cur = buf.load(); bool r = !Empty(cur) && buf.compare_exchange_strong(cur, 0); if (r) { d = Dec(cur); } return r; } private: static inline bool Empty(const Data d) { return (d & 1) == 0; } // lowest bit 1 means data ok. static inline Data Enc(const Data d) { return (d << 1) | 1; } // lowest bit 1 means data ok. static inline Data Dec(const Data d) { return d >> 1; } // lowest bit 1 means data ok. typedef std::atomic AData; static_assert(sizeof(Data) == sizeof(AData)); AData buf; }; class AtomicReqRep { public: typedef int64_t Data; typedef std::function Handler; bool ClientRequest(const Data request, Data &reply); bool ServerProcess(Handler onReq); private: enum State { eStateFree, eStateRequest, eStateReply }; static int GetState(Data d) { return d & MaskBits(3); } static Data Encode(Data d, State st) { return (d << 3) | st; } static Data Decode(Data d) { return d >> 3; } typedef std::chrono::steady_clock steady_clock; typedef steady_clock::duration Duration; Duration now() { return steady_clock::now().time_since_epoch(); } bool DataCas(Data expected, Data val) { return data_.compare_exchange_strong(expected, val); } std::atomic data_; std::atomic timestamp_; }; } // namespace robust #endif // end of include guard: ROBUST_Q31RCWYU