From 1b167ec5ad101ac44451381e26cc73ab5d67d2a1 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期一, 26 四月 2021 16:37:52 +0800
Subject: [PATCH] fix socket busy loop; del locked readall; refactor.

---
 src/socket.h            |    4 
 src/shm_alloc_queue.cpp |   19 +
 box/center.cpp          |    8 
 src/shm_msg_queue.h     |   77 +++++++
 src/socket.cpp          |    5 
 src/defs.h              |    1 
 src/sendq.cpp           |   10 
 src/shm_queue.cpp       |   87 --------
 src/topic_node.cpp      |    2 
 src/shm_msg_queue.cpp   |  105 ++++++++++
 src/defs.cpp            |    8 
 box/center_main.cc      |    8 
 src/shm_alloc_queue.h   |   23 ++
 utest/api_test.cpp      |  102 ++++++---
 src/shm_queue.h         |   88 +-------
 src/sendq.h             |    9 
 16 files changed, 334 insertions(+), 222 deletions(-)

diff --git a/box/center.cpp b/box/center.cpp
index 829a089..5cb9bc3 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -133,12 +133,12 @@
 				UpdateRegInfo(node);
 				nodes_[ssn] = node;
 
-				printf("new ssn %ld\n", ssn);
+				printf("new node (%s) ssn (%ld)\n", head.proc_id().c_str(), ssn);
 				auto old = online_node_addr_map_.find(head.proc_id());
 				if (old != online_node_addr_map_.end()) { // old session
 					auto &old_ssn = old->second;
 					nodes_[old_ssn]->state_.PutOffline(offline_time_);
-					printf("put %s %ld offline\n", nodes_[old_ssn]->proc_.proc_id().c_str(), old->second);
+					printf("put node (%s) ssn (%ld) offline\n", nodes_[old_ssn]->proc_.proc_id().c_str(), old->second);
 					old_ssn = ssn;
 				} else {
 					online_node_addr_map_.emplace(head.proc_id(), ssn);
@@ -201,6 +201,10 @@
 			    for (auto &topic : topics) {
 				    service_map_[topic].insert(dest);
 			    }
+			    printf("node %s ssn %ld serve %d topics:\n", node->proc_.proc_id().c_str(), *node->addrs_.begin(), topics.size());
+			    for (auto &topic : topics) {
+				    printf("\t %s\n", topic.c_str());
+			    }
 			    return MakeReply(eSuccess);
 		    });
 	}
diff --git a/box/center_main.cc b/box/center_main.cc
index fdda2cd..79210fc 100644
--- a/box/center_main.cc
+++ b/box/center_main.cc
@@ -85,15 +85,15 @@
 } // namespace
 int center_main(int argc, const char *argv[])
 {
-	auto &shm = BHomeShm();
-	GlobalInit(shm);
-
 	AppArg args(argc, argv);
 	if (args.Has("remove")) {
-		shm.Remove();
+		SharedMemory::Remove(BHomeShmName());
 		return 0;
 	}
 
+	auto &shm = BHomeShm();
+	GlobalInit(shm);
+
 	InstanceFlag inst(shm, kCenterRunningFlag);
 	if (!inst.TryStartAsFirstInstance()) {
 		printf("another instance is running, exit.\n");
diff --git a/src/defs.cpp b/src/defs.cpp
index 0ca82bf..cc6f23b 100644
--- a/src/defs.cpp
+++ b/src/defs.cpp
@@ -17,7 +17,7 @@
  */
 #include "defs.h"
 #include "msg.h"
-#include "shm_queue.h"
+#include "shm_msg_queue.h"
 
 namespace
 {
@@ -35,9 +35,13 @@
 
 } // namespace
 
+std::string BHomeShmName()
+{
+	return "bhome_default_shm_v0";
+}
 bhome_shm::SharedMemory &BHomeShm()
 {
-	static bhome_shm::SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 512);
+	static bhome_shm::SharedMemory shm(BHomeShmName(), 1024 * 1024 * 512);
 	return shm;
 }
 
diff --git a/src/defs.h b/src/defs.h
index 1c9e663..43375bf 100644
--- a/src/defs.h
+++ b/src/defs.h
@@ -37,6 +37,7 @@
 class SharedMemory;
 } // namespace bhome_shm
 
