From 28f06bc49a4d8d69f1ea2f767863b7921d12f155 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期六, 08 五月 2021 18:30:48 +0800
Subject: [PATCH] add robust FMutex, works fine; use boost circular.

---
 src/robust.h |  221 ++++++++++++++++++++++--------------------------------
 1 files changed, 91 insertions(+), 130 deletions(-)

diff --git a/src/robust.h b/src/robust.h
index 3334bc0..d2d94e9 100644
--- a/src/robust.h
+++ b/src/robust.h
@@ -23,8 +23,12 @@
 #include <atomic>
 #include <chrono>
 #include <memory>
-#include <string.h>
+#include <mutex>
 #include <string>
+#include <sys/file.h>
+#include <sys/ipc.h>
+#include <sys/sem.h>
+#include <sys/stat.h>
 #include <sys/types.h>
 #include <unistd.h>
 
@@ -37,143 +41,21 @@
 
 void QuickSleep();
 
-class RobustReqRep
-{
-	typedef uint32_t State;
-	typedef std::string Msg;
-	typedef std::chrono::steady_clock::duration Duration;
-
-public:
-	enum ErrorCode {
-		eSuccess = 0,
-		eTimeout = EAGAIN,
-		eSizeError = EINVAL,
-	};
-
-	explicit RobustReqRep(const uint32_t max_len) :
-	    capacity_(max_len), state_(eStateInit), timestamp_(Duration(0)), size_(0) {}
-
-	void PutReady() { state_.store(eStateReady); }
-	bool Ready() const { return state_.load() == eStateReady; }
-	uint32_t capacity() const { return capacity_; }
-
-	int ClientRequest(const Msg &request, Msg &reply)
-	{
-		int r = ClientWriteRequest(request);
-		if (r == eSuccess) {
-			r = ClientReadReply(reply);
-		}
-		return r;
-	}
-	int ClientReadReply(Msg &reply);
-	int ClientWriteRequest(const Msg &request);
-	int ServerReadRequest(Msg &request);
-	int ServerWriteReply(const Msg &reply);
-
-private:
-	RobustReqRep(const RobustReqRep &);
-	RobustReqRep(RobustReqRep &&);
-	RobustReqRep &operator=(const RobustReqRep &) = delete;
-	RobustReqRep &operator=(RobustReqRep &&) = delete;
-
-	enum {
-		eStateInit = 0,
-		eStateReady = 0x19833891,
-		eClientWriteBegin,
-		eClientWriteEnd,
-		eServerReadBegin,
-		eServerReadEnd,
-		eServerWriteBegin,
-		eServerWriteEnd,
-		eClientReadBegin,
-		eClientReadEnd = eStateReady,
-	};
-	bool StateCas(State exp, State val);
-	void Write(const Msg &msg)
-	{
-		size_.store(msg.size());
-		memcpy(buf, msg.data(), msg.size());
-	}
-	void Read(Msg &msg) { msg.assign(buf, size_.load()); }
-
-	const uint32_t capacity_;
-	std::atomic<State> state_;
-	static_assert(sizeof(State) == sizeof(state_), "atomic should has no extra data.");
-	std::atomic<Duration> timestamp_;
-	std::atomic<int32_t> size_;
-	char buf[4];
-};
-
-class PidLocker
-{
-public:
-	typedef int locker_t;
-	enum { eLockerBits = sizeof(locker_t) * 8 };
-	static locker_t this_locker()
-	{
-		static locker_t val = getpid();
-		return val;
-	}
-	static bool is_alive(locker_t locker) { return true; }
-};
-
-class RobustPidLocker
-{
-public:
-	typedef int locker_t;
-	enum { eLockerBits = sizeof(locker_t) * 8 };
-	static locker_t this_locker()
-	{
-		static locker_t val = getpid();
-		return val;
-	}
-	static bool is_alive(locker_t locker)
-	{
-		char buf[64] = {0};
-		snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", locker);
-		return access(buf, F_OK) == 0;
-	}
-};
-
-class ExpiredLocker
-{
-public:
-	typedef int64_t locker_t;
-	enum { eLockerBits = 63 };
-	static locker_t this_locker() { return Now(); }
-	static bool is_alive(locker_t locker)
-	{
-		return Now() < locker + steady_clock::duration(10s).count();
-	}
-
-private:
-	static locker_t Now() { return steady_clock::now().time_since_epoch().count(); }
-};
-
-template <class LockerT>
 class CasMutex
 {
-	typedef typename LockerT::locker_t locker_t;
-	static inline locker_t this_locker() { return LockerT::this_locker(); }
-	static inline bool is_alive(locker_t locker) { return LockerT::is_alive(locker); }
-	static const uint64_t kLockerMask = MaskBits(LockerT::eLockerBits);
-	static_assert(LockerT::eLockerBits < 64, "locker size must be smaller than 64 bit!");
+	typedef uint64_t locker_t;
+	static inline locker_t this_locker() { return pthread_self(); }
+	static const uint64_t kLockerMask = MaskBits(63);
 
 public:
 	CasMutex() :
 	    meta_(0) {}
 	int try_lock()
 	{
-		const auto t = steady_clock::now().time_since_epoch().count();
 		auto old = meta_.load();
 		int r = 0;
 		if (!Locked(old)) {
 			r = MetaCas(old, Meta(1, this_locker()));
-		} else if (!is_alive(Locker(old))) {
-			r = static_cast<int>(MetaCas(old, Meta(1, this_locker()))) << 1;
-			if (r) {
-				LOG_DEBUG() << "captured locker " << int64_t(Locker(old)) << " -> " << int64_t(this_locker()) << ", locker = " << r;
-			}
 		}
 		return r;
 	}
@@ -201,7 +83,89 @@
 	bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); }
 };
 
