From 3931f83205f153f2bc7fc36d1a894cdc3f14b4db Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期三, 21 四月 2021 16:52:51 +0800
Subject: [PATCH] change node socket to vector; try lock free queue.

---
 proto/source/bhome_msg_api.proto |    4 
 utest/api_test.cpp               |    7 +
 utest/lock_free_queue.h          |   57 +++++++++++++++++++
 src/topic_node.h                 |   24 +++++---
 src/topic_node.cpp               |   23 +++----
 utest/lock_free_queue.cpp        |   29 +++++++++
 6 files changed, 118 insertions(+), 26 deletions(-)

diff --git a/proto/source/bhome_msg_api.proto b/proto/source/bhome_msg_api.proto
index 1c7cc1c..fd1ae8f 100644
--- a/proto/source/bhome_msg_api.proto
+++ b/proto/source/bhome_msg_api.proto
@@ -9,8 +9,8 @@
 
 message BHAddress {
 	bytes mq_id = 1; // mqid, uuid
-	// bytes ip = 2;   //
-	// int32 port = 3;
+	bytes ip = 2;   //
+	int32 port = 3;
 }
 
 message ProcInfo
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index 70247dd..8bf9cf8 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -37,14 +37,16 @@
 } // namespace
 
 TopicNode::TopicNode(SharedMemory &shm) :
-    shm_(shm), sock_node_(shm), sock_client_(shm, kMqLen), sock_server_(shm, kMqLen), sock_sub_(shm, kMqLen), state_(eStateUnregistered)
+    shm_(shm), sockets_(eSockEnd), state_(eStateUnregistered)
 {
+	for (int i = eSockStart; i < eSockEnd; ++i) {
+		sockets_[i].reset(new ShmSocket(shm_, kMqLen));
+	}
 	// recv msgs to avoid memory leak.
 	auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; };
-	SockNode().Start(default_ignore_msg);
-	SockClient().Start(default_ignore_msg);
-	SockServer().Start(default_ignore_msg);
-	SockSub().Start(default_ignore_msg);
+	for (auto &p : sockets_) {
+		p->Start(default_ignore_msg);
+	}
 }
 
 TopicNode::~TopicNode()
@@ -52,10 +54,7 @@
 	Stop();
 	SockNode().Stop();
 	if (state() == eStateUnregistered) {
-		SockNode().Remove();
-		SockClient().Remove();
-		SockServer().Remove();
-		SockSub().Remove();
+		for (auto &p : sockets_) { p->Remove(); }
 	}
 }
 
@@ -66,16 +65,14 @@
 	} else if (nworker > 16) {
 		nworker = 16;
 	}
-
+	SockNode().Start();
 	ServerStart(server_cb, nworker);
 	SubscribeStartWorker(sub_cb, nworker);
 	ClientStartWorker(client_cb, nworker);
 }
 void TopicNode::Stop()
 {
-	SockSub().Stop();
-	SockServer().Stop();
-	SockClient().Stop();
+	for (auto &p : sockets_) { p->Stop(); }
 }
 
 bool TopicNode::Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
diff --git a/src/topic_node.h b/src/topic_node.h
index 8287b4a..b2fae5b 100644
--- a/src/topic_node.h
+++ b/src/topic_node.h
@@ -22,6 +22,7 @@
 #include "socket.h"
 #include <atomic>
 #include <memory>
+#include <vector>
 
 using namespace bhome_shm;
 using namespace bhome_msg;
@@ -107,17 +108,22 @@
 	};
 
 	// some sockets may be the same one, using functions make it easy to change.
+	enum { eSockStart,
+		   eSockNode = eSockStart,
+		   eSockPub = eSockNode,
+		   eSockServer,
+		   eSockClient,
+		   eSockSub,
+		   eSockEnd,
+	};
+	std::vector<std::unique_ptr<ShmSocket>> sockets_;
 
-	ShmSocket &SockNode() { return sock_node_; }
-	ShmSocket &SockPub() { return SockNode(); }
-	ShmSocket &SockSub() { return sock_sub_; }
-	ShmSocket &SockClient() { return sock_client_; }
-	ShmSocket &SockServer() { return sock_server_; }
+	ShmSocket &SockNode() { return *sockets_[eSockNode]; }
+	ShmSocket &SockPub() { return *sockets_[eSockPub]; }
+	ShmSocket &SockSub() { return *sockets_[eSockSub]; }
+	ShmSocket &SockClient() { return *sockets_[eSockClient]; }
+	ShmSocket &SockServer() { return *sockets_[eSockServer]; }
 