+std::string BHomeShmName();
 bhome_shm::SharedMemory &BHomeShm();
 bool GlobalInit(bhome_shm::SharedMemory &shm);
 typedef std::string Topic;
diff --git a/src/sendq.cpp b/src/sendq.cpp
index 5b57d72..c0d5afd 100644
--- a/src/sendq.cpp
+++ b/src/sendq.cpp
@@ -16,10 +16,12 @@
  * =====================================================================================
  */
 #include "sendq.h"
-#include "shm_queue.h"
+#include "shm_msg_queue.h"
 #include <chrono>
 
-int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, Array &arr)
+using namespace bhome_shm;
+
+int SendQ::DoSend1Remote(ShmMsgQueue &mq, const Remote remote, Array &arr)
 {
 	auto FirstNotExpired = [](Array &l) {
 		auto Less = [](const TimedMsg &msg, const TimePoint &tp) { return msg.expire() < tp; };
@@ -65,7 +67,7 @@
 	return nprocessed;
 }
 
-int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, ArrayList &al)
+int SendQ::DoSend1Remote(ShmMsgQueue &mq, const Remote remote, ArrayList &al)
 {
 	int nsend = 0;
 	auto AllSent = [&](Array &arr) {
@@ -76,7 +78,7 @@
 	return nsend;
 }
 
-bool SendQ::TrySend(bhome_shm::ShmMsgQueue &mq)
+bool SendQ::TrySend(ShmMsgQueue &mq)
 {
 	std::unique_lock<std::mutex> lock(mutex_out_);
 	size_t nsend = 0;
diff --git a/src/sendq.h b/src/sendq.h
index 0699df7..0e565d5 100644
--- a/src/sendq.h
+++ b/src/sendq.h
@@ -29,10 +29,7 @@
 #include <string>
 #include <unordered_map>
 
-namespace bhome_shm
-{
 class ShmMsgQueue;
-} // namespace bhome_shm
 
 class SendQ
 {
@@ -65,7 +62,7 @@
 	{
 		AppendData(addr, Data(std::move(content)), DefaultExpire(), onExpire);
 	}
-	bool TrySend(bhome_shm::ShmMsgQueue &mq);
+	bool TrySend(ShmMsgQueue &mq);
 	// bool empty() const { return store_.empty(); }
 
 private:
@@ -88,8 +85,8 @@
 	typedef std::list<Array> ArrayList;
 	typedef std::unordered_map<Remote, ArrayList> Store;
 
-	int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, Array &arr);
-	int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, ArrayList &arr);
+	int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, Array &arr);
+	int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, ArrayList &arr);
 
 	std::mutex mutex_in_;
 	std::mutex mutex_out_;
