From 3931f83205f153f2bc7fc36d1a894cdc3f14b4db Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期三, 21 四月 2021 16:52:51 +0800
Subject: [PATCH] change node socket to vector; try lock free queue.
---
proto/source/bhome_msg_api.proto | 4
utest/api_test.cpp | 7 +
utest/lock_free_queue.h | 57 +++++++++++++++++++
src/topic_node.h | 24 +++++---
src/topic_node.cpp | 23 +++----
utest/lock_free_queue.cpp | 29 +++++++++
6 files changed, 118 insertions(+), 26 deletions(-)
diff --git a/proto/source/bhome_msg_api.proto b/proto/source/bhome_msg_api.proto
index 1c7cc1c..fd1ae8f 100644
--- a/proto/source/bhome_msg_api.proto
+++ b/proto/source/bhome_msg_api.proto
@@ -9,8 +9,8 @@
message BHAddress {
bytes mq_id = 1; // mqid, uuid
- // bytes ip = 2; //
- // int32 port = 3;
+ bytes ip = 2; //
+ int32 port = 3;
}
message ProcInfo
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index 70247dd..8bf9cf8 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -37,14 +37,16 @@
} // namespace
TopicNode::TopicNode(SharedMemory &shm) :
- shm_(shm), sock_node_(shm), sock_client_(shm, kMqLen), sock_server_(shm, kMqLen), sock_sub_(shm, kMqLen), state_(eStateUnregistered)
+ shm_(shm), sockets_(eSockEnd), state_(eStateUnregistered)
{
+ for (int i = eSockStart; i < eSockEnd; ++i) {
+ sockets_[i].reset(new ShmSocket(shm_, kMqLen));
+ }
// recv msgs to avoid memory leak.
auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; };
- SockNode().Start(default_ignore_msg);
- SockClient().Start(default_ignore_msg);
- SockServer().Start(default_ignore_msg);
- SockSub().Start(default_ignore_msg);
+ for (auto &p : sockets_) {
+ p->Start(default_ignore_msg);
+ }
}
TopicNode::~TopicNode()
@@ -52,10 +54,7 @@
Stop();
SockNode().Stop();
if (state() == eStateUnregistered) {
- SockNode().Remove();
- SockClient().Remove();
- SockServer().Remove();
- SockSub().Remove();
+ for (auto &p : sockets_) { p->Remove(); }
}
}
@@ -66,16 +65,14 @@
} else if (nworker > 16) {
nworker = 16;
}
-
+ SockNode().Start();
ServerStart(server_cb, nworker);
SubscribeStartWorker(sub_cb, nworker);
ClientStartWorker(client_cb, nworker);
}
void TopicNode::Stop()
{
- SockSub().Stop();
- SockServer().Stop();
- SockClient().Stop();
+ for (auto &p : sockets_) { p->Stop(); }
}
bool TopicNode::Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
diff --git a/src/topic_node.h b/src/topic_node.h
index 8287b4a..b2fae5b 100644
--- a/src/topic_node.h
+++ b/src/topic_node.h
@@ -22,6 +22,7 @@
#include "socket.h"
#include <atomic>
#include <memory>
+#include <vector>
using namespace bhome_shm;
using namespace bhome_msg;
@@ -107,17 +108,22 @@
};
// some sockets may be the same one, using functions make it easy to change.
+ enum { eSockStart,
+ eSockNode = eSockStart,
+ eSockPub = eSockNode,
+ eSockServer,
+ eSockClient,
+ eSockSub,
+ eSockEnd,
+ };
+ std::vector<std::unique_ptr<ShmSocket>> sockets_;
- ShmSocket &SockNode() { return sock_node_; }
- ShmSocket &SockPub() { return SockNode(); }
- ShmSocket &SockSub() { return sock_sub_; }
- ShmSocket &SockClient() { return sock_client_; }
- ShmSocket &SockServer() { return sock_server_; }
+ ShmSocket &SockNode() { return *sockets_[eSockNode]; }
+ ShmSocket &SockPub() { return *sockets_[eSockPub]; }
+ ShmSocket &SockSub() { return *sockets_[eSockSub]; }
+ ShmSocket &SockClient() { return *sockets_[eSockClient]; }
+ ShmSocket &SockServer() { return *sockets_[eSockServer]; }
- ShmSocket sock_node_;
- ShmSocket sock_client_;
- ShmSocket sock_server_;
- ShmSocket sock_sub_;
enum State {
eStateUnregistered,
eStateOnline,
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index ae3f10a..766c0f8 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -155,7 +155,7 @@
printf("maxsec: %ld\n", CountSeconds(max_time));
bool reg = false;
- for (int i = 0; i < 10 && !reg; ++i) {
+ for (int i = 0; i < 3 && !reg; ++i) {
ProcInfo proc;
proc.set_proc_id("demo_client");
proc.set_public_info("public info of demo_client. etc...");
@@ -167,6 +167,9 @@
BHFree(reply, reply_len);
Sleep(1s);
+ }
+ if (!reg) {
+ return;
}
const std::string topic_ = "topic_";
@@ -204,7 +207,7 @@
for (int i = 0; i < 1; ++i) {
MsgPublish pub;
pub.set_topic(topic_ + std::to_string(i));
- pub.set_data("pub_data_" + std::string(1024 * 1024, 'a'));
+ pub.set_data("pub_data_" + std::string(1024 * 1, 'a'));
std::string s(pub.SerializeAsString());
BHPublish(s.data(), s.size(), 0);
// Sleep(1s);
diff --git a/utest/lock_free_queue.cpp b/utest/lock_free_queue.cpp
new file mode 100644
index 0000000..a05a454
--- /dev/null
+++ b/utest/lock_free_queue.cpp
@@ -0,0 +1,29 @@
+/*
+ * =====================================================================================
+ *
+ * Filename: lock_free_queue.cpp
+ *
+ * Description:
+ *
+ * Version: 1.0
+ * Created: 2021骞�04鏈�21鏃� 13鏃�57鍒�02绉�
+ * Revision: none
+ * Compiler: gcc
+ *
+ * Author: Li Chao (), lichao@aiotlink.com
+ * Organization:
+ *
+ * =====================================================================================
+ */
+#include "lock_free_queue.h"
+#include "defs.h"
+#include "util.h"
+
+BOOST_AUTO_TEST_CASE(LockFreeTest)
+{
+ LockFreeQueue q(BHomeShm());
+ for (int i = 0; i < 15; ++i) {
+ int r = q.Write(i);
+ printf("write %d %s\n", i, (r ? "ok" : "failed"));
+ }
+}
\ No newline at end of file
diff --git a/utest/lock_free_queue.h b/utest/lock_free_queue.h
new file mode 100644
index 0000000..968f796
--- /dev/null
+++ b/utest/lock_free_queue.h
@@ -0,0 +1,57 @@
+/*
+ * =====================================================================================
+ *
+ * Filename: lock_free_queue.h
+ *
+ * Description:
+ *
+ * Version: 1.0
+ * Created: 2021骞�04鏈�21鏃� 14鏃�03鍒�27绉�
+ * Revision: none
+ * Compiler: gcc
+ *
+ * Author: Li Chao (), lichao@aiotlink.com
+ * Organization:
+ *
+ * =====================================================================================
+ */
+
+#ifndef LOCK_FREE_QUEUE_KQWP70HT
+#define LOCK_FREE_QUEUE_KQWP70HT
+
+#include "shm.h"
+#include <boost/interprocess/offset_ptr.hpp>
+#include <boost/lockfree/queue.hpp>
+
+using namespace bhome_shm;
+
+typedef int64_t Data;
+const int kQLen = 10;
+class LockFreeQueue : private boost::lockfree::queue<Data,
+ boost::lockfree::allocator<Allocator<Data>>,
+ boost::lockfree::capacity<kQLen>>,
+ private boost::noncopyable
+{
+ typedef boost::lockfree::queue<Data,
+ boost::lockfree::allocator<Allocator<Data>>,
+ boost::lockfree::capacity<kQLen>>
+ Queue;
+
+public:
+ LockFreeQueue(SharedMemory &shm) :
+ Queue(shm.get_segment_manager()) {}
+ bool Read(Data &d) { return pop(d); }
+ bool Write(Data const &d) { return push(d); }
+ template <class Func>
+ bool Write(Data const &d, Func onWrite)
+ {
+ if (Write(d)) {
+ onWrite(d);
+ return true;
+ } else {
+ return false;
+ }
+ }
+};
+
+#endif // end of include guard: LOCK_FREE_QUEUE_KQWP70HT
--
Gitblit v1.8.0