From 58d904a328c0d849769b483e901a0be9426b8209 Mon Sep 17 00:00:00 2001
From: liuxiaolong <liuxiaolong@aiotlink.com>
Date: 星期二, 20 七月 2021 20:20:44 +0800
Subject: [PATCH] 调整Request C.BHFree的位置

---
 src/robust.h |  353 +++++++---------------------------------------------------
 1 files changed, 47 insertions(+), 306 deletions(-)

diff --git a/src/robust.h b/src/robust.h
index b7459ad..255aea4 100644
--- a/src/robust.h
+++ b/src/robust.h
@@ -19,333 +19,74 @@
 #ifndef ROBUST_Q31RCWYU
 #define ROBUST_Q31RCWYU
 
+#include "bh_util.h"
+#include "log.h"
+#include <string.h>
 #include <atomic>
 #include <chrono>
-#include <memory>
-#include <string.h>
-#include <string>
-#include <sys/types.h>
 #include <unistd.h>
 
 namespace robust
 {
 
-using namespace std::chrono;
-using namespace std::chrono_literals;
-constexpr uint64_t MaskBits(int nbits) { return (uint64_t(1) << nbits) - 1; }
-
-void QuickSleep();
-
-class RobustReqRep
-{
-	typedef uint32_t State;
-	typedef std::string Msg;
-	typedef std::chrono::steady_clock::duration Duration;
-
-public:
-	enum ErrorCode {
-		eSuccess = 0,
-		eTimeout = EAGAIN,
-		eSizeError = EINVAL,
-	};
-
-	explicit RobustReqRep(const uint32_t max_len) :
-	    capacity_(max_len), state_(eStateInit), timestamp_(Duration(0)), size_(0) {}
-
-	void PutReady() { state_.store(eStateReady); }
-	bool Ready() const { return state_.load() == eStateReady; }
-	uint32_t capacity() const { return capacity_; }
-
-	int ClientRequest(const Msg &request, Msg &reply)
-	{
-		int r = ClientWriteRequest(request);
-		if (r == eSuccess) {
-			r = ClientReadReply(reply);
-		}
-		return r;
-	}
-	int ClientReadReply(Msg &reply);
-	int ClientWriteRequest(const Msg &request);
-	int ServerReadRequest(Msg &request);
-	int ServerWriteReply(const Msg &reply);
-
-private:
-	RobustReqRep(const RobustReqRep &);
-	RobustReqRep(RobustReqRep &&);
-	RobustReqRep &operator=(const RobustReqRep &) = delete;
-	RobustReqRep &operator=(RobustReqRep &&) = delete;
-
-	enum {
-		eStateInit = 0,
-		eStateReady = 0x19833891,
-		eClientWriteBegin,
-		eClientWriteEnd,
-		eServerReadBegin,
-		eServerReadEnd,
-		eServerWriteBegin,
-		eServerWriteEnd,
-		eClientReadBegin,
-		eClientReadEnd = eStateReady,
-	};
-	bool StateCas(State exp, State val);
-	void Write(const Msg &msg)
-	{
-		size_.store(msg.size());
-		memcpy(buf, msg.data(), msg.size());
-	}
-	void Read(Msg &msg) { msg.assign(buf, size_.load()); }
-
-	const uint32_t capacity_;
-	std::atomic<State> state_;
-	static_assert(sizeof(State) == sizeof(state_), "atomic should has no extra data.");
-	std::atomic<Duration> timestamp_;
-	std::atomic<int32_t> size_;
-	char buf[4];
-};
-
-class PidLocker
+// atomic queue, length is 1.
+// lowest bit is used for data flag, 63 bit for data.
+class AtomicQ63
 {
 public:
-	typedef int locker_t;
-	enum { eLockerBits = sizeof(locker_t) * 8 };
-	static locker_t this_locker()
+	typedef int64_t Data;
+	AtomicQ63() { memset(this, 0, sizeof(*this)); }
+	bool push(const Data d, bool try_more = false)
 	{
-		static locker_t val = getpid();
-		return val;
+		auto cur = buf.load();
+		return Empty(cur) && buf.compare_exchange_strong(cur, Enc(d));
 	}
-	static bool is_alive(locker_t locker) { return true; }
-};
-
-class RobustPidLocker
-{
-public:
-	typedef int locker_t;
-	enum { eLockerBits = sizeof(locker_t) * 8 };
-	static locker_t this_locker()
+	bool pop(Data &d, bool try_more = false)
 	{
-		static locker_t val = getpid();
-		return val;
-	}
-	static bool is_alive(locker_t locker)
-	{
-		char buf[64] = {0};
-		snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", locker);
-		return access(buf, F_OK) == 0;
-	}
-};
-
-class ExpiredLocker
-{
-public:
-	typedef int64_t locker_t;
-	enum { eLockerBits = 63 };
-	static locker_t this_locker() { return Now(); }
-	static bool is_alive(locker_t locker)
-	{
-		return Now() < locker + steady_clock::duration(10s).count();
-	}
-
-private:
-	static locker_t Now() { return steady_clock::now().time_since_epoch().count(); }
-};
-
-template <class LockerT>
-class CasMutex
-{
-	typedef typename LockerT::locker_t locker_t;
-	static inline locker_t this_locker() { return LockerT::this_locker(); }
-	static inline bool is_alive(locker_t locker) { return LockerT::is_alive(locker); }
-	static const uint64_t kLockerMask = MaskBits(LockerT::eLockerBits);
-	static_assert(LockerT::eLockerBits < 64, "locker size must be smaller than 64 bit!");
-
-public:
-	CasMutex() :
-	    meta_(0) {}
-	int try_lock()
-	{
-		const auto t = steady_clock::now().time_since_epoch().count();
-		auto old = meta_.load();
-		int r = 0;
-		if (!Locked(old)) {
-			r = MetaCas(old, Meta(1, this_locker()));
-		} else if (!is_alive(Locker(old))) {
-			r = static_cast<int>(MetaCas(old, Meta(1, this_locker()))) << 1;
-			if (r) {
-				printf("captured locker %ld -> %ld, locker = %d\n", int64_t(Locker(old)), int64_t(this_locker()), r);
-			}
-		}
-		return r;
-	}
-	int lock()
-	{
-		int r = 0;
-		do {
-			r = try_lock();
-		} while (r == 0);
-		return r;
-	}
-	void unlock()
-	{
-		auto old = meta_.load();
-		if (Locked(old) && Locker(old) == this_locker()) {
-			MetaCas(old, Meta(0, this_locker()));
-		}
-	}
-
-private:
-	std::atomic<uint64_t> meta_;
-	bool Locked(uint64_t meta) { return (meta >> 63) == 1; }
-	locker_t Locker(uint64_t meta) { return meta & kLockerMask; }
-	uint64_t Meta(uint64_t lk, locker_t lid) { return (lk << 63) | lid; }
-	bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); }
-};
-
-typedef CasMutex<RobustPidLocker> Mutex;
-
-template <class Lock>
-class Guard
-{
-public:
-	Guard(Lock &l) :
-	    l_(l) { l_.lock(); }
-	~Guard() { l_.unlock(); }
-
-private:
-	Guard(const Guard &);
-	Guard(Guard &&);
-	Lock &l_;
-};
-
-template <class D, class Alloc = std::allocator<D>>
-class CircularBuffer
-{
-	typedef uint32_t size_type;
-	typedef uint32_t count_type;
-	typedef uint64_t meta_type;
-	static size_type Pos(meta_type meta) { return meta & 0xFFFFFFFF; }
-	static count_type Count(meta_type meta) { return meta >> 32; }
-	static meta_type Meta(meta_type count, size_type pos) { return (count << 32) | pos; }
-
-public:
-	typedef D Data;
-
-	CircularBuffer(const size_type cap) :
-	    CircularBuffer(cap, Alloc()) {}
-	CircularBuffer(const size_type cap, Alloc const &al) :
-	    capacity_(cap + 1), mhead_(0), mtail_(0), al_(al), buf(al_.allocate(capacity_))
-	{
-		if (!buf) {
-			throw("robust CircularBuffer allocate error: alloc buffer failed, out of mem!");
-		} else {
-			memset(&buf[0], 0, sizeof(D) * capacity_);
-		}
-	}
-	~CircularBuffer() { al_.deallocate(buf, capacity_); }
-
-	bool push_back(const Data d)
-	{
-		Guard<Mutex> guard(mutex_);
-		auto old = mtail();
-		auto pos = Pos(old);
-		auto full = ((capacity_ + pos + 1 - head()) % capacity_ == 0);
-		if (!full) {
-			buf[pos] = d;
-			return mtail_.compare_exchange_strong(old, next(old));
-		}
-		return false;
-	}
-	bool pop_front(Data &d)
-	{
-		Guard<Mutex> guard(mutex_);
-		auto old = mhead();
-		auto pos = Pos(old);
-		if (!(pos == tail())) {
-			d = buf[pos];
-			return mhead_.compare_exchange_strong(old, next(old));
-		} else {
-			return false;
-		}
-	}
-
-private:
-	CircularBuffer(const CircularBuffer &);
-	CircularBuffer(CircularBuffer &&);
-	CircularBuffer &operator=(const CircularBuffer &) = delete;
-	CircularBuffer &operator=(CircularBuffer &&) = delete;
-
-	meta_type next(meta_type meta) const { return Meta(Count(meta) + 1, (Pos(meta) + 1) % capacity_); }
-	size_type head() const { return Pos(mhead()); }
-	size_type tail() const { return Pos(mtail()); }
-	meta_type mhead() const { return mhead_.load(); }
-	meta_type mtail() const { return mtail_.load(); }
-	// data
-	const size_type capacity_;
-	Mutex mutex_;
-	std::atomic<meta_type> mhead_;
-	std::atomic<meta_type> mtail_;
-	Alloc al_;
-	typename Alloc::pointer buf = nullptr;
-};
-
-template <unsigned PowerSize = 4, class Int = int64_t>
-class AtomicQueue
-{
-public:
-	typedef uint32_t size_type;
-	typedef Int Data;
-	typedef std::atomic<Data> AData;
-	static_assert(sizeof(Data) == sizeof(AData));
-	enum {
-		power = PowerSize,
-		capacity = (1 << power),
-		mask = capacity - 1,
-	};
-
-	AtomicQueue() { memset(this, 0, sizeof(*this)); }
-	size_type head() const { return head_.load(); }
-	size_type tail() const { return tail_.load(); }
-	bool like_empty() const { return head() == tail() && Empty(buf[head()]); }
-	bool like_full() const { return head() == tail() && !Empty(buf[head()]); }
-	bool push_back(const Data d, bool try_more = false)
-	{
-		bool r = false;
-		size_type i = 0;
-		do {
-			auto pos = tail();
-			auto cur = buf[pos].load();
-			r = Empty(cur) && buf[pos].compare_exchange_strong(cur, Enc(d));
-			tail_.compare_exchange_strong(pos, Next(pos));
-		} while (try_more && !r && ++i < capacity);
-		return r;
-	}
-	bool pop_front(Data &d, bool try_more = false)
-	{
-		bool r = false;
-		Data cur;
-		size_type i = 0;
-		do {
-			auto pos = head();
-			cur = buf[pos].load();
-			r = !Empty(cur) && buf[pos].compare_exchange_strong(cur, 0);
-			head_.compare_exchange_strong(pos, Next(pos));
-		} while (try_more && !r && ++i < capacity);
+		Data cur = buf.load();
+		bool r = !Empty(cur) && buf.compare_exchange_strong(cur, 0);
 		if (r) { d = Dec(cur); }
 		return r;
 	}
 
 private:
-	static_assert(std::is_integral<Data>::value, "Data must be integral type!");
-	static_assert(std::is_signed<Data>::value, "Data must be signed type!");
-	static_assert(PowerSize < 10, "RobustQ63 max size is 2^10!");
-
 	static inline bool Empty(const Data d) { return (d & 1) == 0; } // lowest bit 1 means data ok.
 	static inline Data Enc(const Data d) { return (d << 1) | 1; }   // lowest bit 1 means data ok.
 	static inline Data Dec(const Data d) { return d >> 1; }         // lowest bit 1 means data ok.
-	static size_type Next(const size_type index) { return (index + 1) & mask; }
 
-	std::atomic<size_type> head_;
-	std::atomic<size_type> tail_;
-	AData buf[capacity];
+	typedef std::atomic<Data> AData;
+	// static_assert(sizeof(Data) == sizeof(AData));
+
+	AData buf;
+};
+
+// atomic request-reply process, one cycle a time.
+class AtomicReqRep
+{
+public:
+	typedef int64_t Data;
+	typedef std::function<Data(const Data)> Handler;
+	bool ClientRequest(const Data request, Data &reply);
+	bool ServerProcess(Handler onReq);
+	AtomicReqRep() :
+	    data_(0), timestamp_(now()) {}
+
+private:
+	enum State {
+		eStateFree,
+		eStateRequest,
+		eStateReply
+	};
+	static int GetState(Data d) { return d & MaskBits(3); }
+	static Data Encode(Data d, State st) { return (d << 3) | st; }
+	static Data Decode(Data d) { return d >> 3; }
+	typedef std::chrono::steady_clock steady_clock;
+	typedef steady_clock::duration Duration;
+	static Duration now() { return steady_clock::now().time_since_epoch(); }
+
+	bool DataCas(Data expected, Data val) { return data_.compare_exchange_strong(expected, val); }
+	std::atomic<Data> data_;
+	std::atomic<Duration> timestamp_;
 };
 
 } // namespace robust

--
Gitblit v1.8.0