diff --git a/src/shm_alloc_queue.cpp b/src/shm_alloc_queue.cpp
new file mode 100644
index 0000000..7ea5213
--- /dev/null
+++ b/src/shm_alloc_queue.cpp
@@ -0,0 +1,19 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  shm_alloc_queue.cpp
+ *
+ *    Description:  
+ *
+ *        Version:  1.0
+ *        Created:  2021年04月26日 16时24分25秒
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Li Chao (), lichao@aiotlink.com
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+#include "shm_alloc_queue.h"
+
diff --git a/src/shm_alloc_queue.h b/src/shm_alloc_queue.h
new file mode 100644
index 0000000..73a184f
--- /dev/null
+++ b/src/shm_alloc_queue.h
@@ -0,0 +1,23 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  shm_alloc_queue.h
+ *
+ *    Description:  
+ *
+ *        Version:  1.0
+ *        Created:  2021年04月26日 16时24分40秒
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Li Chao (), lichao@aiotlink.com
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+#ifndef SHM_ALLOC_QUEUE_EQBLM9FZ
+#define SHM_ALLOC_QUEUE_EQBLM9FZ
+
+
+
+#endif // end of include guard: SHM_ALLOC_QUEUE_EQBLM9FZ
diff --git a/src/shm_msg_queue.cpp b/src/shm_msg_queue.cpp
new file mode 100644
index 0000000..ae019bf
--- /dev/null
+++ b/src/shm_msg_queue.cpp
@@ -0,0 +1,105 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  shm_msg_queue.cpp
+ *
+ *    Description:  
+ *
+ *        Version:  1.0
+ *        Created:  2021年04月26日 16时25分05秒
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Li Chao (), lichao@aiotlink.com
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+#include "shm_msg_queue.h"
+
+using namespace bhome_msg;
+using namespace boost::interprocess;
+
+namespace
+{
+std::string MsgQIdToName(const ShmMsgQueue::MQId id)
+{
+	char buf[40] = "mqOx";
+	int n = sprintf(buf + 4, "%lx", id);
+	return std::string(buf, n + 4);
+}
+
+const int AdjustMQLength(const int len)
+{
+	const int kMaxLength = 10000;
+	const int kDefaultLen = 12;
+	if (len <= 0) {
+		return kDefaultLen;
+	} else if (len < kMaxLength) {
+		return len;
+	} else {
+		return kMaxLength;
+	}
+}
+
+} // namespace
+
+ShmMsgQueue::MQId ShmMsgQueue::NewId()
+{
+	static auto &id = GetData();
+	return ++id;
+}
+// ShmMsgQueue memory usage: (320 + 16*length) bytes, length >= 2
+ShmMsgQueue::ShmMsgQueue(const MQId id, ShmType &segment, const int len) :
+    id_(id),
+    queue_(segment, MsgQIdToName(id_), AdjustMQLength(len), segment.get_segment_manager())
+{
+}
+
+ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len) :
+    id_(NewId()),
+    queue_(segment, true, MsgQIdToName(id_), AdjustMQLength(len), segment.get_segment_manager())
+{
+	if (!queue_.IsOk()) {
+		throw("error create msgq " + std::to_string(id_));
+	}
+}
+
+ShmMsgQueue::~ShmMsgQueue() {}
+
+bool ShmMsgQueue::Remove(SharedMemory &shm, const MQId id)
+{
+	Queue *q = Find(shm, id);
+	if (q) {
+		MsgI msg;
+		while (q->TryRead(msg)) {
+			msg.Release();
+		}
+	}
+	return Shmq::Remove(shm, MsgQIdToName(id));
+}
+
+ShmMsgQueue::Queue *ShmMsgQueue::Find(SharedMemory &shm, const MQId remote_id)
+{
+	return Shmq::Find(shm, MsgQIdToName(remote_id));
+}
+
+bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, const MsgI &msg, OnSend const &onsend)
+{
+	Queue *remote = Find(shm, remote_id);
+	if (remote) {
+		if (onsend) {
+			return remote->TryWrite(msg, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); });
+		} else {
+			return remote->TryWrite(msg, [](const MsgI &msg) { msg.AddRef(); });
+		}
+	} else {
+		// SetLestError(eNotFound);
+		return false;
+	}
+}
+
+// Test shows that in the 2 cases:
+// 1) build msg first, then find remote queue;
+// 2) find remote queue first, then build msg;
+// 1 is about 50% faster than 2, maybe cache related.
diff --git a/src/shm_msg_queue.h b/src/shm_msg_queue.h
new file mode 100644
index 0000000..d7b33af
--- /dev/null
+++ b/src/shm_msg_queue.h
@@ -0,0 +1,77 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  shm_msg_queue.h
+ *
+ *    Description:  
+ *
+ *        Version:  1.0
+ *        Created:  2021年04月26日 16时25分21秒
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Li Chao (), lichao@aiotlink.com
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+#ifndef SHM_MSG_QUEUE_D847TQXH
+#define SHM_MSG_QUEUE_D847TQXH
+
+#include "msg.h"
+#include "shm_queue.h"
+
+using namespace bhome_shm;
+using namespace bhome_msg;
+
+class ShmMsgQueue : public StaticDataRef<std::atomic<uint64_t>, ShmMsgQueue>
+{
+	typedef ShmObject<SharedQueue<MsgI>> Shmq;
+	typedef Shmq::ShmType ShmType;
+	typedef Shmq::Data Queue;
+	typedef std::function<void()> OnSend;
+
+public:
+	typedef uint64_t MQId;
+
+	static MQId NewId();
+
+	ShmMsgQueue(const MQId id, ShmType &segment, const int len);
+	ShmMsgQueue(ShmType &segment, const int len);
+	~ShmMsgQueue();
+	static bool Remove(SharedMemory &shm, const MQId id);
+	MQId Id() const { return id_; }
+	ShmType &shm() const { return queue_.shm(); }
+
+	bool Recv(MsgI &msg, const int timeout_ms) { return queue_.data()->Read(msg, timeout_ms); }
+	bool TryRecv(MsgI &msg) { return queue_.data()->TryRead(msg); }
+	static Queue *Find(SharedMemory &shm, const MQId remote_id);
+	static bool TrySend(SharedMemory &shm, const MQId remote_id, const MsgI &msg, OnSend const &onsend = OnSend());
+	template <class Iter>
+	static int TrySendAll(SharedMemory &shm, const MQId remote_id, const Iter begin, const Iter end, OnSend const &onsend = OnSend())
+	{
+		Queue *remote = Find(shm, remote_id);
+		if (remote) {
+			if (onsend) {
+				return remote->TryWrite(begin, end, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); });
+			} else {
+				return remote->TryWrite(begin, end, [](const MsgI &msg) { msg.AddRef(); });
+			}
+		} else {
+			// SetLestError(eNotFound);
+			return 0;
+		}
+	}
+
+	template <class... Rest>
+	bool TrySend(const MQId remote_id, Rest const &...rest) { return TrySend(shm(), remote_id, rest...); }
+	template <class... Rest>
+	int TrySendAll(const MQId remote_id, Rest const &...rest) { return TrySendAll(shm(), remote_id, rest...); }
+
+private:
+	MQId id_;
+	Shmq &queue() { return queue_; }
+	Shmq queue_;
+};
+
+#endif // end of include guard: SHM_MSG_QUEUE_D847TQXH
diff --git a/src/shm_queue.cpp b/src/shm_queue.cpp
index face18b..86f0d91 100644
--- a/src/shm_queue.cpp
+++ b/src/shm_queue.cpp
@@ -21,91 +21,4 @@
 
 namespace bhome_shm
 {
-using namespace bhome_msg;
-using namespace boost::interprocess;
-
-namespace
-{
-std::string MsgQIdToName(const ShmMsgQueue::MQId id)
-{
-	char buf[40] = "mqOx";
-	int n = sprintf(buf + 4, "%lx", id);
-	return std::string(buf, n + 4);
-}
-
-const int AdjustMQLength(const int len)
-{
-	const int kMaxLength = 10000;
-	const int kDefaultLen = 12;
-	if (len <= 0) {
-		return kDefaultLen;
-	} else if (len < kMaxLength) {
-		return len;
-	} else {
-		return kMaxLength;
-	}
-}
-
-} // namespace
-
-ShmMsgQueue::MQId ShmMsgQueue::NewId()
-{
-	static auto &id = GetData();
-	return ++id;
-}
-// ShmMsgQueue memory usage: (320 + 16*length) bytes, length >= 2
-ShmMsgQueue::ShmMsgQueue(const MQId id, ShmType &segment, const int len) :
-    id_(id),
-    queue_(segment, MsgQIdToName(id_), AdjustMQLength(len), segment.get_segment_manager())
-{
-}
-
-ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len) :
-    id_(NewId()),
-    queue_(segment, true, MsgQIdToName(id_), AdjustMQLength(len), segment.get_segment_manager())
-{
-	if (!queue_.IsOk()) {
-		throw("error create msgq " + std::to_string(id_));
-	}
-}
-
-ShmMsgQueue::~ShmMsgQueue() {}
-
-bool ShmMsgQueue::Remove(SharedMemory &shm, const MQId id)
-{
-	Queue *q = Find(shm, id);
-	if (q) {
-		MsgI msg;
-		while (q->TryRead(msg)) {
-			msg.Release();
-		}
-	}
-	return Shmq::Remove(shm, MsgQIdToName(id));
-}
-
-ShmMsgQueue::Queue *ShmMsgQueue::Find(SharedMemory &shm, const MQId remote_id)
-{
-	return Shmq::Find(shm, MsgQIdToName(remote_id));
-}
-
-bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, const MsgI &msg, OnSend const &onsend)
-{
-	Queue *remote = Find(shm, remote_id);
-	if (remote) {
-		if (onsend) {
-			return remote->TryWrite(msg, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); });
-		} else {
-			return remote->TryWrite(msg, [](const MsgI &msg) { msg.AddRef(); });
-		}
-	} else {
-		// SetLestError(eNotFound);
-		return false;
-	}
-}
-
-// Test shows that in the 2 cases:
-// 1) build msg first, then find remote queue;
-// 2) find remote queue first, then build msg;
-// 1 is about 50% faster than 2, maybe cache related.
-
 } // namespace bhome_shm
