From a6f67b4249525089fb97eb9418c7014f66c2a000 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期三, 28 四月 2021 19:29:17 +0800
Subject: [PATCH] use new robust mutex, circurar; rm timeout mutex.

---
 utest/speed_test.cpp   |    4 
 src/robust.h           |  255 +++++++++++++++
 src/shm.h              |   88 +++--
 utest/api_test.cpp     |  163 +--------
 utest/simple_tests.cpp |    4 
 utest/util.h           |    1 
 src/shm_queue.h        |  131 +++-----
 utest/robust_test.cpp  |  115 +++++++
 utest/utest.cpp        |    4 
 src/shm.cpp            |   36 --
 src/robust.cpp         |  111 ++++++
 11 files changed, 609 insertions(+), 303 deletions(-)

diff --git a/src/robust.cpp b/src/robust.cpp
new file mode 100644
index 0000000..38d5d28
--- /dev/null
+++ b/src/robust.cpp
@@ -0,0 +1,111 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  robust.cpp
+ *
+ *    Description:  
+ *
+ *        Version:  1.0
+ *        Created:  2021骞�04鏈�27鏃� 10鏃�04鍒�19绉�
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Li Chao (), lichao@aiotlink.com
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+#include "robust.h"
+#include <chrono>
+#include <thread>
+
+namespace robust
+{
+
+namespace
+{
+static_assert(sizeof(steady_clock::duration) == sizeof(int64_t));
+static_assert(sizeof(RobustReqRep) == 24);
+static_assert(sizeof(CasMutex<false>) == 8);
+static_assert(sizeof(CircularBuffer<int>) == 48);
+
+auto Now() { return steady_clock::now().time_since_epoch(); }
+const steady_clock::duration kIoTimeout = 10ms;
+const steady_clock::duration kIoExpire = 100ms;
+
+void Yield() { std::this_thread::sleep_for(10us); }
+} // namespace
+
+void QuickSleep()
+{
+	Yield();
+}
+bool RobustReqRep::StateCas(State exp, State val)
+{
+	bool r = state_.compare_exchange_strong(exp, val);
+	return r ? (timestamp_.store(Now()), true) : false;
+}
+
+int RobustReqRep::ClientReadReply(Msg &reply)
+{
+	auto end_time = Now() + kIoTimeout;
+	int done = false;
+	do {
+		if (StateCas(eServerWriteEnd, eClientReadBegin)) {
+			Read(reply);
+			done = StateCas(eClientReadBegin, eClientReadEnd);
+			if (done) { break; }
+		}
+		Yield();
+	} while (Now() < end_time);
+	return done ? eSuccess : eTimeout;
+}
+
+int RobustReqRep::ClientWriteRequest(const Msg &request)
+{
+	if (request.size() > capacity_) {
+		return eSizeError;
+	}
+	auto end_time = Now() + kIoTimeout;
+	bool done = false;
+	do {
+		if (StateCas(eStateReady, eClientWriteBegin)) {
+			Write(request);
+			done = StateCas(eClientWriteBegin, eClientWriteEnd);
+			if (done) { break; }
+		}
+		Yield();
+	} while (Now() < end_time);
+	return done ? eSuccess : eTimeout;
+}
+
+int RobustReqRep::ServerReadRequest(Msg &request)
+{
+	bool done = false;
+	if (StateCas(eClientWriteEnd, eServerReadBegin)) {
+		Read(request);
+		done = StateCas(eServerReadBegin, eServerReadEnd);
+	} else {
+		auto old = state_.load();
+		if (old != eStateReady && timestamp_.load() + kIoExpire < Now()) {
+			StateCas(old, eStateReady);
+		}
+	}
+	return done ? eSuccess : eTimeout;
+}
+
+int RobustReqRep::ServerWriteReply(const Msg &reply)
+{
+	if (reply.size() > capacity_) {
+		return eSizeError;
+	}
+	// no need to loop write, either success or timeout.
+	bool done = false;
+	if (StateCas(eServerReadEnd, eServerWriteBegin)) {
+		Write(reply);
+		done = StateCas(eServerWriteBegin, eServerWriteEnd);
+	}
+	return done ? eSuccess : eTimeout;
+}
+
+} // namespace robust
\ No newline at end of file
diff --git a/src/robust.h b/src/robust.h
new file mode 100644
index 0000000..19e9bda
--- /dev/null
+++ b/src/robust.h
@@ -0,0 +1,255 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  robust.h
+ *
+ *    Description:  
+ *
+ *        Version:  1.0
+ *        Created:  2021骞�04鏈�27鏃� 10鏃�04鍒�29绉�
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Li Chao (), lichao@aiotlink.com
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+
+#ifndef ROBUST_Q31RCWYU
+#define ROBUST_Q31RCWYU
+
+#include <atomic>
+#include <chrono>
+#include <memory>
+#include <string.h>
+#include <string>
+#include <sys/types.h>
+#include <unistd.h>
+
+namespace robust
+{
+
+using namespace std::chrono;
+using namespace std::chrono_literals;
+
+void QuickSleep();
+
+class RobustReqRep
+{
+	typedef uint32_t State;
+	typedef std::string Msg;
+	typedef std::chrono::steady_clock::duration Duration;
+
+public:
+	enum ErrorCode {
+		eSuccess = 0,
+		eTimeout = EAGAIN,
+		eSizeError = EINVAL,
+	};
+
+	explicit RobustReqRep(const uint32_t max_len) :
+	    capacity_(max_len), state_(eStateInit), timestamp_(Duration(0)), size_(0) {}
+
+	void PutReady() { state_.store(eStateReady); }
+	bool Ready() const { return state_.load() == eStateReady; }
+	uint32_t capacity() const { return capacity_; }
+
+	int ClientRequest(const Msg &request, Msg &reply)
+	{
+		int r = ClientWriteRequest(request);
+		if (r == eSuccess) {
+			r = ClientReadReply(reply);
+		}
+		return r;
+	}
+	int ClientReadReply(Msg &reply);
+	int ClientWriteRequest(const Msg &request);
+	int ServerReadRequest(Msg &request);
+	int ServerWriteReply(const Msg &reply);
+
+private:
+	RobustReqRep(const RobustReqRep &);
+	RobustReqRep(RobustReqRep &&);
+	RobustReqRep &operator=(const RobustReqRep &) = delete;
+	RobustReqRep &operator=(RobustReqRep &&) = delete;
+
+	enum {
+		eStateInit = 0,
+		eStateReady = 0x19833891,
+		eClientWriteBegin,
+		eClientWriteEnd,
+		eServerReadBegin,
+		eServerReadEnd,
+		eServerWriteBegin,
+		eServerWriteEnd,
+		eClientReadBegin,
+		eClientReadEnd = eStateReady,
+	};
+	bool StateCas(State exp, State val);
+	void Write(const Msg &msg)
+	{
+		size_.store(msg.size());
+		memcpy(buf, msg.data(), msg.size());
+	}
+	void Read(Msg &msg) { msg.assign(buf, size_.load()); }
+
+	const uint32_t capacity_;
+	std::atomic<State> state_;
+	static_assert(sizeof(State) == sizeof(state_), "atomic should has no extra data.");
+	std::atomic<Duration> timestamp_;
+	std::atomic<int32_t> size_;
+	char buf[4];
+};
+
+template <bool isRobust = false>
+class CasMutex
+{
+	static pid_t pid()
+	{
+		static pid_t val = getpid();
+		return val;
+	}
+	static bool Killed(pid_t pid)
+	{
+		char buf[64] = {0};
+		snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", pid);
+		return access(buf, F_OK) != 0;
+	}
+
+public:
+	CasMutex() :
+	    meta_(0) {}
+	int try_lock()
+	{
+		const auto t = steady_clock::now().time_since_epoch().count();
+		auto old = meta_.load();
+		int r = 0;
+		if (!Locked(old)) {
+			r = MetaCas(old, Meta(1, pid()));
+		} else if (isRobust && Killed(Pid(old))) {
+			r = static_cast<int>(MetaCas(old, Meta(1, pid()))) << 1;
+			if (r) {
+				printf("captured pid %d -> %d, r = %d\n", Pid(old), pid(), r);
+			}
+		}
+		return r;
+	}
+	int lock()
+	{
+		int r = 0;
+		do {
+			r = try_lock();
+		} while (r == 0);
+		return r;
+	}
+	void unlock()
+	{
+		auto old = meta_.load();
+		if (Locked(old) && Pid(old) == pid()) {
+			MetaCas(old, Meta(0, pid()));
+		}
+	}
+
+private:
+	std::atomic<uint64_t> meta_;
+	bool Locked(uint64_t meta) { return (meta >> 63) != 0; }
+	pid_t Pid(uint64_t meta) { return meta & ~(uint64_t(1) << 63); }
+	uint64_t Meta(uint64_t lk, pid_t pid) { return (lk << 63) | pid; }
+	bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); }
+	static_assert(sizeof(pid_t) < sizeof(uint64_t));
+};
+
+template <class Lock>
+class Guard
+{
+public:
+	Guard(Lock &l) :
+	    l_(l) { l_.lock(); }
+	~Guard() { l_.unlock(); }
+
+private:
+	Guard(const Guard &);
+	Guard(Guard &&);
+	Lock &l_;
+};
+
+template <class D, class Alloc = std::allocator<D>>
+class CircularBuffer
+{
+	typedef uint32_t size_type;
+	typedef uint32_t count_type;
+	typedef uint64_t meta_type;
+	static size_type Pos(meta_type meta) { return meta & 0xFFFFFFFF; }
+	static count_type Count(meta_type meta) { return meta >> 32; }
+	static size_type Meta(meta_type count, size_type pos) { return (count << 32) | pos; }
+
+public:
+	typedef D Data;
+
+	CircularBuffer(const size_type cap) :
+	    CircularBuffer(cap, Alloc()) {}
+	CircularBuffer(const size_type cap, Alloc const &al) :
+	    state_(0), capacity_(cap), mhead_(0), mtail_(0), al_(al), buf(al_.allocate(cap))
+	{
+		if (!buf) {
+			throw("error allocate buffer: out of mem!");
+		}
+	}
+	~CircularBuffer()
+	{
+		al_.deallocate(buf, capacity_);
+	}
+	size_type size() const { return (capacity_ + tail() - head()) % capacity_; }
+	bool full() const { return (capacity_ + tail() + 1 - head()) % capacity_ == 0; }
+	bool empty() const { return head() == tail(); }
+	bool push_back(Data d)
+	{
+		Guard<MutexT> guard(mutex_);
+		if (!full()) {
+			auto old = mtail();
+			buf[Pos(old)] = d;
+			return mtail_.compare_exchange_strong(old, next(old));
+		} else {
+			return false;
+		}
+	}
+	bool pop_front(Data &d)
+	{
+		Guard<MutexT> guard(mutex_);
+		if (!empty()) {
+			auto old = mhead();
+			d = buf[Pos(old)];
+			return mhead_.compare_exchange_strong(old, next(old));
+		} else {
+			return false;
+		}
+	}
+	bool Ready() const { return state_.load() == eStateReady; }
+	void PutReady() { state_.store(eStateReady); }
+
+private:
+	CircularBuffer(const CircularBuffer &);
+	CircularBuffer(CircularBuffer &&);
+	CircularBuffer &operator=(const CircularBuffer &) = delete;
+	CircularBuffer &operator=(CircularBuffer &&) = delete;
+	typedef CasMutex<true> MutexT;
+	// static_assert(sizeof(MutexT) == 16);
+	meta_type next(meta_type meta) const { return Meta(Count(meta) + 1, (Pos(meta) + 1) % capacity_); }
+	size_type head() const { return Pos(mhead()); }
+	size_type tail() const { return Pos(mtail()); }
+	meta_type mhead() const { return mhead_.load(); }
+	meta_type mtail() const { return mtail_.load(); }
+	// data
+	enum { eStateReady = 0x19833891 };
+	std::atomic<uint32_t> state_;
+	const size_type capacity_;
+	MutexT mutex_;
+	std::atomic<meta_type> mhead_;
+	std::atomic<meta_type> mtail_;
+	Alloc al_;
+	typename Alloc::pointer buf = nullptr;
+};
+
+} // namespace robust
+#endif // end of include guard: ROBUST_Q31RCWYU
diff --git a/src/shm.cpp b/src/shm.cpp
index 6d7dccd..1658900 100644
--- a/src/shm.cpp
+++ b/src/shm.cpp
@@ -21,42 +21,6 @@
 namespace bhome_shm
 {
 
-bool MutexWithTimeLimit::try_lock()
-{
-	if (mutex_.try_lock()) {
-		auto old_time = last_lock_time_.load();
-		if (Now() - old_time > limit_) {
-			return last_lock_time_.compare_exchange_strong(old_time, Now());
-		} else {
-			last_lock_time_.store(Now());
-			return true;
-		}
-	} else {
-		auto old_time = last_lock_time_.load();
-		if (Now() - old_time > limit_) {
-			return last_lock_time_.compare_exchange_strong(old_time, Now());
-		} else {
-			return false;
-		}
-	}
-}
-void MutexWithTimeLimit::lock()
-{
-	while (!try_lock()) {
-		std::this_thread::yield();
-	}
-}
-void MutexWithTimeLimit::unlock()
-{
-	auto old_time = last_lock_time_.load();
-	if (Now() - old_time > limit_) {
-	} else {
-		if (last_lock_time_.compare_exchange_strong(old_time, Now())) {
-			mutex_.unlock();
-		}
-	}
-}
-
 SharedMemory::SharedMemory(const std::string &name, const uint64_t size) :
     mshm_t(open_or_create, name.c_str(), size, 0, AllowAll()),
     name_(name)
diff --git a/src/shm.h b/src/shm.h
index 7773ceb..17352fe 100644
--- a/src/shm.h
+++ b/src/shm.h
@@ -19,13 +19,11 @@
 #ifndef SHM_6CHO6D6C
 #define SHM_6CHO6D6C
 
+#include "robust.h"
 #include <atomic>
 #include <boost/interprocess/managed_shared_memory.hpp>
-#include <boost/interprocess/sync/interprocess_condition.hpp>
 #include <boost/interprocess/sync/interprocess_mutex.hpp>
-#include <boost/interprocess/sync/scoped_lock.hpp>
 #include <boost/noncopyable.hpp>
-#include <chrono>
 #include <thread>
 
 namespace bhome_shm
@@ -35,53 +33,65 @@
 
 typedef managed_shared_memory mshm_t;
 
-class CasMutex
-{
-	std::atomic<bool> flag_;
-	bool cas(bool expected, bool new_val) { return flag_.compare_exchange_strong(expected, new_val); }
-
-public:
-	CasMutex() :
-	    flag_(false) {}
-	bool try_lock() { return cas(false, true); }
-	void lock()
-	{
-		while (!try_lock()) { std::this_thread::yield(); }
-	}
-	void unlock() { cas(true, false); }
-};
-
-class MutexWithTimeLimit
+class MutexWithPidCheck
 {
 	typedef boost::interprocess::interprocess_mutex MutexT;
-	// typedef CasMutex MutexT;
-	typedef std::chrono::steady_clock Clock;
-	typedef Clock::duration Duration;
-	static Duration Now() { return Clock::now().time_since_epoch(); }
-
-	const Duration limit_;
-	std::atomic<Duration> last_lock_time_;
+	static pid_t pid()
+	{
+		static pid_t val = getpid();
+		return val;
+	}
+	static bool Killed(pid_t pid)
+	{
+		char buf[64] = {0};
+		snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", pid);
+		return access(buf, F_OK) != 0;
+	}
+	bool PidCas(pid_t exp, pid_t val) { return pid_.compare_exchange_strong(exp, val); }
 	MutexT mutex_;
+	std::atomic<pid_t> pid_;
 
 public:
 	typedef MutexT::internal_mutex_type internal_mutex_type;
 	const internal_mutex_type &internal_mutex() const { return mutex_.internal_mutex(); }
 	internal_mutex_type &internal_mutex() { return mutex_.internal_mutex(); }
+	MutexWithPidCheck() :
+	    pid_(0) {}
+	bool try_lock()
+	{
+		bool r = false;
+		if (mutex_.try_lock()) {
+			auto old = pid_.load();
+			r = PidCas(old, pid());
+		} else {
+			auto old = pid_.load();
+			if (Killed(old)) {
+				r = PidCas(old, pid());
+				if (r) {
+					printf("PidCheck captured pid %d -> %d\n", old, pid());
+				}
+			}
+		}
+		return r;
+	}
 
-	explicit MutexWithTimeLimit(Duration limit) :
-	    limit_(limit) {}
-	MutexWithTimeLimit() :
-	    MutexWithTimeLimit(std::chrono::seconds(1)) {}
-	~MutexWithTimeLimit() { static_assert(std::is_pod<Duration>::value); }
-	bool try_lock();
-	void lock();
-	void unlock();
+	void lock()
+	{
+		while (!try_lock()) {
+			std::this_thread::yield();
+		}
+	}
+	void unlock()
+	{
+		auto old = pid_.load();
+		if (old == pid()) {
+			mutex_.unlock();
+		}
+	}
 };
 
-// typedef boost::interprocess::interprocess_mutex Mutex;
-typedef MutexWithTimeLimit Mutex;
-typedef scoped_lock<Mutex> Guard;
-typedef interprocess_condition Cond;
+typedef robust::CasMutex<true> Mutex;
+typedef robust::Guard<Mutex> Guard;
 
 class SharedMemory : public mshm_t
 {
diff --git a/src/shm_queue.h b/src/shm_queue.h
index 5dbda96..11f9893 100644
--- a/src/shm_queue.h
+++ b/src/shm_queue.h
@@ -21,105 +21,70 @@
 
 #include "shm.h"
 #include <atomic>
-#include <boost/circular_buffer.hpp>
-#include <boost/date_time/posix_time/posix_time.hpp>
+#include <chrono>
 
 namespace bhome_shm
 {
 
 template <class D>
-using Circular = boost::circular_buffer<D, Allocator<D>>;
+using Circular = robust::CircularBuffer<D, Allocator<D>>;
+
 
 template <class D>
-class SharedQueue : private Circular<D>
+class SharedQueue
 {
-	typedef Circular<D> Super;
-	Mutex mutex_;
-	Cond cond_read_;
-	Cond cond_write_;
-	Mutex &mutex() { return mutex_; }
-
-	static boost::posix_time::ptime MSFromNow(const int ms)
-	{
-		using namespace boost::posix_time;
-		ptime cur = boost::posix_time::microsec_clock::universal_time();
-		return cur + millisec(ms);
-	}
-
-	auto TimedReadPred(const int timeout_ms)
-	{
-		auto endtime = MSFromNow(timeout_ms);
-		return [this, endtime](Guard &lock) {
-			return (cond_read_.timed_wait(lock, endtime, [&]() { return !this->empty(); }));
-		};
-	}
-	auto TryReadPred()
-	{
-		return [this](Guard &lock) { return !this->empty(); };
-	}
-
-	template <class Pred>
-	bool ReadOnCond(D &buf, Pred const &pred)
-	{
-		auto Read = [&]() {
-			Guard lock(this->mutex());
-			if (pred(lock)) {
-				using std::swap;
-				swap(buf, Super::front());
-				Super::pop_front();
-				return true;
-			} else {
-				return false;
-			}
-		};
-		return Read() ? (this->cond_write_.notify_one(), true) : false;
-	}
-
-	template <class Iter, class Pred, class OnWrite>
-	int WriteAllOnCond(Iter begin, Iter end, Pred const &pred, OnWrite const &onWrite)
-	{
-		if (begin == end) { return 0; }
-
-		int n = 0;
-		Guard lock(mutex());
-		while (pred(lock)) {
-			onWrite(*begin);
-			Super::push_back(*begin);
-			++n;
-			cond_read_.notify_one();
-			if (++begin == end) {
-				break;
-			}
-		}
-		return n;
-	}
-
 public:
 	SharedQueue(const uint32_t len, Allocator<D> const &alloc) :
-	    Super(len, alloc) {}
-
-	template <class Iter, class OnWrite>
-	int TryWrite(Iter begin, Iter end, const OnWrite &onWrite)
-	{
-		auto tryWritePred = [this](Guard &lock) { return !this->full(); };
-		return WriteAllOnCond(begin, end, tryWritePred, onWrite);
-	}
+	    queue_(len, alloc) {}
 
 	template <class OnWrite>
-	bool TryWrite(const D &buf, const OnWrite &onWrite) { return TryWrite(&buf, (&buf) + 1, onWrite); }
-
-	bool TryWrite(const D &buf)
+	bool TryWrite(const D &d, const OnWrite &onWrite)
 	{
-		return TryWrite(buf, [](const D &buf) {});
+		Guard lock(mutex());
+		if (!queue_.full()) {
+			onWrite(d);
+			queue_.push_back(d);
+			return true;
+		} else {
+			return false;
+		}
 	}
 
-	template <class OnData>
-	int ReadAll(const int timeout_ms, OnData const &onData) { return ReadAllOnCond(TimedReadPred(timeout_ms), onData); }
-	template <class OnData>
-	int TryReadAll(OnData const &onData) { return ReadAllOnCond(TryReadPred(), onData); }
+	bool TryWrite(const D &d)
+	{
+		Guard lock(mutex());
+		return !queue_.full() ? (queue_.push_back(d), true) : false;
+	}
 
-	bool Read(D &buf, const int timeout_ms) { return ReadOnCond(buf, TimedReadPred(timeout_ms)); }
-	bool TryRead(D &buf) { return ReadOnCond(buf, TryReadPred()); }
+	bool Read(D &d, const int timeout_ms)
+	{
+		using namespace std::chrono;
+		auto end_time = steady_clock::now() + milliseconds(timeout_ms);
+		do {
+			if (TryRead(d)) {
+				return true;
+			} else {
+				robust::QuickSleep();
+			}
+		} while (steady_clock::now() < end_time);
+		return false;
+	}
+	bool TryRead(D &d)
+	{
+		Guard lock(mutex());
+		if (!queue_.empty()) {
+			queue_.pop_front(d);
+			return true;
+		} else {
+			return false;
+		}
+	}
+
+private:
+	typedef Circular<D> Queue;
+	Queue queue_;
+	Mutex mutex_;
+	Mutex &mutex() { return mutex_; }
 };
 
 } // namespace bhome_shm
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index 6682aaf..6577b51 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -16,6 +16,7 @@
  * =====================================================================================
  */
 #include "bh_api.h"
+#include "robust.h"
 #include "util.h"
 #include <atomic>
 #include <boost/lockfree/queue.hpp>
@@ -96,138 +97,22 @@
 	// printf("client Recv reply : %s\n", reply.data().c_str());
 }
 
-class TLMutex
-{
-	typedef boost::interprocess::interprocess_mutex MutexT;
-	// typedef CasMutex MutexT;
-	// typedef std::mutex MutexT;
-	typedef std::chrono::steady_clock Clock;
-	typedef Clock::duration Duration;
-	static Duration Now() { return Clock::now().time_since_epoch(); }
-
-	const Duration limit_;
-	std::atomic<Duration> last_lock_time_;
-	MutexT mutex_;
-	bool Expired(const Duration diff) { return diff > limit_; }
-
-public:
-	struct Status {
-		int64_t nlock_ = 0;
-		int64_t nupdate_time_fail = 0;
-		int64_t nfail = 0;
-		int64_t nexcept = 0;
-	};
-	Status st_;
-
-	explicit TLMutex(Duration limit) :
-	    limit_(limit) {}
-	TLMutex() :
-	    TLMutex(std::chrono::seconds(1)) {}
-	~TLMutex() { static_assert(std::is_pod<Duration>::value); }
-	bool try_lock()
-	{
-		if (mutex_.try_lock()) {
-			auto old_time = last_lock_time_.load();
-			auto cur = Now();
-			if (Expired(cur - old_time)) {
-				return last_lock_time_.compare_exchange_strong(old_time, cur);
-			} else {
-				last_lock_time_.store(Now());
-				return true;
-			}
-		} else {
-			auto old_time = last_lock_time_.load();
-			auto cur = Now();
-			if (Expired(cur - old_time)) {
-				return last_lock_time_.compare_exchange_strong(old_time, cur);
-			} else {
-				return false;
-			}
-		}
-	}
-	void lock()
-	{
-		int n = 0;
-		while (!try_lock()) {
-			n++;
-			std::this_thread::yield();
-		}
-		st_.nlock_ += n;
-	}
-	void unlock()
-	{
-		auto old_time = last_lock_time_.load();
-		auto cur = Now();
-		if (!Expired(cur - old_time)) {
-			if (last_lock_time_.compare_exchange_strong(old_time, cur)) {
-				mutex_.unlock();
-			}
-		}
-	}
-};
-
-//robust attr does NOT work, maybe os does not support it.
-class RobustMutex
-{
-public:
-	RobustMutex()
-	{
-		pthread_mutexattr_t mutex_attr;
-		auto attr = [&]() { return &mutex_attr; };
-		int r = pthread_mutexattr_init(attr());
-		r |= pthread_mutexattr_setpshared(attr(), PTHREAD_PROCESS_SHARED);
-		r |= pthread_mutexattr_setrobust_np(attr(), PTHREAD_MUTEX_ROBUST_NP);
-		r |= pthread_mutex_init(mtx(), attr());
-		int rob = 0;
-		pthread_mutexattr_getrobust_np(attr(), &rob);
-		int shared = 0;
-		pthread_mutexattr_getpshared(attr(), &shared);
-		printf("robust : %d, shared : %d\n", rob, shared);
-		r |= pthread_mutexattr_destroy(attr());
-		if (r) {
-			throw("init mutex error.");
-		}
-	}
-	~RobustMutex()
-	{
-		pthread_mutex_destroy(mtx());
-	}
-
-public:
-	void lock() { Lock(); }
-	bool try_lock()
-	{
-		int r = TryLock();
-		printf("TryLock ret: %d\n", r);
-		return r == 0;
-	}
-
-	void unlock() { Unlock(); }
-
-	// private:
-	int TryLock() { return pthread_mutex_trylock(mtx()); }
-	int Lock() { return pthread_mutex_lock(mtx()); }
-	int Unlock() { return pthread_mutex_unlock(mtx()); }
-
-private:
-	pthread_mutex_t *mtx() { return &mutex_; }
-	pthread_mutex_t mutex_;
-};
-
-class LockFreeQueue
-{
-	typedef int64_t Data;
-	typedef boost::lockfree::queue<Data, boost::lockfree::capacity<1024>> LFQueue;
-	void push_back(Data d) { queue_.push(d); }
-
-private:
-	LFQueue queue_;
-};
-
 } // namespace
