From 72bffb0807925a156b076b71f78c848a08d27b87 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期四, 29 四月 2021 10:55:35 +0800
Subject: [PATCH] refactor mutex.

---
 src/robust.h          |   73 +++++++++-----
 src/shm.h             |    2 
 utest/api_test.cpp    |   85 -----------------
 utest/robust_test.cpp |  104 ++++++++++++++++++--
 src/robust.cpp        |    2 
 5 files changed, 140 insertions(+), 126 deletions(-)

diff --git a/src/robust.cpp b/src/robust.cpp
index 38d5d28..006ea5f 100644
--- a/src/robust.cpp
+++ b/src/robust.cpp
@@ -26,7 +26,7 @@
 {
 static_assert(sizeof(steady_clock::duration) == sizeof(int64_t));
 static_assert(sizeof(RobustReqRep) == 24);
-static_assert(sizeof(CasMutex<false>) == 8);
+static_assert(sizeof(Mutex) == 8);
 static_assert(sizeof(CircularBuffer<int>) == 48);
 
 auto Now() { return steady_clock::now().time_since_epoch(); }
diff --git a/src/robust.h b/src/robust.h
index 19e9bda..983567f 100644
--- a/src/robust.h
+++ b/src/robust.h
@@ -32,6 +32,7 @@
 
 using namespace std::chrono;
 using namespace std::chrono_literals;
+constexpr uint64_t MaskBits(int nbits) { return (uint64_t(1) << nbits) - 1; }
 
 void QuickSleep();
 
@@ -102,20 +103,41 @@
 	char buf[4];
 };
 
-template <bool isRobust = false>
-class CasMutex
+class PidLocker
 {
-	static pid_t pid()
+public:
+	typedef int locker_t;
+	static locker_t this_locker()
 	{
-		static pid_t val = getpid();
+		static locker_t val = getpid();
 		return val;
 	}
-	static bool Killed(pid_t pid)
+	static bool is_alive(locker_t locker) { return true; }
+};
+
+class RobustPidLocker
+{
+public:
+	typedef int locker_t;
+	static locker_t this_locker()
+	{
+		static locker_t val = getpid();
+		return val;
+	}
+	static bool is_alive(locker_t locker)
 	{
 		char buf[64] = {0};
-		snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", pid);
-		return access(buf, F_OK) != 0;
+		snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", locker);
+		return access(buf, F_OK) == 0;
 	}
+};
+
+template <class LockerT>
+class CasMutex
+{
+	typedef typename LockerT::locker_t locker_t;
+	static inline locker_t this_locker() { return LockerT::this_locker(); }
+	static inline bool is_alive(locker_t locker) { return LockerT::is_alive(locker); }
 
 public:
 	CasMutex() :
@@ -126,11 +148,11 @@
 		auto old = meta_.load();
 		int r = 0;
 		if (!Locked(old)) {
-			r = MetaCas(old, Meta(1, pid()));
-		} else if (isRobust && Killed(Pid(old))) {
-			r = static_cast<int>(MetaCas(old, Meta(1, pid()))) << 1;
+			r = MetaCas(old, Meta(1, this_locker()));
+		} else if (!is_alive(Locker(old))) {
+			r = static_cast<int>(MetaCas(old, Meta(1, this_locker()))) << 1;
 			if (r) {
-				printf("captured pid %d -> %d, r = %d\n", Pid(old), pid(), r);
+				printf("captured pid %d -> %d, r = %d\n", Locker(old), this_locker(), r);
 			}
 		}
 		return r;
@@ -146,19 +168,21 @@
 	void unlock()
 	{
 		auto old = meta_.load();
-		if (Locked(old) && Pid(old) == pid()) {
-			MetaCas(old, Meta(0, pid()));
+		if (Locked(old) && Locker(old) == this_locker()) {
+			MetaCas(old, Meta(0, this_locker()));
 		}
 	}
 
 private:
+	static_assert(sizeof(locker_t) < sizeof(uint64_t), "locker size must be smaller than 64 bit!");
 	std::atomic<uint64_t> meta_;
 	bool Locked(uint64_t meta) { return (meta >> 63) != 0; }
-	pid_t Pid(uint64_t meta) { return meta & ~(uint64_t(1) << 63); }
-	uint64_t Meta(uint64_t lk, pid_t pid) { return (lk << 63) | pid; }
+	locker_t Locker(uint64_t meta) { return meta & MaskBits(sizeof(locker_t) * 8); }
+	uint64_t Meta(uint64_t lk, locker_t lid) { return (lk << 63) | lid; }
 	bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); }
-	static_assert(sizeof(pid_t) < sizeof(uint64_t));
 };
+
+typedef CasMutex<RobustPidLocker> Mutex;
 
 template <class Lock>
 class Guard
