From 28f06bc49a4d8d69f1ea2f767863b7921d12f155 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期六, 08 五月 2021 18:30:48 +0800
Subject: [PATCH] add robust FMutex, works fine; use boost circular.

---
 src/robust.h          |  221 ++++++++++--------------
 src/shm.h             |    7 
 src/shm_msg_queue.h   |   19 +
 src/shm_queue.h       |   26 ++
 utest/robust_test.cpp |  128 +++++++++++---
 src/robust.cpp        |   90 ++-------
 src/shm_msg_queue.cpp |   29 ++
 7 files changed, 273 insertions(+), 247 deletions(-)

diff --git a/src/robust.cpp b/src/robust.cpp
index 006ea5f..08d2073 100644
--- a/src/robust.cpp
+++ b/src/robust.cpp
@@ -25,87 +25,35 @@
 namespace
 {
 static_assert(sizeof(steady_clock::duration) == sizeof(int64_t));
-static_assert(sizeof(RobustReqRep) == 24);
-static_assert(sizeof(Mutex) == 8);
-static_assert(sizeof(CircularBuffer<int>) == 48);
 
 auto Now() { return steady_clock::now().time_since_epoch(); }
-const steady_clock::duration kIoTimeout = 10ms;
-const steady_clock::duration kIoExpire = 100ms;
-
 void Yield() { std::this_thread::sleep_for(10us); }
+
 } // namespace
 
-void QuickSleep()
-{
-	Yield();
-}
-bool RobustReqRep::StateCas(State exp, State val)
-{
-	bool r = state_.compare_exchange_strong(exp, val);
-	return r ? (timestamp_.store(Now()), true) : false;
-}
+void QuickSleep() { Yield(); }
 
