| | |
| | | Lock &l_; |
| | | }; |
| | | |
| | | template <class D, class Alloc = std::allocator<D>> |
| | | class CircularBuffer |
| | | { |
| | | typedef uint32_t size_type; |
| | | typedef uint32_t count_type; |
| | | typedef uint64_t meta_type; |
| | | static size_type Pos(meta_type meta) { return meta & 0xFFFFFFFF; } |
| | | static count_type Count(meta_type meta) { return meta >> 32; } |
| | | static meta_type Meta(meta_type count, size_type pos) { return (count << 32) | pos; } |
| | | |
| | | public: |
| | | typedef D Data; |
| | | |
| | | CircularBuffer(const size_type cap) : |
| | | CircularBuffer(cap, Alloc()) {} |
| | | CircularBuffer(const size_type cap, Alloc const &al) : |
| | | capacity_(cap + 1), mhead_(0), mtail_(0), al_(al), buf(al_.allocate(capacity_)) |
| | | { |
| | | if (!buf) { |
| | | throw("robust CircularBuffer allocate error: alloc buffer failed, out of mem!"); |
| | | } else { |
| | | memset(&buf[0], 0, sizeof(D) * capacity_); |
| | | } |
| | | } |
| | | ~CircularBuffer() { al_.deallocate(buf, capacity_); } |
| | | |
| | | bool push_back(const Data d) |
| | | { |
| | | auto old = mtail(); |
| | | auto pos = Pos(old); |
| | | auto full = ((capacity_ + pos + 1 - head()) % capacity_ == 0); |
| | | if (!full) { |
| | | buf[pos] = d; |
| | | return mtail_.compare_exchange_strong(old, next(old)); |
| | | } |
| | | return false; |
| | | } |
| | | bool pop_front(Data &d) |
| | | { |
| | | auto old = mhead(); |
| | | auto pos = Pos(old); |
| | | if (!(pos == tail())) { |
| | | d = buf[pos]; |
| | | return mhead_.compare_exchange_strong(old, next(old)); |
| | | } else { |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | private: |
| | | CircularBuffer(const CircularBuffer &); |
| | | CircularBuffer(CircularBuffer &&); |
| | | CircularBuffer &operator=(const CircularBuffer &) = delete; |
| | | CircularBuffer &operator=(CircularBuffer &&) = delete; |
| | | |
| | | meta_type next(meta_type meta) const { return Meta(Count(meta) + 1, (Pos(meta) + 1) % capacity_); } |
| | | size_type head() const { return Pos(mhead()); } |
| | | size_type tail() const { return Pos(mtail()); } |
| | | meta_type mhead() const { return mhead_.load(); } |
| | | meta_type mtail() const { return mtail_.load(); } |
| | | // data |
| | | const size_type capacity_; |
| | | std::atomic<meta_type> mhead_; |
| | | std::atomic<meta_type> mtail_; |
| | | Alloc al_; |
| | | typename Alloc::pointer buf = nullptr; |
| | | }; |
| | | |
| | | template <unsigned PowerSize = 4, class Int = int64_t> |
| | | class AtomicQueue |
| | | { |
| | |
| | | AData buf; |
| | | }; |
| | | |
| | | class AtomicReqRep |
| | | { |
| | | public: |
| | | typedef int64_t Data; |
| | | typedef std::function<Data(const Data)> Handler; |
| | | bool ClientRequest(const Data request, Data &reply) |
| | | { |
| | | auto end_time = now() + 3s; |
| | | do { |
| | | Data cur = data_.load(); |
| | | if (GetState(cur) == eStateFree && |
| | | DataCas(cur, Encode(request, eStateRequest))) { |
| | | do { |
| | | yield(); |
| | | cur = data_.load(); |
| | | if (GetState(cur) == eStateReply) { |
| | | DataCas(cur, Encode(0, eStateFree)); |
| | | reply = Decode(cur); |
| | | return true; |
| | | } |
| | | } while (now() < end_time); |
| | | } |
| | | yield(); |
| | | } while (now() < end_time); |
| | | return false; |
| | | } |
| | | |
| | | bool ServerProcess(Handler onReq) |
| | | { |
| | | Data cur = data_.load(); |
| | | switch (GetState(cur)) { |
| | | case eStateRequest: |
| | | if (DataCas(cur, Encode(onReq(Decode(cur)), eStateReply))) { |
| | | timestamp_ = now(); |
| | | return true; |
| | | } |
| | | break; |
| | | case eStateReply: |
| | | if (timestamp_.load() + 3s < now()) { |
| | | DataCas(cur, Encode(0, eStateFree)); |
| | | } |
| | | break; |
| | | case eStateFree: |
| | | default: break; |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | private: |
| | | enum State { |
| | | eStateFree, |
| | | eStateRequest, |
| | | eStateReply |
| | | }; |
| | | static int GetState(Data d) { return d & MaskBits(3); } |
| | | static Data Encode(Data d, State st) { return (d << 3) | st; } |
| | | static Data Decode(Data d) { return d >> 3; } |
| | | static void yield() { QuickSleep(); } |
| | | typedef steady_clock::duration Duration; |
| | | Duration now() { return steady_clock::now().time_since_epoch(); } |
| | | |
| | | bool DataCas(Data expected, Data val) { return data_.compare_exchange_strong(expected, val); } |
| | | std::atomic<Data> data_; |
| | | std::atomic<Duration> timestamp_; |
| | | }; |
| | | |
| | | } // namespace robust |
| | | #endif // end of include guard: ROBUST_Q31RCWYU |