From 330f78f3334bcdcdb4cc2ab2dbf66604e0224d71 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 21 五月 2021 16:21:45 +0800
Subject: [PATCH] Merge branch 'master' of http://192.168.5.5:10010/r/valib/bhshmq
---
src/robust.h | 134 ++++++++++++++++++++++----------------------
1 files changed, 66 insertions(+), 68 deletions(-)
diff --git a/src/robust.h b/src/robust.h
index c70e2fe..b3b99c4 100644
--- a/src/robust.h
+++ b/src/robust.h
@@ -177,74 +177,6 @@
Lock &l_;
};
-template <class D, class Alloc = std::allocator<D>>
-class CircularBuffer
-{
- typedef uint32_t size_type;
- typedef uint32_t count_type;
- typedef uint64_t meta_type;
- static size_type Pos(meta_type meta) { return meta & 0xFFFFFFFF; }
- static count_type Count(meta_type meta) { return meta >> 32; }
- static meta_type Meta(meta_type count, size_type pos) { return (count << 32) | pos; }
-
-public:
- typedef D Data;
-
- CircularBuffer(const size_type cap) :
- CircularBuffer(cap, Alloc()) {}
- CircularBuffer(const size_type cap, Alloc const &al) :
- capacity_(cap + 1), mhead_(0), mtail_(0), al_(al), buf(al_.allocate(capacity_))
- {
- if (!buf) {
- throw("robust CircularBuffer allocate error: alloc buffer failed, out of mem!");
- } else {
- memset(&buf[0], 0, sizeof(D) * capacity_);
- }
- }
- ~CircularBuffer() { al_.deallocate(buf, capacity_); }
-
- bool push_back(const Data d)
- {
- auto old = mtail();
- auto pos = Pos(old);
- auto full = ((capacity_ + pos + 1 - head()) % capacity_ == 0);
- if (!full) {
- buf[pos] = d;
- return mtail_.compare_exchange_strong(old, next(old));
- }
- return false;
- }
- bool pop_front(Data &d)
- {
- auto old = mhead();
- auto pos = Pos(old);
- if (!(pos == tail())) {
- d = buf[pos];
- return mhead_.compare_exchange_strong(old, next(old));
- } else {
- return false;
- }
- }
-
-private:
- CircularBuffer(const CircularBuffer &);
- CircularBuffer(CircularBuffer &&);
- CircularBuffer &operator=(const CircularBuffer &) = delete;
- CircularBuffer &operator=(CircularBuffer &&) = delete;
-
- meta_type next(meta_type meta) const { return Meta(Count(meta) + 1, (Pos(meta) + 1) % capacity_); }
- size_type head() const { return Pos(mhead()); }
- size_type tail() const { return Pos(mtail()); }
- meta_type mhead() const { return mhead_.load(); }
- meta_type mtail() const { return mtail_.load(); }
- // data
- const size_type capacity_;
- std::atomic<meta_type> mhead_;
- std::atomic<meta_type> mtail_;
- Alloc al_;
- typename Alloc::pointer buf = nullptr;
-};
-
template <unsigned PowerSize = 4, class Int = int64_t>
class AtomicQueue
{
@@ -339,5 +271,71 @@
AData buf;
};
+class AtomicReqRep
+{
+public:
+ typedef int64_t Data;
+ typedef std::function<Data(const Data)> Handler;
+ bool ClientRequest(const Data request, Data &reply)
+ {
+ auto end_time = now() + 3s;
+ do {
+ Data cur = data_.load();
+ if (GetState(cur) == eStateFree &&
+ DataCas(cur, Encode(request, eStateRequest))) {
+ do {
+ yield();
+ cur = data_.load();
+ if (GetState(cur) == eStateReply) {
+ DataCas(cur, Encode(0, eStateFree));
+ reply = Decode(cur);
+ return true;
+ }
+ } while (now() < end_time);
+ }
+ yield();
+ } while (now() < end_time);
+ return false;
+ }
+
+ bool ServerProcess(Handler onReq)
+ {
+ Data cur = data_.load();
+ switch (GetState(cur)) {
+ case eStateRequest:
+ if (DataCas(cur, Encode(onReq(Decode(cur)), eStateReply))) {
+ timestamp_ = now();
+ return true;
+ }
+ break;
+ case eStateReply:
+ if (timestamp_.load() + 3s < now()) {
+ DataCas(cur, Encode(0, eStateFree));
+ }
+ break;
+ case eStateFree:
+ default: break;
+ }
+ return false;
+ }
+
+private:
+ enum State {
+ eStateFree,
+ eStateRequest,
+ eStateReply
+ };
+ static int GetState(Data d) { return d & MaskBits(3); }
+ static Data Encode(Data d, State st) { return (d << 3) | st; }
+ static Data Decode(Data d) { return d >> 3; }
+ static void yield() { QuickSleep(); }
+ typedef steady_clock::duration Duration;
+ Duration now() { return steady_clock::now().time_since_epoch(); }
+
+ bool DataCas(Data expected, Data val) { return data_.compare_exchange_strong(expected, val); }
+ std::atomic<Data> data_;
+ std::atomic<Duration> timestamp_;
+};
+
} // namespace robust
#endif // end of include guard: ROBUST_Q31RCWYU
--
Gitblit v1.8.0