diff --git a/src/shm_queue.h b/src/shm_queue.h
index 4f576a7..5dbda96 100644
--- a/src/shm_queue.h
+++ b/src/shm_queue.h
@@ -19,7 +19,6 @@
 #ifndef SHM_QUEUE_JE0OEUP3
 #define SHM_QUEUE_JE0OEUP3
 
-#include "msg.h"
 #include "shm.h"
 #include <atomic>
 #include <boost/circular_buffer.hpp>
@@ -59,30 +58,21 @@
 		return [this](Guard &lock) { return !this->empty(); };
 	}
 
-	template <class Pred, class OnData>
-	int ReadAllOnCond(Pred const &pred, OnData const &onData)
-	{
-		Guard lock(this->mutex());
-		int n = 0;
-		while (pred(lock)) {
-			++n;
-			onData(this->front());
-			this->pop_front();
-			this->cond_write_.notify_one();
-		}
-		return n;
-	}
-
 	template <class Pred>
 	bool ReadOnCond(D &buf, Pred const &pred)
 	{
-		int flag = 0;
-		auto only_once = [&](Guard &lock) { return flag++ == 0 && pred(lock); };
-		auto onData = [&buf](D &d) {
-			using std::swap;
-			swap(buf, d);
+		auto Read = [&]() {
+			Guard lock(this->mutex());
+			if (pred(lock)) {
+				using std::swap;
+				swap(buf, Super::front());
+				Super::pop_front();
+				return true;
+			} else {
+				return false;
+			}
 		};
-		return ReadAllOnCond(only_once, onData);
+		return Read() ? (this->cond_write_.notify_one(), true) : false;
 	}
 
 	template <class Iter, class Pred, class OnWrite>
@@ -94,7 +84,7 @@
 		Guard lock(mutex());
 		while (pred(lock)) {
 			onWrite(*begin);
-			this->push_back(*begin);
+			Super::push_back(*begin);
 			++n;
 			cond_read_.notify_one();
 			if (++begin == end) {
@@ -130,60 +120,6 @@
 
 	bool Read(D &buf, const int timeout_ms) { return ReadOnCond(buf, TimedReadPred(timeout_ms)); }
 	bool TryRead(D &buf) { return ReadOnCond(buf, TryReadPred()); }
-};
-
-using namespace bhome_msg;
-
-class ShmMsgQueue : public StaticDataRef<std::atomic<uint64_t>, ShmMsgQueue>
-{
-	typedef ShmObject<SharedQueue<MsgI>> Shmq;
-	typedef Shmq::ShmType ShmType;
-	typedef Shmq::Data Queue;
-	typedef std::function<void()> OnSend;
-
-public:
-	typedef uint64_t MQId;
-
-	static MQId NewId();
-
-	ShmMsgQueue(const MQId id, ShmType &segment, const int len);
-	ShmMsgQueue(ShmType &segment, const int len);
-	~ShmMsgQueue();
-	static bool Remove(SharedMemory &shm, const MQId id);
-	MQId Id() const { return id_; }
-	ShmType &shm() const { return queue_.shm(); }
-
-	bool Recv(MsgI &msg, const int timeout_ms) { return queue_.data()->Read(msg, timeout_ms); }
-	bool TryRecv(MsgI &msg) { return queue_.data()->TryRead(msg); }
-	template <class OnData>
-	int TryRecvAll(OnData const &onData) { return queue_.data()->TryReadAll(onData); }
-	static Queue *Find(SharedMemory &shm, const MQId remote_id);
-	static bool TrySend(SharedMemory &shm, const MQId remote_id, const MsgI &msg, OnSend const &onsend = OnSend());
-	template <class Iter>
-	static int TrySendAll(SharedMemory &shm, const MQId remote_id, const Iter begin, const Iter end, OnSend const &onsend = OnSend())
-	{
-		Queue *remote = Find(shm, remote_id);
-		if (remote) {
-			if (onsend) {
-				return remote->TryWrite(begin, end, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); });
-			} else {
-				return remote->TryWrite(begin, end, [](const MsgI &msg) { msg.AddRef(); });
-			}
-		} else {
-			// SetLestError(eNotFound);
-			return 0;
-		}
-	}
-
-	template <class... Rest>
-	bool TrySend(const MQId remote_id, Rest const &...rest) { return TrySend(shm(), remote_id, rest...); }
-	template <class... Rest>
-	int TrySendAll(const MQId remote_id, Rest const &...rest) { return TrySendAll(shm(), remote_id, rest...); }
-
-private:
-	MQId id_;
-	Shmq &queue() { return queue_; }
-	Shmq queue_;
 };
 
 } // namespace bhome_shm