-	ShmSocket sock_node_;
-	ShmSocket sock_client_;
-	ShmSocket sock_server_;
-	ShmSocket sock_sub_;
 	enum State {
 		eStateUnregistered,
 		eStateOnline,
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index ae3f10a..766c0f8 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -155,7 +155,7 @@
 	printf("maxsec: %ld\n", CountSeconds(max_time));
 
 	bool reg = false;
-	for (int i = 0; i < 10 && !reg; ++i) {
+	for (int i = 0; i < 3 && !reg; ++i) {
 		ProcInfo proc;
 		proc.set_proc_id("demo_client");
 		proc.set_public_info("public info of demo_client. etc...");
@@ -167,6 +167,9 @@
 
 		BHFree(reply, reply_len);
 		Sleep(1s);
+	}
+	if (!reg) {
+		return;
 	}
 
 	const std::string topic_ = "topic_";
@@ -204,7 +207,7 @@
 		for (int i = 0; i < 1; ++i) {
 			MsgPublish pub;
 			pub.set_topic(topic_ + std::to_string(i));
-			pub.set_data("pub_data_" + std::string(1024 * 1024, 'a'));
+			pub.set_data("pub_data_" + std::string(1024 * 1, 'a'));
 			std::string s(pub.SerializeAsString());
 			BHPublish(s.data(), s.size(), 0);
 			// Sleep(1s);
diff --git a/utest/lock_free_queue.cpp b/utest/lock_free_queue.cpp
new file mode 100644
index 0000000..a05a454
--- /dev/null
+++ b/utest/lock_free_queue.cpp
@@ -0,0 +1,29 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  lock_free_queue.cpp
+ *
+ *    Description:  
+ *
+ *        Version:  1.0
+ *        Created:  2021骞�04鏈�21鏃� 13鏃�57鍒�02绉�
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Li Chao (), lichao@aiotlink.com
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+#include "lock_free_queue.h"
+#include "defs.h"
+#include "util.h"
+
+BOOST_AUTO_TEST_CASE(LockFreeTest)
+{
+	LockFreeQueue q(BHomeShm());
+	for (int i = 0; i < 15; ++i) {
+		int r = q.Write(i);
+		printf("write %d %s\n", i, (r ? "ok" : "failed"));
+	}
+}
\ No newline at end of file
diff --git a/utest/lock_free_queue.h b/utest/lock_free_queue.h
new file mode 100644
index 0000000..968f796
--- /dev/null
+++ b/utest/lock_free_queue.h
@@ -0,0 +1,57 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  lock_free_queue.h
+ *
+ *    Description:  
+ *
+ *        Version:  1.0
+ *        Created:  2021骞�04鏈�21鏃� 14鏃�03鍒�27绉�
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Li Chao (), lichao@aiotlink.com
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+
+#ifndef LOCK_FREE_QUEUE_KQWP70HT
+#define LOCK_FREE_QUEUE_KQWP70HT
+
+#include "shm.h"
+#include <boost/interprocess/offset_ptr.hpp>
+#include <boost/lockfree/queue.hpp>
+
+using namespace bhome_shm;
+
+typedef int64_t Data;
+const int kQLen = 10;
+class LockFreeQueue : private boost::lockfree::queue<Data,
+                                                     boost::lockfree::allocator<Allocator<Data>>,
+                                                     boost::lockfree::capacity<kQLen>>,
+                      private boost::noncopyable
+{
+	typedef boost::lockfree::queue<Data,
+	                               boost::lockfree::allocator<Allocator<Data>>,
+	                               boost::lockfree::capacity<kQLen>>
+	    Queue;
+
+public:
+	LockFreeQueue(SharedMemory &shm) :
+	    Queue(shm.get_segment_manager()) {}
+	bool Read(Data &d) { return pop(d); }
+	bool Write(Data const &d) { return push(d); }
+	template <class Func>
+	bool Write(Data const &d, Func onWrite)
+	{
+		if (Write(d)) {
+			onWrite(d);
+			return true;
+		} else {
+			return false;
+		}
+	}
+};
+
+#endif // end of include guard: LOCK_FREE_QUEUE_KQWP70HT

--
Gitblit v1.8.0