+#include <chrono>
+using namespace std::chrono;
+// using namespace std::chrono_literals;
 
 BOOST_AUTO_TEST_CASE(MutexTest)
 {
+	// typedef robust::CasMutex<true> RobustMutex;
+	typedef MutexWithPidCheck RobustMutex;
+
+	for (int i = 0; i < 20; ++i) {
+		int size = i;
+		int left = size & 7;
+		int rsize = size + ((8 - left) & 7);
+		printf("size: %3d, rsize: %3d\n", size, rsize);
+	}
 	SharedMemory &shm = TestShm();
 	// shm.Remove();
 	// return;
@@ -235,7 +120,7 @@
 
 	const std::string mtx_name("test_mutex");
 	const std::string int_name("test_int");
-	auto mtx = shm.FindOrCreate<TLMutex>(mtx_name);
+	auto mtx = shm.FindOrCreate<RobustMutex>(mtx_name);
 	auto pi = shm.FindOrCreate<int>(int_name, 100);
 
 	std::mutex m;
@@ -248,11 +133,11 @@
 
 	{
 		boost::timer::auto_cpu_timer timer;
-		printf("test time: ");
-		TLMutex mutex;
-		// CasMutex mutex;
+		const int ntimes = 1000 * 1000;
+		printf("test lock/unlock %d times: ", ntimes);
+		RobustMutex mutex;
 		auto Lock = [&]() {
-			for (int i = 0; i < 10; ++i) {
+			for (int i = 0; i < ntimes; ++i) {
 				mutex.lock();
 				mutex.unlock();
 			}
@@ -260,11 +145,6 @@
 		std::thread t1(Lock), t2(Lock);
 		t1.join();
 		t2.join();
-		printf("mutex nlock: %ld, update time error: %ld, normal fail: %ld, error wait: %ld\n",
-		       mutex.st_.nlock_,
-		       mutex.st_.nupdate_time_fail,
-		       mutex.st_.nfail,
-		       mutex.st_.nexcept);
 	}
 
 	auto MSFromNow = [](const int ms) {
@@ -487,10 +367,13 @@
 	threads.Launch(hb, &run);
 	threads.Launch(showStatus, &run);
 	int ncli = 10;
-	const uint64_t nreq = 1000 * 10;
+	const uint64_t nreq = 1000 * 100;
 	for (int i = 0; i < ncli; ++i) {
 		threads.Launch(asyncRequest, nreq);
 	}
+	// for (int i = 0; i < 100; ++i) {
+	// 	SyncRequest(0);
+	// }
 
 	int same = 0;
 	int64_t last = 0;
@@ -509,4 +392,6 @@
 	threads.WaitAll();
 	auto &st = Status();
 	printf("nreq: %8ld, nsrv: %8ld, nreply: %8ld\n", st.nrequest_.load(), st.nserved_.load(), st.nreply_.load());
+	BHCleanup();
+	printf("after cleanup\n");
 }
\ No newline at end of file
diff --git a/utest/robust_test.cpp b/utest/robust_test.cpp
new file mode 100644
index 0000000..0d54b46
--- /dev/null
+++ b/utest/robust_test.cpp
@@ -0,0 +1,115 @@
+#include "robust.h"
+#include "util.h"
+
+using namespace robust;
+
+typedef CircularBuffer<int64_t, Allocator<int64_t>> Rcb;
+Rcb *GetRCBImpl(SharedMemory &shm, const int nelem)
+{
+	int cap = nelem + 1;
+	typedef uint64_t Data;
+	auto size = sizeof(Rcb) + sizeof(Data) * cap;
+	void *p = shm.Alloc(size);
+	if (p) {
+		return new (p) Rcb(cap, shm.get_segment_manager());
+	}
+	return nullptr;
+}
+Rcb *GetRCB(SharedMemory &shm, const int nelem)
+{
+	void **pStore = shm.FindOrCreate<void *>("test_rcb_pointer", nullptr);
+	if (pStore) {
+		if (!*pStore) {
+			*pStore = GetRCBImpl(shm, nelem);
+		}
+		return (Rcb *) *pStore;
+	}
+	return nullptr;
+}
+
+void MySleep()
+{
+	std::this_thread::sleep_for(2us);
+}
+
+BOOST_AUTO_TEST_CASE(RobustTest)
+{
+	SharedMemory &shm = TestShm();
+	shm.Remove();
+	pid_t pid = getpid();
+	printf("pid : %d\n", pid);
+	auto Access = [](pid_t pid) {
+		char buf[100] = {0};
+		sprintf(buf, "/proc/%d/stat", pid);
+		int r = access(buf, F_OK);
+		printf("access %d\n", r);
+	};
+	Access(pid);
+	Access(pid + 1);
+	// Sleep(10s);
+	// return;
+
+	int nelement = 640;
+	auto rcb = GetRCB(shm, nelement);
+	BOOST_CHECK(rcb != nullptr);
+	BOOST_CHECK(rcb->empty());
+	BOOST_CHECK(rcb->push_back(1));
+	BOOST_CHECK(rcb->size() == 1);
+	int64_t d;
+	BOOST_CHECK(rcb->pop_front(d));
+	BOOST_CHECK(rcb->empty());
+
+	const uint64_t nmsg = 1000 * 1000 * 1;
+	uint64_t correct_total = nmsg * (nmsg - 1) / 2;
+	std::atomic<uint64_t> total(0);
+	std::atomic<uint64_t> nwrite(0);
+	std::atomic<uint64_t> writedone(0);
+	auto Writer = [&]() {
+		uint64_t n = 0;
+		while ((n = nwrite++) < nmsg) {
+			while (!rcb->push_back(n)) {
+				// MySleep();
+			}
+			++writedone;
+		}
+	};
+	std::atomic<uint64_t> nread(0);
+	auto Reader = [&]() {
+		while (nread.load() < nmsg) {
+			int64_t d;
+			if (rcb->pop_front(d)) {
+				++nread;
+				total += d;
+			} else {
+				MySleep();
+			}
+		}
+	};
+
+	auto status = [&]() {
+		auto next = steady_clock::now();
+		uint32_t lw = 0;
+		uint32_t lr = 0;
+		do {
+			std::this_thread::sleep_until(next);
+			next += 1s;
+			auto w = writedone.load();
+			auto r = nread.load();
+			printf("write: %6ld, spd: %6ld,  read: %6ld, spd: %6ld , queue size: %d\n", w, w - lw, r, r - lr, rcb->size());
+			lw = w;
+			lr = r;
+		} while (nread.load() < nmsg);
+	};
+
+	ThreadManager threads;
+	boost::timer::auto_cpu_timer timer;
+	printf("Testing Robust Buffer, msgs %ld, queue size: %d \n", nmsg, nelement);
+	threads.Launch(status);
+	for (int i = 0; i < 10; ++i) {
+		threads.Launch(Reader);
+		threads.Launch(Writer);
+	}
+	threads.WaitAll();
+	printf("total: %ld, expected: %ld\n", total.load(), correct_total);
+	BOOST_CHECK_EQUAL(total.load(), correct_total);
+}
\ No newline at end of file
diff --git a/utest/simple_tests.cpp b/utest/simple_tests.cpp
index 33c78f5..e14a1cd 100644
--- a/utest/simple_tests.cpp
+++ b/utest/simple_tests.cpp
@@ -107,7 +107,7 @@
 BOOST_AUTO_TEST_CASE(TimedWaitTest)
 {
 	SharedMemory &shm = TestShm();
-	MsgI::BindShm(shm);
+	GlobalInit(shm);
 	ShmMsgQueue q(shm, 64);
 	for (int i = 0; i < 2; ++i) {
 		int ms = i * 100;
@@ -123,7 +123,7 @@
 {
 	SharedMemory &shm = TestShm();
 	typedef MsgI Msg;
-	Msg::BindShm(shm);
+	GlobalInit(shm);
 
 	Msg m0(1000);
 	BOOST_CHECK(m0.valid());
diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp
index 302d4bd..bd455ec 100644
--- a/utest/speed_test.cpp
+++ b/utest/speed_test.cpp
@@ -24,7 +24,7 @@
 {
 	const int mem_size = 1024 * 1024 * 50;
 	SharedMemory &shm = TestShm();
-	MsgI::BindShm(shm);
+	GlobalInit(shm);
 
 	MQId id = ShmMsgQueue::NewId();
 	const int timeout = 1000;
@@ -122,7 +122,7 @@
 	const std::string server_proc_id = "server_proc";
 
 	SharedMemory &shm = TestShm();
-	MsgI::BindShm(shm);
+	GlobalInit(shm);
 
 	auto Avail = [&]() { return shm.get_free_memory(); };
 	auto init_avail = Avail();
diff --git a/utest/utest.cpp b/utest/utest.cpp
index d058471..d8dae45 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -90,7 +90,7 @@
 BOOST_AUTO_TEST_CASE(PubSubTest)
 {
 	SharedMemory &shm = TestShm();
-	MsgI::BindShm(shm);
+	GlobalInit(shm);
 
 	auto Avail = [&]() { return shm.get_free_memory(); };
 	auto init_avail = Avail();
@@ -201,7 +201,7 @@
 BOOST_AUTO_TEST_CASE(ReqRepTest)
 {
 	SharedMemory &shm = TestShm();
-	MsgI::BindShm(shm);
+	GlobalInit(shm);
 
 	auto Avail = [&]() { return shm.get_free_memory(); };
 	auto init_avail = Avail();
diff --git a/utest/util.h b/utest/util.h
index 61e5b11..a4cbbaa 100644
--- a/utest/util.h
+++ b/utest/util.h
@@ -36,6 +36,7 @@
 
 using namespace boost::posix_time;
 using namespace std::chrono_literals;
+using namespace std::chrono;
 
 template <class D>
 inline void Sleep(D d, bool print = true)

--
Gitblit v1.8.0