From 7ecd6323ffedbfef92c87c02b2a8680dd53b772c Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期四, 06 五月 2021 19:37:50 +0800
Subject: [PATCH] rename atomic queue io function.

---
 src/robust.h |  196 +++++++++++++++++++++++++++++++++++++------------
 1 files changed, 148 insertions(+), 48 deletions(-)

diff --git a/src/robust.h b/src/robust.h
index 19e9bda..3334bc0 100644
--- a/src/robust.h
+++ b/src/robust.h
@@ -19,6 +19,7 @@
 #ifndef ROBUST_Q31RCWYU
 #define ROBUST_Q31RCWYU
 
+#include "log.h"
 #include <atomic>
 #include <chrono>
 #include <memory>
@@ -32,6 +33,7 @@
 
 using namespace std::chrono;
 using namespace std::chrono_literals;
+constexpr uint64_t MaskBits(int nbits) { return (uint64_t(1) << nbits) - 1; }
 
 void QuickSleep();
 
@@ -102,20 +104,60 @@
 	char buf[4];
 };
 
-template <bool isRobust = false>
-class CasMutex
+class PidLocker
 {
-	static pid_t pid()
+public:
+	typedef int locker_t;
+	enum { eLockerBits = sizeof(locker_t) * 8 };
+	static locker_t this_locker()
 	{
-		static pid_t val = getpid();
+		static locker_t val = getpid();
 		return val;
 	}
-	static bool Killed(pid_t pid)
+	static bool is_alive(locker_t locker) { return true; }
+};
+
+class RobustPidLocker
+{
+public:
+	typedef int locker_t;
+	enum { eLockerBits = sizeof(locker_t) * 8 };
+	static locker_t this_locker()
+	{
+		static locker_t val = getpid();
+		return val;
+	}
+	static bool is_alive(locker_t locker)
 	{
 		char buf[64] = {0};
-		snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", pid);
-		return access(buf, F_OK) != 0;
+		snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", locker);
+		return access(buf, F_OK) == 0;
 	}
+};
+
+class ExpiredLocker
+{
+public:
+	typedef int64_t locker_t;
+	enum { eLockerBits = 63 };
+	static locker_t this_locker() { return Now(); }
+	static bool is_alive(locker_t locker)
+	{
+		return Now() < locker + steady_clock::duration(10s).count();
+	}
+
+private:
+	static locker_t Now() { return steady_clock::now().time_since_epoch().count(); }
+};
+
+template <class LockerT>
+class CasMutex
+{
+	typedef typename LockerT::locker_t locker_t;
+	static inline locker_t this_locker() { return LockerT::this_locker(); }
+	static inline bool is_alive(locker_t locker) { return LockerT::is_alive(locker); }
+	static const uint64_t kLockerMask = MaskBits(LockerT::eLockerBits);
+	static_assert(LockerT::eLockerBits < 64, "locker size must be smaller than 64 bit!");
 
 public:
 	CasMutex() :
@@ -126,11 +168,11 @@
 		auto old = meta_.load();
 		int r = 0;
 		if (!Locked(old)) {
-			r = MetaCas(old, Meta(1, pid()));
-		} else if (isRobust && Killed(Pid(old))) {
-			r = static_cast<int>(MetaCas(old, Meta(1, pid()))) << 1;
+			r = MetaCas(old, Meta(1, this_locker()));
+		} else if (!is_alive(Locker(old))) {
+			r = static_cast<int>(MetaCas(old, Meta(1, this_locker()))) << 1;
 			if (r) {
-				printf("captured pid %d -> %d, r = %d\n", Pid(old), pid(), r);
+				LOG_DEBUG() << "captured locker " << int64_t(Locker(old)) << " -> " << int64_t(this_locker()) << ", locker = " << r;
 			}
 		}
 		return r;
@@ -146,19 +188,20 @@
 	void unlock()
 	{
 		auto old = meta_.load();
-		if (Locked(old) && Pid(old) == pid()) {
-			MetaCas(old, Meta(0, pid()));
+		if (Locked(old) && Locker(old) == this_locker()) {
+			MetaCas(old, Meta(0, this_locker()));
 		}
 	}
 
 private:
 	std::atomic<uint64_t> meta_;
-	bool Locked(uint64_t meta) { return (meta >> 63) != 0; }
-	pid_t Pid(uint64_t meta) { return meta & ~(uint64_t(1) << 63); }
-	uint64_t Meta(uint64_t lk, pid_t pid) { return (lk << 63) | pid; }
+	bool Locked(uint64_t meta) { return (meta >> 63) == 1; }
+	locker_t Locker(uint64_t meta) { return meta & kLockerMask; }
+	uint64_t Meta(uint64_t lk, locker_t lid) { return (lk << 63) | lid; }
 	bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); }
-	static_assert(sizeof(pid_t) < sizeof(uint64_t));
 };
+
+typedef CasMutex<RobustPidLocker> Mutex;
 
 template <class Lock>
 class Guard
@@ -182,7 +225,7 @@
 	typedef uint64_t meta_type;
 	static size_type Pos(meta_type meta) { return meta & 0xFFFFFFFF; }
 	static count_type Count(meta_type meta) { return meta >> 32; }
-	static size_type Meta(meta_type count, size_type pos) { return (count << 32) | pos; }
+	static meta_type Meta(meta_type count, size_type pos) { return (count << 32) | pos; }
 
 public:
 	typedef D Data;
@@ -190,66 +233,123 @@
 	CircularBuffer(const size_type cap) :
 	    CircularBuffer(cap, Alloc()) {}
 	CircularBuffer(const size_type cap, Alloc const &al) :