-int RobustReqRep::ClientReadReply(Msg &reply)
+bool FMutex::try_lock()
 {
-	auto end_time = Now() + kIoTimeout;
-	int done = false;
-	do {
-		if (StateCas(eServerWriteEnd, eClientReadBegin)) {
-			Read(reply);
-			done = StateCas(eClientReadBegin, eClientReadEnd);
-			if (done) { break; }
-		}
-		Yield();
-	} while (Now() < end_time);
-	return done ? eSuccess : eTimeout;
-}
-
-int RobustReqRep::ClientWriteRequest(const Msg &request)
-{
-	if (request.size() > capacity_) {
-		return eSizeError;
-	}
-	auto end_time = Now() + kIoTimeout;
-	bool done = false;
-	do {
-		if (StateCas(eStateReady, eClientWriteBegin)) {
-			Write(request);
-			done = StateCas(eClientWriteBegin, eClientWriteEnd);
-			if (done) { break; }
-		}
-		Yield();
-	} while (Now() < end_time);
-	return done ? eSuccess : eTimeout;
-}
-
-int RobustReqRep::ServerReadRequest(Msg &request)
-{
-	bool done = false;
-	if (StateCas(eClientWriteEnd, eServerReadBegin)) {
-		Read(request);
-		done = StateCas(eServerReadBegin, eServerReadEnd);
-	} else {
-		auto old = state_.load();
-		if (old != eStateReady && timestamp_.load() + kIoExpire < Now()) {
-			StateCas(old, eStateReady);
+	if (flock(fd_, LOCK_EX | LOCK_NB) == 0) {
+		if (mtx_.try_lock()) {
+			return true;
+		} else {
+			flock(fd_, LOCK_UN);
 		}
 	}
-	return done ? eSuccess : eTimeout;
+	return false;
 }
-
-int RobustReqRep::ServerWriteReply(const Msg &reply)
+void FMutex::lock()
 {
-	if (reply.size() > capacity_) {
-		return eSizeError;
-	}
-	// no need to loop write, either success or timeout.
-	bool done = false;
-	if (StateCas(eServerReadEnd, eServerWriteBegin)) {
-		Write(reply);
-		done = StateCas(eServerWriteBegin, eServerWriteEnd);
-	}
-	return done ? eSuccess : eTimeout;
+	//Note: the lock order affects performance a lot,
+	// locking fd_ first is about 100 times faster than locking mtx_ first.
+	flock(fd_, LOCK_EX);
+	mtx_.lock();
 }
-
+void FMutex::unlock()
+{
+	mtx_.unlock();
+	flock(fd_, LOCK_UN);
+}
 } // namespace robust
\ No newline at end of file
diff --git a/src/robust.h b/src/robust.h
index 3334bc0..d2d94e9 100644
--- a/src/robust.h
+++ b/src/robust.h
@@ -23,8 +23,12 @@
 #include <atomic>
 #include <chrono>
 #include <memory>
-#include <string.h>
+#include <mutex>
 #include <string>
+#include <sys/file.h>
+#include <sys/ipc.h>
+#include <sys/sem.h>
+#include <sys/stat.h>
 #include <sys/types.h>
 #include <unistd.h>
 
@@ -37,143 +41,21 @@
 
 void QuickSleep();
 
-class RobustReqRep
-{
-	typedef uint32_t State;
-	typedef std::string Msg;
-	typedef std::chrono::steady_clock::duration Duration;
-
-public:
-	enum ErrorCode {
-		eSuccess = 0,
-		eTimeout = EAGAIN,
-		eSizeError = EINVAL,
-	};
-
-	explicit RobustReqRep(const uint32_t max_len) :
-	    capacity_(max_len), state_(eStateInit), timestamp_(Duration(0)), size_(0) {}
-
-	void PutReady() { state_.store(eStateReady); }
-	bool Ready() const { return state_.load() == eStateReady; }
-	uint32_t capacity() const { return capacity_; }
-
-	int ClientRequest(const Msg &request, Msg &reply)
-	{
-		int r = ClientWriteRequest(request);
-		if (r == eSuccess) {
-			r = ClientReadReply(reply);
-		}
-		return r;
-	}
-	int ClientReadReply(Msg &reply);
-	int ClientWriteRequest(const Msg &request);
-	int ServerReadRequest(Msg &request);
-	int ServerWriteReply(const Msg &reply);
-
-private:
-	RobustReqRep(const RobustReqRep &);
-	RobustReqRep(RobustReqRep &&);
-	RobustReqRep &operator=(const RobustReqRep &) = delete;
-	RobustReqRep &operator=(RobustReqRep &&) = delete;
-
-	enum {
-		eStateInit = 0,
-		eStateReady = 0x19833891,
-		eClientWriteBegin,
-		eClientWriteEnd,
-		eServerReadBegin,
-		eServerReadEnd,
-		eServerWriteBegin,
-		eServerWriteEnd,
-		eClientReadBegin,
-		eClientReadEnd = eStateReady,
-	};
-	bool StateCas(State exp, State val);
-	void Write(const Msg &msg)
-	{
-		size_.store(msg.size());
-		memcpy(buf, msg.data(), msg.size());
-	}
-	void Read(Msg &msg) { msg.assign(buf, size_.load()); }
-
-	const uint32_t capacity_;
-	std::atomic<State> state_;
-	static_assert(sizeof(State) == sizeof(state_), "atomic should has no extra data.");
-	std::atomic<Duration> timestamp_;
-	std::atomic<int32_t> size_;
-	char buf[4];
-};
-
-class PidLocker
-{
-public:
-	typedef int locker_t;
-	enum { eLockerBits = sizeof(locker_t) * 8 };
-	static locker_t this_locker()
-	{
-		static locker_t val = getpid();
-		return val;
-	}
-	static bool is_alive(locker_t locker) { return true; }
-};
-
-class RobustPidLocker
-{
-public:
-	typedef int locker_t;
-	enum { eLockerBits = sizeof(locker_t) * 8 };
-	static locker_t this_locker()
-	{
-		static locker_t val = getpid();
-		return val;
-	}
-	static bool is_alive(locker_t locker)
-	{
-		char buf[64] = {0};
-		snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", locker);
-		return access(buf, F_OK) == 0;
-	}
-};
-
-class ExpiredLocker
-{
-public:
-	typedef int64_t locker_t;
-	enum { eLockerBits = 63 };
-	static locker_t this_locker() { return Now(); }
-	static bool is_alive(locker_t locker)
-	{
-		return Now() < locker + steady_clock::duration(10s).count();
-	}
-
-private:
-	static locker_t Now() { return steady_clock::now().time_since_epoch().count(); }
-};
-
-template <class LockerT>
 class CasMutex
 {
-	typedef typename LockerT::locker_t locker_t;
-	static inline locker_t this_locker() { return LockerT::this_locker(); }
-	static inline bool is_alive(locker_t locker) { return LockerT::is_alive(locker); }
-	static const uint64_t kLockerMask = MaskBits(LockerT::eLockerBits);
-	static_assert(LockerT::eLockerBits < 64, "locker size must be smaller than 64 bit!");
+	typedef uint64_t locker_t;
+	static inline locker_t this_locker() { return pthread_self(); }
+	static const uint64_t kLockerMask = MaskBits(63);
 
 public:
 	CasMutex() :
 	    meta_(0) {}
 	int try_lock()
 	{
-		const auto t = steady_clock::now().time_since_epoch().count();
 		auto old = meta_.load();
 		int r = 0;
 		if (!Locked(old)) {
 			r = MetaCas(old, Meta(1, this_locker()));
-		} else if (!is_alive(Locker(old))) {
-			r = static_cast<int>(MetaCas(old, Meta(1, this_locker()))) << 1;
-			if (r) {
-				LOG_DEBUG() << "captured locker " << int64_t(Locker(old)) << " -> " << int64_t(this_locker()) << ", locker = " << r;
-			}
 		}
 		return r;
 	}
@@ -201,7 +83,89 @@
 	bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); }
 };
 
