From 3788226ee9332945e90066b58f2b85026c2a0460 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期二, 18 五月 2021 10:56:32 +0800
Subject: [PATCH] change node init, no shm lock any more.

---
 src/robust.h |  134 ++++++++++++++++++++++----------------------
 1 files changed, 66 insertions(+), 68 deletions(-)

diff --git a/src/robust.h b/src/robust.h
index c70e2fe..b3b99c4 100644
--- a/src/robust.h
+++ b/src/robust.h
@@ -177,74 +177,6 @@
 	Lock &l_;
 };
 
-template <class D, class Alloc = std::allocator<D>>
-class CircularBuffer
-{
-	typedef uint32_t size_type;
-	typedef uint32_t count_type;
-	typedef uint64_t meta_type;
-	static size_type Pos(meta_type meta) { return meta & 0xFFFFFFFF; }
-	static count_type Count(meta_type meta) { return meta >> 32; }
-	static meta_type Meta(meta_type count, size_type pos) { return (count << 32) | pos; }
-
-public:
-	typedef D Data;
-
-	CircularBuffer(const size_type cap) :
-	    CircularBuffer(cap, Alloc()) {}
-	CircularBuffer(const size_type cap, Alloc const &al) :
-	    capacity_(cap + 1), mhead_(0), mtail_(0), al_(al), buf(al_.allocate(capacity_))
-	{
-		if (!buf) {
-			throw("robust CircularBuffer allocate error: alloc buffer failed, out of mem!");
-		} else {
-			memset(&buf[0], 0, sizeof(D) * capacity_);
-		}
-	}
-	~CircularBuffer() { al_.deallocate(buf, capacity_); }
-
-	bool push_back(const Data d)
-	{
-		auto old = mtail();
-		auto pos = Pos(old);
-		auto full = ((capacity_ + pos + 1 - head()) % capacity_ == 0);
-		if (!full) {
-			buf[pos] = d;
-			return mtail_.compare_exchange_strong(old, next(old));
-		}
-		return false;
-	}
-	bool pop_front(Data &d)
-	{
-		auto old = mhead();
-		auto pos = Pos(old);
-		if (!(pos == tail())) {
-			d = buf[pos];
-			return mhead_.compare_exchange_strong(old, next(old));
-		} else {
-			return false;
-		}
-	}
-
-private:
-	CircularBuffer(const CircularBuffer &);
-	CircularBuffer(CircularBuffer &&);
-	CircularBuffer &operator=(const CircularBuffer &) = delete;
-	CircularBuffer &operator=(CircularBuffer &&) = delete;
-
-	meta_type next(meta_type meta) const { return Meta(Count(meta) + 1, (Pos(meta) + 1) % capacity_); }
-	size_type head() const { return Pos(mhead()); }
-	size_type tail() const { return Pos(mtail()); }
-	meta_type mhead() const { return mhead_.load(); }
-	meta_type mtail() const { return mtail_.load(); }
-	// data
-	const size_type capacity_;
-	std::atomic<meta_type> mhead_;
-	std::atomic<meta_type> mtail_;
-	Alloc al_;
-	typename Alloc::pointer buf = nullptr;
-};
-
 template <unsigned PowerSize = 4, class Int = int64_t>
 class AtomicQueue
 {
@@ -339,5 +271,71 @@
 	AData buf;
 };
 
+class AtomicReqRep
+{
+public:
+	typedef int64_t Data;
+	typedef std::function<Data(const Data)> Handler;
+	bool ClientRequest(const Data request, Data &reply)
+	{
+		auto end_time = now() + 3s;
+		do {
+			Data cur = data_.load();
+			if (GetState(cur) == eStateFree &&
+			    DataCas(cur, Encode(request, eStateRequest))) {
+				do {
+					yield();
+					cur = data_.load();
+					if (GetState(cur) == eStateReply) {
+						DataCas(cur, Encode(0, eStateFree));
+						reply = Decode(cur);
+						return true;
+					}
+				} while (now() < end_time);
+			}
+			yield();
+		} while (now() < end_time);
+		return false;
+	}
+
+	bool ServerProcess(Handler onReq)
+	{
+		Data cur = data_.load();
+		switch (GetState(cur)) {
+		case eStateRequest:
+			if (DataCas(cur, Encode(onReq(Decode(cur)), eStateReply))) {
+				timestamp_ = now();
+				return true;
+			}
+			break;
+		case eStateReply:
+			if (timestamp_.load() + 3s < now()) {
+				DataCas(cur, Encode(0, eStateFree));
+			}
+			break;
+		case eStateFree:
+		default: break;
+		}
+		return false;
+	}
+
+private:
+	enum State {
+		eStateFree,
+		eStateRequest,
+		eStateReply
+	};
+	static int GetState(Data d) { return d & MaskBits(3); }
+	static Data Encode(Data d, State st) { return (d << 3) | st; }
+	static Data Decode(Data d) { return d >> 3; }
+	static void yield() { QuickSleep(); }
+	typedef steady_clock::duration Duration;
+	Duration now() { return steady_clock::now().time_since_epoch(); }
+
+	bool DataCas(Data expected, Data val) { return data_.compare_exchange_strong(expected, val); }
+	std::atomic<Data> data_;
+	std::atomic<Duration> timestamp_;
+};
+
 } // namespace robust
 #endif // end of include guard: ROBUST_Q31RCWYU

--
Gitblit v1.8.0