-	    state_(0), capacity_(cap), mhead_(0), mtail_(0), al_(al), buf(al_.allocate(cap))
+	    capacity_(cap + 1), mhead_(0), mtail_(0), al_(al), buf(al_.allocate(capacity_))
 	{
 		if (!buf) {
-			throw("error allocate buffer: out of mem!");
-		}
-	}
-	~CircularBuffer()
-	{
-		al_.deallocate(buf, capacity_);
-	}
-	size_type size() const { return (capacity_ + tail() - head()) % capacity_; }
-	bool full() const { return (capacity_ + tail() + 1 - head()) % capacity_ == 0; }
-	bool empty() const { return head() == tail(); }
-	bool push_back(Data d)
-	{
-		Guard<MutexT> guard(mutex_);
-		if (!full()) {
-			auto old = mtail();
-			buf[Pos(old)] = d;
-			return mtail_.compare_exchange_strong(old, next(old));
+			throw("robust CircularBuffer allocate error: alloc buffer failed, out of mem!");
 		} else {
-			return false;
+			memset(&buf[0], 0, sizeof(D) * capacity_);
 		}
+	}
+	~CircularBuffer() { al_.deallocate(buf, capacity_); }
+
+	bool push_back(const Data d)
+	{
+		Guard<Mutex> guard(mutex_);
+		auto old = mtail();
+		auto pos = Pos(old);
+		auto full = ((capacity_ + pos + 1 - head()) % capacity_ == 0);
+		if (!full) {
+			buf[pos] = d;
+			return mtail_.compare_exchange_strong(old, next(old));
+		}
+		return false;
 	}
 	bool pop_front(Data &d)
 	{
-		Guard<MutexT> guard(mutex_);
-		if (!empty()) {
-			auto old = mhead();
-			d = buf[Pos(old)];
+		Guard<Mutex> guard(mutex_);
+		auto old = mhead();
+		auto pos = Pos(old);
+		if (!(pos == tail())) {
+			d = buf[pos];
 			return mhead_.compare_exchange_strong(old, next(old));
 		} else {
 			return false;
 		}
 	}
-	bool Ready() const { return state_.load() == eStateReady; }
-	void PutReady() { state_.store(eStateReady); }
 
 private:
 	CircularBuffer(const CircularBuffer &);
 	CircularBuffer(CircularBuffer &&);
 	CircularBuffer &operator=(const CircularBuffer &) = delete;
 	CircularBuffer &operator=(CircularBuffer &&) = delete;
-	typedef CasMutex<true> MutexT;
-	// static_assert(sizeof(MutexT) == 16);
+
 	meta_type next(meta_type meta) const { return Meta(Count(meta) + 1, (Pos(meta) + 1) % capacity_); }
 	size_type head() const { return Pos(mhead()); }
 	size_type tail() const { return Pos(mtail()); }
 	meta_type mhead() const { return mhead_.load(); }
 	meta_type mtail() const { return mtail_.load(); }
 	// data
-	enum { eStateReady = 0x19833891 };
-	std::atomic<uint32_t> state_;
 	const size_type capacity_;
-	MutexT mutex_;
+	Mutex mutex_;
 	std::atomic<meta_type> mhead_;
 	std::atomic<meta_type> mtail_;
 	Alloc al_;
 	typename Alloc::pointer buf = nullptr;
 };
 
+template <unsigned PowerSize = 4, class Int = int64_t>
+class AtomicQueue
+{
+public:
+	typedef uint32_t size_type;
+	typedef Int Data;
+	typedef std::atomic<Data> AData;
+	static_assert(sizeof(Data) == sizeof(AData));
+	enum {
+		power = PowerSize,
+		capacity = (1 << power),
+		mask = capacity - 1,
+	};
+
+	AtomicQueue() { memset(this, 0, sizeof(*this)); }
+	size_type head() const { return head_.load(); }
+	size_type tail() const { return tail_.load(); }
+	bool like_empty() const { return head() == tail() && Empty(buf[head()]); }
+	bool like_full() const { return head() == tail() && !Empty(buf[head()]); }
+	bool push(const Data d, bool try_more = false)
+	{
+		bool r = false;
+		size_type i = 0;
+		do {
+			auto pos = tail();
+			if (tail_.compare_exchange_strong(pos, Next(pos))) {
+				auto cur = buf[pos].load();
+				r = Empty(cur) && buf[pos].compare_exchange_strong(cur, Enc(d));
+			}
+		} while (try_more && !r && ++i < capacity);
+		return r;
+	}
+	bool pop(Data &d, bool try_more = false)
+	{
+		bool r = false;
+		Data cur;
+		size_type i = 0;
+		do {
+			auto pos = head();
+			if (head_.compare_exchange_strong(pos, Next(pos))) {
+				cur = buf[pos].load();
+				r = !Empty(cur) && buf[pos].compare_exchange_strong(cur, 0);
+			}
+		} while (try_more && !r && ++i < capacity);
+		if (r) { d = Dec(cur); }
+		return r;
+	}
+
+private:
+	static_assert(std::is_integral<Data>::value, "Data must be integral type!");
+	static_assert(std::is_signed<Data>::value, "Data must be signed type!");
+	static_assert(PowerSize < 10, "RobustQ63 max size is 2^10!");
+
+	static inline bool Empty(const Data d) { return (d & 1) == 0; } // lowest bit 1 means data ok.
+	static inline Data Enc(const Data d) { return (d << 1) | 1; }   // lowest bit 1 means data ok.
+	static inline Data Dec(const Data d) { return d >> 1; }         // lowest bit 1 means data ok.
+	static size_type Next(const size_type index) { return (index + 1) & mask; }
+
+	std::atomic<size_type> head_;
+	std::atomic<size_type> tail_;
+	AData buf[capacity];
+};
+
 } // namespace robust
 #endif // end of include guard: ROBUST_Q31RCWYU

--
Gitblit v1.8.0