-typedef CasMutex<RobustPidLocker> Mutex;
+class NullMutex
+{
+public:
+	bool try_lock() { return true; }
+	void lock() {}
+	void unlock() {}
+};
+
+// flock + mutex
+class FMutex
+{
+public:
+	typedef uint64_t id_t;
+	FMutex(id_t id) :
+	    id_(id), fd_(Open(id_))
+	{
+		if (fd_ == -1) { throw "error create mutex!"; }
+	}
+	~FMutex() { Close(fd_); }
+	bool try_lock();
+	void lock();
+	void unlock();
+
+private:
+	static std::string GetPath(id_t id)
+	{
+		const std::string dir("/tmp/.bhome_mtx");
+		mkdir(dir.c_str(), 0777);
+		return dir + "/fm_" + std::to_string(id);
+	}
+	static int Open(id_t id) { return open(GetPath(id).c_str(), O_CREAT | O_RDWR, 0666); }
+	static int Close(int fd) { return close(fd); }
+	id_t id_;
+	int fd_;
+	std::mutex mtx_;
+};
+
+union semun {
+	int val;               /* Value for SETVAL */
+	struct semid_ds *buf;  /* Buffer for IPC_STAT, IPC_SET */
+	unsigned short *array; /* Array for GETALL, SETALL */
+	struct seminfo *__buf; /* Buffer for IPC_INFO
+                                           (Linux-specific) */
+};
+
+class SemMutex
+{
+public:
+	SemMutex(key_t key) :
+	    key_(key), sem_id_(semget(key, 1, 0666 | IPC_CREAT))
+	{
+		if (sem_id_ == -1) { throw "error create semaphore."; }
+		union semun init_val;
+		init_val.val = 1;
+		semctl(sem_id_, 0, SETVAL, init_val);
+	}
+	~SemMutex()
+	{
+		// semctl(sem_id_, 0, IPC_RMID, semun{});
+	}
+
+	bool try_lock()
+	{
+		sembuf op = {0, -1, SEM_UNDO | IPC_NOWAIT};
+		return semop(sem_id_, &op, 1) == 0;
+	}
+
+	void lock()
+	{
+		sembuf op = {0, -1, SEM_UNDO};
+		semop(sem_id_, &op, 1);
+	}
+
+	void unlock()
+	{
+		sembuf op = {0, 1, SEM_UNDO};
+		semop(sem_id_, &op, 1);
+	}
+
+private:
+	key_t key_;
+	int sem_id_;
+};
 
 template <class Lock>
 class Guard
