From 1b167ec5ad101ac44451381e26cc73ab5d67d2a1 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期一, 26 四月 2021 16:37:52 +0800
Subject: [PATCH] fix socket busy loop; del locked readall; refactor.

---
 utest/api_test.cpp |  102 ++++++++++++++++++++++++++++++++------------------
 1 files changed, 65 insertions(+), 37 deletions(-)

diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index debe8ad..6682aaf 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -18,6 +18,7 @@
 #include "bh_api.h"
 #include "util.h"
 #include <atomic>
+#include <boost/lockfree/queue.hpp>
 
 using namespace bhome_msg;
 
@@ -49,7 +50,6 @@
 	static MsgStatus st;
 	return st;
 }
-} // namespace
 
 void SubRecvProc(const void *proc_id,
                  const int proc_id_len,
@@ -59,7 +59,7 @@
 	std::string proc((const char *) proc_id, proc_id_len);
 	MsgPublish pub;
 	pub.ParseFromArray(data, data_len);
-	// printf("Sub data, %s : %s\n", pub.topic().c_str(), pub.data().c_str());
+	printf("Sub data, %s : %s\n", pub.topic().c_str(), pub.data().c_str());
 }
 
 void ServerProc(const void *proc_id,
@@ -98,8 +98,8 @@
 
 class TLMutex
 {
-	// typedef boost::interprocess::interprocess_mutex MutexT;
-	typedef CasMutex MutexT;
+	typedef boost::interprocess::interprocess_mutex MutexT;
+	// typedef CasMutex MutexT;
 	// typedef std::mutex MutexT;
 	typedef std::chrono::steady_clock Clock;
 	typedef Clock::duration Duration;
@@ -108,6 +108,7 @@
 	const Duration limit_;
 	std::atomic<Duration> last_lock_time_;
 	MutexT mutex_;
+	bool Expired(const Duration diff) { return diff > limit_; }
 
 public:
 	struct Status {
@@ -127,16 +128,18 @@
 	{
 		if (mutex_.try_lock()) {
 			auto old_time = last_lock_time_.load();
-			if (Now() - old_time > limit_) {
-				return last_lock_time_.compare_exchange_strong(old_time, Now());
+			auto cur = Now();
+			if (Expired(cur - old_time)) {
+				return last_lock_time_.compare_exchange_strong(old_time, cur);
 			} else {
 				last_lock_time_.store(Now());
 				return true;
 			}
 		} else {
 			auto old_time = last_lock_time_.load();
-			if (Now() - old_time > limit_) {
-				return last_lock_time_.compare_exchange_strong(old_time, Now());
+			auto cur = Now();
+			if (Expired(cur - old_time)) {
+				return last_lock_time_.compare_exchange_strong(old_time, cur);
 			} else {
 				return false;
 			}
@@ -154,55 +157,88 @@
 	void unlock()
 	{
 		auto old_time = last_lock_time_.load();
-		if (Now() - old_time > limit_) {
-		} else {
-			if (last_lock_time_.compare_exchange_strong(old_time, Now())) {
+		auto cur = Now();
+		if (!Expired(cur - old_time)) {
+			if (last_lock_time_.compare_exchange_strong(old_time, cur)) {
 				mutex_.unlock();
 			}
 		}
 	}
 };
 
-namespace
-{
-typedef int64_t Offset;
-Offset Addr(void *ptr) { return reinterpret_cast<Offset>(ptr); }
-void *Ptr(const Offset offset) { return reinterpret_cast<void *>(offset); }
-} // namespace
-
+//robust attr does NOT work, maybe os does not support it.
 class RobustMutex
 {
 public:
 	RobustMutex()
 	{
-		pthread_mutexattr_t attr;
-		pthread_mutexattr_init(&attr);
-		pthread_mutexattr_setrobust(&attr, 1);
-		pthread_mutex_init(mtx(), &attr);
-		if (!valid()) {
+		pthread_mutexattr_t mutex_attr;
+		auto attr = [&]() { return &mutex_attr; };
+		int r = pthread_mutexattr_init(attr());
+		r |= pthread_mutexattr_setpshared(attr(), PTHREAD_PROCESS_SHARED);
+		r |= pthread_mutexattr_setrobust_np(attr(), PTHREAD_MUTEX_ROBUST_NP);
+		r |= pthread_mutex_init(mtx(), attr());
+		int rob = 0;
+		pthread_mutexattr_getrobust_np(attr(), &rob);
+		int shared = 0;
+		pthread_mutexattr_getpshared(attr(), &shared);
+		printf("robust : %d, shared : %d\n", rob, shared);
+		r |= pthread_mutexattr_destroy(attr());
+		if (r) {
 			throw("init mutex error.");
 		}
 	}
+	~RobustMutex()
+	{
+		pthread_mutex_destroy(mtx());
+	}
+
+public:
+	void lock() { Lock(); }
+	bool try_lock()
+	{
+		int r = TryLock();
+		printf("TryLock ret: %d\n", r);
+		return r == 0;
+	}
+
+	void unlock() { Unlock(); }
+
+	// private:
 	int TryLock() { return pthread_mutex_trylock(mtx()); }
 	int Lock() { return pthread_mutex_lock(mtx()); }
 	int Unlock() { return pthread_mutex_unlock(mtx()); }
-	bool valid() const { return false; }
 
 private:
 	pthread_mutex_t *mtx() { return &mutex_; }
 	pthread_mutex_t mutex_;
 };
 
+class LockFreeQueue
+{
+	typedef int64_t Data;
+	typedef boost::lockfree::queue<Data, boost::lockfree::capacity<1024>> LFQueue;
+	void push_back(Data d) { queue_.push(d); }
+
+private:
+	LFQueue queue_;
+};
+
+} // namespace
+
 BOOST_AUTO_TEST_CASE(MutexTest)
 {
 	SharedMemory &shm = TestShm();
+	// shm.Remove();
+	// return;
 	GlobalInit(shm);
 
 	const std::string mtx_name("test_mutex");
 	const std::string int_name("test_int");
-	auto mtx = shm.FindOrCreate<Mutex>(mtx_name);
+	auto mtx = shm.FindOrCreate<TLMutex>(mtx_name);
 	auto pi = shm.FindOrCreate<int>(int_name, 100);
 
+	std::mutex m;
 	typedef std::chrono::steady_clock Clock;
 	auto Now = []() { return Clock::now().time_since_epoch(); };
 	if (pi) {
@@ -334,7 +370,6 @@
 		printf("subscribe topic : %s\n", r ? "ok" : "failed");
 	}
 
-	// BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc);
 	auto ServerLoop = [&](std::atomic<bool> *run) {
 		while (*run) {
 			void *proc_id = 0;
@@ -446,27 +481,20 @@
 
 	std::atomic<bool> run(true);
 
+	BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc);
 	ThreadManager threads;
 	boost::timer::auto_cpu_timer timer;
 	threads.Launch(hb, &run);
-	threads.Launch(ServerLoop, &run);
 	threads.Launch(showStatus, &run);
 	int ncli = 10;
-	const uint64_t nreq = 1000 * 1;
+	const uint64_t nreq = 1000 * 10;
 	for (int i = 0; i < ncli; ++i) {
-		// threads.Launch(asyncRequest, nreq);
+		threads.Launch(asyncRequest, nreq);
 	}
-
-	for (int i = 0; i < 10; ++i) {
-		SyncRequest(i);
-	}
-	// run.store(false);
-	// server_thread.join();
-	// return;
 
 	int same = 0;
 	int64_t last = 0;
-	while (last < nreq * ncli && same < 1) {
+	while (last < nreq * ncli && same < 2) {
 		Sleep(1s, false);
 		auto cur = Status().nreply_.load();
 		if (last == cur) {

--
Gitblit v1.8.0