diff --git a/src/socket.cpp b/src/socket.cpp
index e471633..2127260 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -65,7 +65,8 @@
 					onRecvWithPerMsgCB(*this, imsg, head);
 				}
 			};
-			return mq().TryRecvAll(onMsg) > 0; // this will recv all msgs.
+			MsgI imsg;
+			return mq().TryRecv(imsg) ? (onMsg(imsg), true) : false;
 		};
 
 		try {
@@ -74,6 +75,8 @@
 			if (onIdle) { onIdle(*this); }
 			if (!more_to_send && !more_to_recv) {
 				std::this_thread::yield();
+				using namespace std::chrono_literals;
+				std::this_thread::sleep_for(10000ns);
 			}
 		} catch (...) {
 		}
diff --git a/src/socket.h b/src/socket.h
index cd6bfee..a5dd72c 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -22,7 +22,7 @@
 #include "bh_util.h"
 #include "defs.h"
 #include "sendq.h"
-#include "shm_queue.h"
+#include "shm_msg_queue.h"
 #include <atomic>
 #include <boost/noncopyable.hpp>
 #include <condition_variable>
@@ -37,7 +37,7 @@
 {
 
 protected:
-	typedef bhome_shm::ShmMsgQueue Queue;
+	typedef ShmMsgQueue Queue;
 
 public:
 	typedef ShmMsgQueue::MQId MQId;
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index 1131816..d274c4b 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -389,7 +389,7 @@
 
 		BHAddress addr;
 		if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) {
-			printf("node: %ld, topic dest: %ld\n", SockNode().id(), addr.mq_id());
+			// printf("node: %ld, topic dest: %ld\n", SockNode().id(), addr.mq_id());
 			BHMsgHead head(InitMsgHead(GetType(request), proc_id(), ssn()));
 			AddRoute(head, sock.id());
 			head.set_topic(request.topic());
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index debe8ad..6682aaf 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -18,6 +18,7 @@
 #include "bh_api.h"
 #include "util.h"
 #include <atomic>
+#include <boost/lockfree/queue.hpp>
 
 using namespace bhome_msg;
 
@@ -49,7 +50,6 @@
 	static MsgStatus st;
 	return st;
 }