@@ -245,7 +209,6 @@
 
 	bool push_back(const Data d)
 	{
-		Guard<Mutex> guard(mutex_);
 		auto old = mtail();
 		auto pos = Pos(old);
 		auto full = ((capacity_ + pos + 1 - head()) % capacity_ == 0);
@@ -257,7 +220,6 @@
 	}
 	bool pop_front(Data &d)
 	{
-		Guard<Mutex> guard(mutex_);
 		auto old = mhead();
 		auto pos = Pos(old);
 		if (!(pos == tail())) {
@@ -281,7 +243,6 @@
 	meta_type mtail() const { return mtail_.load(); }
 	// data
 	const size_type capacity_;
-	Mutex mutex_;
 	std::atomic<meta_type> mhead_;
 	std::atomic<meta_type> mtail_;
 	Alloc al_;
diff --git a/src/shm.h b/src/shm.h
index b168413..269df44 100644
--- a/src/shm.h
+++ b/src/shm.h
@@ -19,7 +19,7 @@
 #ifndef SHM_6CHO6D6C
 #define SHM_6CHO6D6C
 
-#include "robust.h"
+#include "log.h"
 #include <atomic>
 #include <boost/interprocess/managed_shared_memory.hpp>
 #include <boost/interprocess/sync/interprocess_mutex.hpp>
@@ -90,8 +90,9 @@
 	}
 };
 