-typedef CasMutex<RobustPidLocker> Mutex;
+class NullMutex
+{
+public:
+	bool try_lock() { return true; }
+	void lock() {}
+	void unlock() {}
+};
+
+// flock + mutex
+class FMutex
+{
+public:
+	typedef uint64_t id_t;
+	FMutex(id_t id) :
+	    id_(id), fd_(Open(id_))
+	{
+		if (fd_ == -1) { throw "error create mutex!"; }
+	}
+	~FMutex() { Close(fd_); }
+	bool try_lock();
+	void lock();
+	void unlock();
+
+private:
+	static std::string GetPath(id_t id)
+	{
+		const std::string dir("/tmp/.bhome_mtx");
+		mkdir(dir.c_str(), 0777);
+		return dir + "/fm_" + std::to_string(id);
+	}
+	static int Open(id_t id) { return open(GetPath(id).c_str(), O_CREAT | O_RDWR, 0666); }
+	static int Close(int fd) { return close(fd); }
+	id_t id_;
+	int fd_;
+	std::mutex mtx_;
+};
+
+union semun {
+	int val;               /* Value for SETVAL */
+	struct semid_ds *buf;  /* Buffer for IPC_STAT, IPC_SET */
+	unsigned short *array; /* Array for GETALL, SETALL */
+	struct seminfo *__buf; /* Buffer for IPC_INFO
+                                           (Linux-specific) */
+};
+
+class SemMutex
+{
+public:
+	SemMutex(key_t key) :
+	    key_(key), sem_id_(semget(key, 1, 0666 | IPC_CREAT))
+	{
+		if (sem_id_ == -1) { throw "error create semaphore."; }
+		union semun init_val;
+		init_val.val = 1;
+		semctl(sem_id_, 0, SETVAL, init_val);
+	}
+	~SemMutex()
+	{
+		// semctl(sem_id_, 0, IPC_RMID, semun{});
+	}
+
+	bool try_lock()
+	{
+		sembuf op = {0, -1, SEM_UNDO | IPC_NOWAIT};
+		return semop(sem_id_, &op, 1) == 0;
+	}
+
+	void lock()
+	{
+		sembuf op = {0, -1, SEM_UNDO};
+		semop(sem_id_, &op, 1);
+	}
+
+	void unlock()
+	{
+		sembuf op = {0, 1, SEM_UNDO};
+		semop(sem_id_, &op, 1);
+	}
+
+private:
+	key_t key_;
+	int sem_id_;
+};
 
 template <class Lock>
 class Guard
@@ -245,7 +209,6 @@
 
 	bool push_back(const Data d)
 	{
-		Guard<Mutex> guard(mutex_);
 		auto old = mtail();
 		auto pos = Pos(old);
 		auto full = ((capacity_ + pos + 1 - head()) % capacity_ == 0);
@@ -257,7 +220,6 @@
 	}
 	bool pop_front(Data &d)
 	{
-		Guard<Mutex> guard(mutex_);
 		auto old = mhead();
 		auto pos = Pos(old);
 		if (!(pos == tail())) {
@@ -281,7 +243,6 @@
 	meta_type mtail() const { return mtail_.load(); }
 	// data
 	const size_type capacity_;
-	Mutex mutex_;
 	std::atomic<meta_type> mhead_;
 	std::atomic<meta_type> mtail_;
 	Alloc al_;

--
Gitblit v1.8.0