-} // namespace
 
 void SubRecvProc(const void *proc_id,
                  const int proc_id_len,
@@ -59,7 +59,7 @@
 	std::string proc((const char *) proc_id, proc_id_len);
 	MsgPublish pub;
 	pub.ParseFromArray(data, data_len);
-	// printf("Sub data, %s : %s\n", pub.topic().c_str(), pub.data().c_str());
+	printf("Sub data, %s : %s\n", pub.topic().c_str(), pub.data().c_str());
 }
 
 void ServerProc(const void *proc_id,
@@ -98,8 +98,8 @@
 
 class TLMutex
 {
-	// typedef boost::interprocess::interprocess_mutex MutexT;
-	typedef CasMutex MutexT;
+	typedef boost::interprocess::interprocess_mutex MutexT;
+	// typedef CasMutex MutexT;
 	// typedef std::mutex MutexT;
 	typedef std::chrono::steady_clock Clock;
 	typedef Clock::duration Duration;
@@ -108,6 +108,7 @@
 	const Duration limit_;
 	std::atomic<Duration> last_lock_time_;
 	MutexT mutex_;
+	bool Expired(const Duration diff) { return diff > limit_; }
 
 public:
 	struct Status {
@@ -127,16 +128,18 @@
 	{
 		if (mutex_.try_lock()) {
 			auto old_time = last_lock_time_.load();
-			if (Now() - old_time > limit_) {
-				return last_lock_time_.compare_exchange_strong(old_time, Now());
+			auto cur = Now();
+			if (Expired(cur - old_time)) {
+				return last_lock_time_.compare_exchange_strong(old_time, cur);
 			} else {
 				last_lock_time_.store(Now());
 				return true;
 			}
 		} else {
 			auto old_time = last_lock_time_.load();
-			if (Now() - old_time > limit_) {
-				return last_lock_time_.compare_exchange_strong(old_time, Now());
+			auto cur = Now();
+			if (Expired(cur - old_time)) {
+				return last_lock_time_.compare_exchange_strong(old_time, cur);
 			} else {
 				return false;
 			}
@@ -154,55 +157,88 @@
 	void unlock()
 	{
 		auto old_time = last_lock_time_.load();
-		if (Now() - old_time > limit_) {
-		} else {
-			if (last_lock_time_.compare_exchange_strong(old_time, Now())) {
+		auto cur = Now();
+		if (!Expired(cur - old_time)) {
+			if (last_lock_time_.compare_exchange_strong(old_time, cur)) {
 				mutex_.unlock();
 			}
 		}
 	}
 };
 
-namespace
-{
-typedef int64_t Offset;
-Offset Addr(void *ptr) { return reinterpret_cast<Offset>(ptr); }
-void *Ptr(const Offset offset) { return reinterpret_cast<void *>(offset); }
-} // namespace
-
+//robust attr does NOT work, maybe os does not support it.
 class RobustMutex
 {
 public:
 	RobustMutex()
 	{
-		pthread_mutexattr_t attr;
-		pthread_mutexattr_init(&attr);
-		pthread_mutexattr_setrobust(&attr, 1);
-		pthread_mutex_init(mtx(), &attr);
-		if (!valid()) {
+		pthread_mutexattr_t mutex_attr;
+		auto attr = [&]() { return &mutex_attr; };
+		int r = pthread_mutexattr_init(attr());
+		r |= pthread_mutexattr_setpshared(attr(), PTHREAD_PROCESS_SHARED);
+		r |= pthread_mutexattr_setrobust_np(attr(), PTHREAD_MUTEX_ROBUST_NP);
+		r |= pthread_mutex_init(mtx(), attr());
+		int rob = 0;
+		pthread_mutexattr_getrobust_np(attr(), &rob);
+		int shared = 0;
+		pthread_mutexattr_getpshared(attr(), &shared);
+		printf("robust : %d, shared : %d\n", rob, shared);
+		r |= pthread_mutexattr_destroy(attr());
+		if (r) {
 			throw("init mutex error.");
 		}
 	}
+	~RobustMutex()
+	{
+		pthread_mutex_destroy(mtx());
+	}
+
+public:
+	void lock() { Lock(); }
+	bool try_lock()
+	{
+		int r = TryLock();
+		printf("TryLock ret: %d\n", r);
+		return r == 0;
+	}
+
+	void unlock() { Unlock(); }
+
+	// private:
 	int TryLock() { return pthread_mutex_trylock(mtx()); }
 	int Lock() { return pthread_mutex_lock(mtx()); }
 	int Unlock() { return pthread_mutex_unlock(mtx()); }
-	bool valid() const { return false; }
 
 private:
 	pthread_mutex_t *mtx() { return &mutex_; }
 	pthread_mutex_t mutex_;
 };
 
+class LockFreeQueue
+{
+	typedef int64_t Data;
+	typedef boost::lockfree::queue<Data, boost::lockfree::capacity<1024>> LFQueue;
+	void push_back(Data d) { queue_.push(d); }
+
+private:
+	LFQueue queue_;
+};
+
+} // namespace
+
 BOOST_AUTO_TEST_CASE(MutexTest)
 {
 	SharedMemory &shm = TestShm();
+	// shm.Remove();
+	// return;
 	GlobalInit(shm);
 
 	const std::string mtx_name("test_mutex");
 	const std::string int_name("test_int");
-	auto mtx = shm.FindOrCreate<Mutex>(mtx_name);
+	auto mtx = shm.FindOrCreate<TLMutex>(mtx_name);
 	auto pi = shm.FindOrCreate<int>(int_name, 100);
 
+	std::mutex m;
 	typedef std::chrono::steady_clock Clock;
 	auto Now = []() { return Clock::now().time_since_epoch(); };
 	if (pi) {
@@ -334,7 +370,6 @@
 		printf("subscribe topic : %s\n", r ? "ok" : "failed");
 	}
 
-	// BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc);
 	auto ServerLoop = [&](std::atomic<bool> *run) {
 		while (*run) {
 			void *proc_id = 0;
@@ -446,27 +481,20 @@
 
 	std::atomic<bool> run(true);
 
+	BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc);
 	ThreadManager threads;
 	boost::timer::auto_cpu_timer timer;
 	threads.Launch(hb, &run);
-	threads.Launch(ServerLoop, &run);
 	threads.Launch(showStatus, &run);
 	int ncli = 10;
-	const uint64_t nreq = 1000 * 1;
+	const uint64_t nreq = 1000 * 10;
 	for (int i = 0; i < ncli; ++i) {
-		// threads.Launch(asyncRequest, nreq);
+		threads.Launch(asyncRequest, nreq);
 	}
-
-	for (int i = 0; i < 10; ++i) {
-		SyncRequest(i);
-	}
-	// run.store(false);
-	// server_thread.join();
-	// return;
 
 	int same = 0;
 	int64_t last = 0;
-	while (last < nreq * ncli && same < 1) {
+	while (last < nreq * ncli && same < 2) {
 		Sleep(1s, false);
 		auto cur = Status().nreply_.load();
 		if (last == cur) {

--
Gitblit v1.8.0