-typedef robust::Mutex Mutex;
-typedef robust::Guard<Mutex> Guard;
+typedef interprocess_mutex Mutex;
+typedef scoped_lock<Mutex> Guard;
+// typedef robust::Guard<Mutex> Guard;
 
 class SharedMemory : public mshm_t
 {
diff --git a/src/shm_msg_queue.cpp b/src/shm_msg_queue.cpp
index 17558de..bc5075f 100644
--- a/src/shm_msg_queue.cpp
+++ b/src/shm_msg_queue.cpp
@@ -39,13 +39,13 @@
 
 ShmMsgQueue::ShmMsgQueue(const MQId id, ShmType &segment, const int len) :
     id_(id),
-    queue_(segment, MsgQIdToName(id_)) //, AdjustMQLength(len), segment.get_segment_manager())
+    queue_(segment, MsgQIdToName(id_), len, segment.get_segment_manager())
 {
 }
 
 ShmMsgQueue::ShmMsgQueue(const MQId id, const bool create_or_else_find, ShmType &segment, const int len) :
     id_(id),
-    queue_(segment, create_or_else_find, MsgQIdToName(id_))
+    queue_(segment, create_or_else_find, MsgQIdToName(id_), len, segment.get_segment_manager())
 {
 	if (!queue_.IsOk()) {
 		throw("error create/find msgq " + std::to_string(id_));
@@ -56,6 +56,18 @@
 
 ShmMsgQueue::~ShmMsgQueue() {}
 
+ShmMsgQueue::Mutex &ShmMsgQueue::GetMutex(const MQId id)
+{
+	static std::unordered_map<MQId, std::shared_ptr<Mutex>> imm;
+
+	static std::mutex mtx;
+	std::lock_guard<std::mutex> lock(mtx);
+	auto pos = imm.find(id);
+	if (pos == imm.end()) {
+		pos = imm.emplace(id, new Mutex(id)).first;
+	}
+	return *pos->second;
+}
 bool ShmMsgQueue::Remove(SharedMemory &shm, const MQId id)
 {
 	Queue *q = Find(shm, id);
@@ -75,14 +87,15 @@
 
 bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, MsgI msg)
 {
-	Queue *remote = Find(shm, remote_id);
 	bool r = false;
-	if (remote) {
+	try {
+		ShmMsgQueue dest(remote_id, false, shm, 1);
 		msg.AddRef();
-		r = remote->TryWrite(msg.Offset());
-		if (!r) {
-			msg.Release();
-		}
+		DEFER1(if (!r) { msg.Release(); });
+
+		Guard lock(GetMutex(remote_id));
+		r = dest.queue().TryWrite(msg.Offset());
+	} catch (...) {
 	}
 	return r;
 }
diff --git a/src/shm_msg_queue.h b/src/shm_msg_queue.h
index f496f0f..f8888f3 100644
--- a/src/shm_msg_queue.h
+++ b/src/shm_msg_queue.h
@@ -26,11 +26,13 @@
 
 class ShmMsgQueue : public StaticDataRef<std::atomic<uint64_t>, ShmMsgQueue>
 {
-	typedef ShmObject<SharedQ63<4>> Shmq;
-	// typedef ShmObject<SharedQueue<int64_t>> Shmq;
+	// typedef ShmObject<SharedQ63<4>> Shmq;
+	typedef ShmObject<SharedQueue<int64_t>> Shmq;
 	typedef Shmq::ShmType ShmType;
 	typedef Shmq::Data Queue;
 	typedef std::function<void()> OnSend;
+	typedef robust::FMutex Mutex;
+	typedef robust::Guard<Mutex> Guard;
 
 public:
 	typedef uint64_t MQId;
@@ -45,13 +47,22 @@
 	MQId Id() const { return id_; }
 	ShmType &shm() const { return queue_.shm(); }
 
-	bool Recv(MsgI &msg, const int timeout_ms) { return queue().Read(msg.OffsetRef(), timeout_ms); }
-	bool TryRecv(MsgI &msg) { return queue().TryRead(msg.OffsetRef()); }
+	bool Recv(MsgI &msg, const int timeout_ms)
+	{
+		Guard lock(GetMutex(Id()));
+		return queue().Read(msg.OffsetRef(), timeout_ms);
+	}
+	bool TryRecv(MsgI &msg)
+	{
+		Guard lock(GetMutex(Id()));
+		return queue().TryRead(msg.OffsetRef());
+	}
 	static Queue *Find(SharedMemory &shm, const MQId remote_id);
 	static bool TrySend(SharedMemory &shm, const MQId remote_id, MsgI msg);
 	bool TrySend(const MQId remote_id, const MsgI &msg) { return TrySend(shm(), remote_id, msg); }
 
 private:
+	static Mutex &GetMutex(const MQId id);
 	MQId id_;
 	Queue &queue() { return *queue_.data(); }
 	Shmq queue_;
diff --git a/src/shm_queue.h b/src/shm_queue.h
index c7d3a23..0041f16 100644
--- a/src/shm_queue.h
+++ b/src/shm_queue.h
@@ -19,15 +19,18 @@
 #ifndef SHM_QUEUE_JE0OEUP3
 #define SHM_QUEUE_JE0OEUP3
 
+#include "robust.h"
 #include "shm.h"
 #include <atomic>
+#include <boost/circular_buffer.hpp>
 #include <chrono>
 
 namespace bhome_shm
 {
 
 template <class D>
-using Circular = robust::CircularBuffer<D, Allocator<D>>;
+using Circular = boost::circular_buffer<D, Allocator<D>>;
+// using Circular = robust::CircularBuffer<D, Allocator<D>>;
 
 template <class D>
 class SharedQueue
@@ -49,8 +52,25 @@
 		} while (steady_clock::now() < end_time);
 		return false;
 	}
-	bool TryRead(D &d) { return queue_.pop_front(d); }
-	bool TryWrite(const D &d) { return queue_.push_back(d); }
+	bool TryRead(D &d)
+	{
+		if (!queue_.empty()) {
+			d = queue_.front();
+			queue_.pop_front();
+			return true;
+		} else {
+			return false;
+		}
+	}
+	bool TryWrite(const D &d)
+	{
+		if (!queue_.full()) {
+			queue_.push_back(d);
+			return true;
+		} else {
+			return false;
+		}
+	}
 
 private:
 	Circular<D> queue_;
diff --git a/utest/robust_test.cpp b/utest/robust_test.cpp
index 2b4ba96..68c0e72 100644
--- a/utest/robust_test.cpp
+++ b/utest/robust_test.cpp
@@ -1,5 +1,6 @@
 #include "robust.h"
 #include "util.h"
+#include <boost/circular_buffer.hpp>
 
 using namespace robust;
 
@@ -18,7 +19,7 @@
 BOOST_AUTO_TEST_CASE(QueueTest)
 {
 	const int nthread = 100;
-	const uint64_t nmsg = 1000 * 1000 * 100;
+	const uint64_t nmsg = 1000 * 1000 * 10;
 
 	SharedMemory &shm = TestShm();
 	shm.Remove();
@@ -33,7 +34,12 @@
 		BOOST_CHECK_EQUAL((u64 & 255), i);
 	}
 
-#if 1
+	uint64_t correct_total = nmsg * (nmsg - 1) / 2;
+	std::atomic<uint64_t> total(0);
+	std::atomic<uint64_t> nwrite(0);
+	std::atomic<uint64_t> writedone(0);
+
+#if 0
 	typedef AtomicQueue<4> Rcb;
 
 	Rcb tmp;
@@ -48,18 +54,8 @@
 	BOOST_CHECK(tmp.tail() == 1);
 
 	ShmObject<Rcb> rcb(shm, "test_rcb");
-#else
-	typedef Circular<int64_t> Rcb;
-	ShmObject<Rcb> rcb(shm, "test_rcb", 64, shm.get_segment_manager());
-#endif
+	bool try_more = true;
 
-	const int nsize = sizeof(Rcb);
-
-	bool try_more = false;
-	uint64_t correct_total = nmsg * (nmsg - 1) / 2;
-	std::atomic<uint64_t> total(0);
-	std::atomic<uint64_t> nwrite(0);
-	std::atomic<uint64_t> writedone(0);
 	auto Writer = [&]() {
 		uint64_t n = 0;
 		while ((n = nwrite++) < nmsg) {
@@ -82,6 +78,58 @@
 		}
 	};
 
+#else
+	typedef Circular<int64_t> Rcb;
+	ShmObject<Rcb> rcb(shm, "test_rcb", 16, shm.get_segment_manager());
+
+	typedef FMutex Mutex;
+	// typedef SemMutex Mutex;
+	Mutex mtx(123);
+	auto Writer = [&]() {
+		uint64_t n = 0;
+		while ((n = nwrite++) < nmsg) {
+			auto Write = [&]() {
+				robust::Guard<Mutex> lk(mtx);
+				if (rcb->full()) {
+					return false;
+				} else {
+					rcb->push_back(n);
+					return true;
+				}
+				// return rcb->push_back(n);
+			};
+			while (!Write()) {
+				// MySleep();
+			}
+			++writedone;
+		}
+	};
+	std::atomic<uint64_t> nread(0);
+	auto Reader = [&]() {
+		while (nread.load() < nmsg) {
+			int64_t d;
+			auto Read = [&]() {
+				robust::Guard<Mutex> lk(mtx);
+				if (rcb->empty()) {
+					return false;
+				} else {
+					d = rcb->front();
+					rcb->pop_front();
+					return true;
+				}
+				// return rcb->pop_front(d);
+			};
+			if (Read()) {
+				++nread;
+				total += d;
+			} else {
+				// MySleep();
+			}
+		}
+	};
+
+#endif
+
 	auto status = [&]() {
 		auto next = steady_clock::now();
 		uint32_t lw = 0;
@@ -102,7 +150,8 @@
 	{
 		ThreadManager threads;
 		boost::timer::auto_cpu_timer timer;
-		printf("Testing Robust Buffer, msgs %ld, queue size: %d, threads: %d \n", nmsg, Rcb::capacity, nthread);
+		// printf("Testing Robust Buffer, msgs %ld, queue size: %d, threads: %d \n", nmsg, Rcb::capacity, nthread);
+		printf("Testing Robust Buffer, msgs %ld, queue size: %d, threads: %d \n", nmsg, 16, nthread);
 		for (int i = 0; i < nthread; ++i) {
 			threads.Launch(Reader);
 			threads.Launch(Writer);
@@ -116,7 +165,8 @@
 
 BOOST_AUTO_TEST_CASE(MutexTest)
 {
-	typedef robust::Mutex RobustMutex;
+	// typedef robust::MFMutex RobustMutex;
+	typedef robust::SemMutex RobustMutex;
 
 	for (int i = 0; i < 20; ++i) {
 		int size = i;
@@ -131,7 +181,9 @@
 
 	const std::string mtx_name("test_mutex");
 	const std::string int_name("test_int");
-	auto mtx = shm.FindOrCreate<RobustMutex>(mtx_name);
+	// auto mtx = shm.FindOrCreate<RobustMutex>(mtx_name, 12345);
+	RobustMutex rmtx(12345);
+	auto mtx = &rmtx;
 	auto pi = shm.FindOrCreate<int>(int_name, 100);
 
 	std::mutex m;
@@ -142,29 +194,48 @@
 		printf("int : %d, add1: %d\n", old, ++*pi);
 	}
 
-	{
-		const int ntimes = 1000 * 1000;
-		RobustMutex mutex;
+	auto LockSpeed = [](auto &mutex, const std::string &name) {
+		const int ntimes = 1000 * 1;
 		auto Lock = [&]() {
 			for (int i = 0; i < ntimes; ++i) {
 				mutex.lock();
 				mutex.unlock();
 			}
 		};
-
+		printf("\nTesting %s lock/unlock %d times\n", name.c_str(), ntimes);
 		{
 			boost::timer::auto_cpu_timer timer;
-			printf("test lock/unlock %d times: ", ntimes);
+			printf("1 thread: ");
 			Lock();
 		}
-		{
+		auto InThread = [&](int nthread) {
 			boost::timer::auto_cpu_timer timer;
-			printf("test lock/unlock %d times, 2 thread: ", ntimes);
-			std::thread t1(Lock), t2(Lock);
-			t1.join();
-			t2.join();
-		}
-	}
+			printf("%d threads: ", nthread);
+			std::vector<std::thread> vt;
+			for (int i = 0; i < nthread; ++i) {
+				vt.emplace_back(Lock);
+			}
+			for (auto &t : vt) {
+				t.join();
+			}
+		};
+		InThread(4);
+		InThread(16);
+		InThread(100);
+		InThread(1000);
+	};
+	NullMutex null_mtx;
+	std::mutex std_mtx;
+	CasMutex cas_mtx;
+	FMutex mfmtx(3);
+	boost::interprocess::interprocess_mutex ipc_mutex;
+	SemMutex sem_mtx(3);
+	LockSpeed(null_mtx, "null mutex");
+	LockSpeed(std_mtx, "std::mutex");
+	// LockSpeed(cas_mtx, "CAS mutex");
+	LockSpeed(ipc_mutex, "boost ipc mutex");
+	LockSpeed(mfmtx, "mutex+flock");
+	LockSpeed(sem_mtx, "sem mutex");
 
 	auto TryLock = [&]() {
 		if (mtx->try_lock()) {
@@ -183,6 +254,7 @@
 	if (mtx) {
 		printf("mtx exists\n");
 		if (TryLock()) {
+			// Sleep(10s);
 			auto op = [&]() {
 				if (TryLock()) {
 					Unlock();

--
Gitblit v1.8.0