@@ -193,19 +217,17 @@
 	    state_(0), capacity_(cap), mhead_(0), mtail_(0), al_(al), buf(al_.allocate(cap))
 	{
 		if (!buf) {
-			throw("error allocate buffer: out of mem!");
+			throw("robust CircularBuffer allocate error: alloc buffer failed, out of mem!");
 		}
 	}
-	~CircularBuffer()
-	{
-		al_.deallocate(buf, capacity_);
-	}
+	~CircularBuffer() { al_.deallocate(buf, capacity_); }
+
 	size_type size() const { return (capacity_ + tail() - head()) % capacity_; }
 	bool full() const { return (capacity_ + tail() + 1 - head()) % capacity_ == 0; }
 	bool empty() const { return head() == tail(); }
 	bool push_back(Data d)
 	{
-		Guard<MutexT> guard(mutex_);
+		Guard<Mutex> guard(mutex_);
 		if (!full()) {
 			auto old = mtail();
 			buf[Pos(old)] = d;
@@ -216,7 +238,7 @@
 	}
 	bool pop_front(Data &d)
 	{
-		Guard<MutexT> guard(mutex_);
+		Guard<Mutex> guard(mutex_);
 		if (!empty()) {
 			auto old = mhead();
 			d = buf[Pos(old)];
@@ -233,8 +255,7 @@
 	CircularBuffer(CircularBuffer &&);
 	CircularBuffer &operator=(const CircularBuffer &) = delete;
 	CircularBuffer &operator=(CircularBuffer &&) = delete;
-	typedef CasMutex<true> MutexT;
-	// static_assert(sizeof(MutexT) == 16);
+
 	meta_type next(meta_type meta) const { return Meta(Count(meta) + 1, (Pos(meta) + 1) % capacity_); }
 	size_type head() const { return Pos(mhead()); }
 	size_type tail() const { return Pos(mtail()); }
@@ -244,7 +265,7 @@
 	enum { eStateReady = 0x19833891 };
 	std::atomic<uint32_t> state_;
 	const size_type capacity_;
-	MutexT mutex_;
+	Mutex mutex_;
 	std::atomic<meta_type> mhead_;
 	std::atomic<meta_type> mtail_;
 	Alloc al_;
diff --git a/src/shm.h b/src/shm.h
index 17352fe..515d856 100644
--- a/src/shm.h
+++ b/src/shm.h
@@ -90,7 +90,7 @@
 	}
 };
 
-typedef robust::CasMutex<true> Mutex;
+typedef robust::Mutex Mutex;
 typedef robust::Guard<Mutex> Guard;
 
 class SharedMemory : public mshm_t
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index 6577b51..c6165e8 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -102,91 +102,6 @@
 using namespace std::chrono;
 // using namespace std::chrono_literals;
 
-BOOST_AUTO_TEST_CASE(MutexTest)
-{
-	// typedef robust::CasMutex<true> RobustMutex;
-	typedef MutexWithPidCheck RobustMutex;
-
-	for (int i = 0; i < 20; ++i) {
-		int size = i;
-		int left = size & 7;
-		int rsize = size + ((8 - left) & 7);
-		printf("size: %3d, rsize: %3d\n", size, rsize);
-	}
-	SharedMemory &shm = TestShm();
-	// shm.Remove();
-	// return;
-	GlobalInit(shm);
-
-	const std::string mtx_name("test_mutex");
-	const std::string int_name("test_int");
-	auto mtx = shm.FindOrCreate<RobustMutex>(mtx_name);
-	auto pi = shm.FindOrCreate<int>(int_name, 100);
-
-	std::mutex m;
-	typedef std::chrono::steady_clock Clock;
-	auto Now = []() { return Clock::now().time_since_epoch(); };
-	if (pi) {
-		auto old = *pi;
-		printf("int : %d, add1: %d\n", old, ++*pi);
-	}
-
-	{
-		boost::timer::auto_cpu_timer timer;
-		const int ntimes = 1000 * 1000;
-		printf("test lock/unlock %d times: ", ntimes);
-		RobustMutex mutex;
-		auto Lock = [&]() {
-			for (int i = 0; i < ntimes; ++i) {
-				mutex.lock();
-				mutex.unlock();
-			}
-		};
-		std::thread t1(Lock), t2(Lock);
-		t1.join();
-		t2.join();
-	}
-
-	auto MSFromNow = [](const int ms) {
-		using namespace boost::posix_time;
-		ptime cur = boost::posix_time::microsec_clock::universal_time();
-		return cur + millisec(ms);
-	};
-
-	auto TryLock = [&]() {
-		if (mtx->try_lock()) {
-			printf("try_lock ok\n");
-			return true;
-		} else {
-			printf("try_lock failed\n");
-			return false;
-		}
-	};
-	auto Unlock = [&]() {
-		mtx->unlock();
-		printf("unlocked\n");
-	};
-
-	if (mtx) {
-		printf("mtx exists\n");
-		if (TryLock()) {
-			auto op = [&]() {
-				if (TryLock()) {
-					Unlock();
-				}
-			};
-			op();
-			std::thread t(op);
-			t.join();
-			// Unlock();
-		} else {
-			// mtx->unlock();
-		}
-	} else {
-		printf("mtx not exists\n");
-	}
-}
-
 BOOST_AUTO_TEST_CASE(ApiTest)
 {
 	auto max_time = std::chrono::steady_clock::time_point::max();
diff --git a/utest/robust_test.cpp b/utest/robust_test.cpp
index 0d54b46..9384c10 100644
--- a/utest/robust_test.cpp
+++ b/utest/robust_test.cpp
@@ -3,8 +3,13 @@
 
 using namespace robust;
 
+enum {
+	eLockerBits = 32,
+	eLockerMask = MaskBits(sizeof(int) * 8),
+};
+
 typedef CircularBuffer<int64_t, Allocator<int64_t>> Rcb;
-Rcb *GetRCBImpl(SharedMemory &shm, const int nelem)
+Rcb *GetRCB(SharedMemory &shm, const int nelem)
 {
 	int cap = nelem + 1;
 	typedef uint64_t Data;
@@ -15,24 +20,13 @@
 	}
 	return nullptr;
 }
-Rcb *GetRCB(SharedMemory &shm, const int nelem)
-{
-	void **pStore = shm.FindOrCreate<void *>("test_rcb_pointer", nullptr);
-	if (pStore) {
-		if (!*pStore) {
-			*pStore = GetRCBImpl(shm, nelem);
-		}
-		return (Rcb *) *pStore;
-	}
-	return nullptr;
-}
 
 void MySleep()
 {
 	std::this_thread::sleep_for(2us);
 }
 
-BOOST_AUTO_TEST_CASE(RobustTest)
+BOOST_AUTO_TEST_CASE(QueueTest)
 {
 	SharedMemory &shm = TestShm();
 	shm.Remove();
@@ -112,4 +106,88 @@
 	threads.WaitAll();
 	printf("total: %ld, expected: %ld\n", total.load(), correct_total);
 	BOOST_CHECK_EQUAL(total.load(), correct_total);
+}
+
+BOOST_AUTO_TEST_CASE(MutexTest)
+{
+	typedef robust::Mutex RobustMutex;
+
+	for (int i = 0; i < 20; ++i) {
+		int size = i;
+		int left = size & 7;
+		int rsize = size + ((8 - left) & 7);
+		printf("size: %3d, rsize: %3d\n", size, rsize);
+	}
+	SharedMemory &shm = TestShm();
+	// shm.Remove();
+	// return;
+	GlobalInit(shm);
+
+	const std::string mtx_name("test_mutex");
+	const std::string int_name("test_int");
+	auto mtx = shm.FindOrCreate<RobustMutex>(mtx_name);
+	auto pi = shm.FindOrCreate<int>(int_name, 100);
+
+	std::mutex m;
+	typedef std::chrono::steady_clock Clock;
+	auto Now = []() { return Clock::now().time_since_epoch(); };
+	if (pi) {
+		auto old = *pi;
+		printf("int : %d, add1: %d\n", old, ++*pi);
+	}
+
+	{
+		boost::timer::auto_cpu_timer timer;
+		const int ntimes = 1000 * 1000;
+		printf("test lock/unlock %d times: ", ntimes);
+		RobustMutex mutex;
+		auto Lock = [&]() {
+			for (int i = 0; i < ntimes; ++i) {
+				mutex.lock();
+				mutex.unlock();
+			}
+		};
+		std::thread t1(Lock), t2(Lock);
+		t1.join();
+		t2.join();
+	}
+
+	auto MSFromNow = [](const int ms) {
+		using namespace boost::posix_time;
+		ptime cur = boost::posix_time::microsec_clock::universal_time();
+		return cur + millisec(ms);
+	};
+
+	auto TryLock = [&]() {
+		if (mtx->try_lock()) {
+			printf("try_lock ok\n");
+			return true;
+		} else {
+			printf("try_lock failed\n");
+			return false;
+		}
+	};
+	auto Unlock = [&]() {
+		mtx->unlock();
+		printf("unlocked\n");
+	};
+
+	if (mtx) {
+		printf("mtx exists\n");
+		if (TryLock()) {
+			auto op = [&]() {
+				if (TryLock()) {
+					Unlock();
+				}
+			};
+			op();
+			std::thread t(op);
+			t.join();
+			// Unlock();
+		} else {
+			// mtx->unlock();
+		}
+	} else {
+		printf("mtx not exists\n");
+	}
 }
\ No newline at end of file

--
Gitblit v1.8.0