From e54b8e58780c7d9f37b06cc4e1dc88badb2129c9 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期二, 18 五月 2021 17:02:21 +0800
Subject: [PATCH] remove sync recv, node cache msgs for sync recv.
---
utest/speed_test.cpp | 37 +++------
src/socket.h | 4
utest/api_test.cpp | 30 ++++++-
src/msg.h | 11 ++
src/socket.cpp | 19 ----
src/topic_node.h | 26 ++++++
src/topic_node.cpp | 112 +++++++++++++++++----------
7 files changed, 146 insertions(+), 93 deletions(-)
diff --git a/src/msg.h b/src/msg.h
index e8af3c5..42a753e 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -209,6 +209,17 @@
p += 4;
return head.ParseFromArray(p, msg_size);
}
+ std::string body() const
+ {
+ auto p = get<char>();
+ assert(p);
+ uint32_t size = Get32(p);
+ p += 4;
+ p += size;
+ size = Get32(p);
+ p += 4;
+ return std::string(p, size);
+ }
template <class Body>
bool ParseBody(Body &body) const
{
diff --git a/src/socket.cpp b/src/socket.cpp
index c450e65..19be201 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -139,25 +139,6 @@
return false;
}
-bool ShmSocket::SyncRecv(int64_t &cmd, const int timeout_ms)
-{
- return (timeout_ms == 0) ? mq().TryRecv(cmd) : mq().Recv(cmd, timeout_ms);
-}
-//maybe reimplment, using async cbs?
-bool ShmSocket::SyncRecv(bhome_msg::MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms)
-{
- // std::lock_guard<std::mutex> lock(mutex_); // seems no need to lock mutex_.
- bool got = (timeout_ms == 0) ? mq().TryRecv(msg) : mq().Recv(msg, timeout_ms);
- if (got) {
- if (msg.ParseHead(head)) {
- return true;
- } else {
- msg.Release();
- }
- }
- return false;
-}
-
bool ShmSocket::Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb)
{
size_t size = content.size();
diff --git a/src/socket.h b/src/socket.h
index dea106c..7557034 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -99,8 +99,6 @@
{
return SendImpl(remote, cmd, std::forward<decltype(t)>(t)...);
}
- bool SyncRecv(int64_t &cmd, const int timeout_ms);
- bool SyncRecv(MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms);
template <class Body>
bool SendAndRecv(const MQInfo &remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms)
@@ -190,8 +188,8 @@
Synced<CallbackRecords<std::string, RecvCB>> per_msg_cbs_;
Synced<CallbackRecords<int, RawRecvCB>> alloc_cbs_;
-
SendQ send_buffer_;
+
// node request center alloc memory.
int node_proc_index_ = -1;
int socket_index_ = -1;
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index 43d748f..6be65be 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -107,6 +107,14 @@
}
SetProcIndex(reply.proc_index());
this->state_ = eStateUnregistered;
+ auto onRequest = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) {
+ server_buffer_->Write(std::move(head), msg.body());
+ };
+ SockServer().Start(onRequest);
+ auto onSub = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) {
+ sub_buffer_->Write(std::move(head), msg.body());
+ };
+ SockSub().Start(onSub);
}
} break;
default: break;
@@ -341,26 +349,32 @@
bool TopicNode::ServerStart(const ServerAsyncCB &acb, int nworker)
{
- auto onRecv = [this, acb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
- if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) { return; }
- MsgRequestTopic req;
- if (!imsg.ParseBody(req)) { return; }
+ if (acb) {
+ auto onRecv = [this, acb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
+ if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) { return; }
+ MsgRequestTopic req;
+ if (!imsg.ParseBody(req)) { return; }
- try {
- SrcInfo *p = new SrcInfo;
- if (!p) {
- throw std::runtime_error("no memory.");
+ try {
+ SrcInfo *p = new SrcInfo;
+ if (!p) {
+ throw std::runtime_error("no memory.");
+ }
+ p->route.assign(head.route().begin(), head.route().end());
+ p->msg_id = head.msg_id();
+ acb(p, *head.mutable_proc_id(), req);
+ } catch (std::exception &e) {
+ LOG_ERROR() << "error server handle msg:" << e.what();
}
- p->route.assign(head.route().begin(), head.route().end());
- p->msg_id = head.msg_id();
- acb(p, *head.mutable_proc_id(), req);
- } catch (std::exception &e) {
- LOG_ERROR() << "error server handle msg:" << e.what();
- }
- };
+ };
- auto &sock = SockServer();
- return acb && sock.Start(onRecv, nworker);
+ return SockServer().Start(onRecv, nworker);
+ } else {
+ auto onRequest = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) {
+ server_buffer_->Write(std::move(head), msg.body());
+ };
+ return SockServer().Start(onRequest, nworker);
+ }
}
bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms)
@@ -369,13 +383,19 @@
SetLastError(eNotRegistered, kErrMsgNotRegistered);
return false;
}
-
- auto &sock = SockServer();
-
- MsgI imsg;
BHMsgHead head;
- if (sock.SyncRecv(imsg, head, timeout_ms) && head.type() == kMsgTypeRequestTopic) {
- if (imsg.ParseBody(request)) {
+ std::string body;
+ auto end_time = steady_clock::now() + milliseconds(timeout_ms);
+ while (!server_buffer_->Read(head, body)) {
+ if (steady_clock::now() < end_time) {
+ robust::QuickSleep();
+ } else {
+ return false;
+ }
+ }
+
+ if (head.type() == kMsgTypeRequestTopic) {
+ if (request.ParseFromString(body)) {
head.mutable_proc_id()->swap(proc_id);
try {
SrcInfo *p = new SrcInfo;
@@ -614,20 +634,24 @@
bool TopicNode::SubscribeStartWorker(const SubDataCB &tdcb, int nworker)
{
- auto &sock = SockSub();
-
- auto AsyncRecvProc = [this, tdcb](ShmSocket &, MsgI &imsg, BHMsgHead &head) {
- if (head.type() == kMsgTypePublish) {
- MsgPublish pub;
- if (imsg.ParseBody(pub)) {
- tdcb(head.proc_id(), pub);
+ if (tdcb) {
+ auto AsyncRecvProc = [this, tdcb](ShmSocket &, MsgI &imsg, BHMsgHead &head) {
+ if (head.type() == kMsgTypePublish) {
+ MsgPublish pub;
+ if (imsg.ParseBody(pub)) {
+ tdcb(head.proc_id(), pub);
+ }
+ } else {
+ // ignored, or dropped
}
- } else {
- // ignored, or dropped
- }
- };
-
- return tdcb && sock.Start(AsyncRecvProc, nworker);
+ };
+ return SockSub().Start(AsyncRecvProc, nworker);
+ } else {
+ auto onSub = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) {
+ sub_buffer_->Write(std::move(head), msg.body());
+ };
+ return SockSub().Start(onSub, nworker);
+ }
}
bool TopicNode::RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms)
@@ -637,13 +661,19 @@
return false;
}
- auto &sock = SockSub();
- MsgI msg;
- DEFER1(msg.Release(););
BHMsgHead head;
+ std::string body;
+ auto end_time = steady_clock::now() + milliseconds(timeout_ms);
+ while (!sub_buffer_->Read(head, body)) {
+ if (steady_clock::now() < end_time) {
+ robust::QuickSleep();
+ } else {
+ return false;
+ }
+ }
//TODO error msg.
- if (sock.SyncRecv(msg, head, timeout_ms) && head.type() == kMsgTypePublish) {
- if (msg.ParseBody(pub)) {
+ if (head.type() == kMsgTypePublish) {
+ if (pub.ParseFromString(body)) {
head.mutable_proc_id()->swap(proc_id);
return true;
}
diff --git a/src/topic_node.h b/src/topic_node.h
index 1dfbf43..81bf718 100644
--- a/src/topic_node.h
+++ b/src/topic_node.h
@@ -163,6 +163,32 @@
int proc_index_ = -1;
TopicQueryCache topic_query_cache_;
+
+ class RecvQ
+ {
+ public:
+ void Write(BHMsgHead &&head, std::string &&body) { q_.push_back({std::move(head), std::move(body)}); }
+ bool Read(BHMsgHead &head, std::string &body)
+ {
+ if (q_.empty()) {
+ return false;
+ } else {
+ head = std::move(q_.front().head);
+ body = std::move(q_.front().body);
+ q_.pop_front();
+ return true;
+ }
+ }
+
+ private:
+ struct MsgData {
+ BHMsgHead head;
+ std::string body;
+ };
+ std::deque<MsgData> q_;
+ };
+ Synced<RecvQ> server_buffer_;
+ Synced<RecvQ> sub_buffer_;
};
#endif // end of include guard: TOPIC_NODE_YVKWA6TF
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index 44c809d..533c399 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -131,6 +131,15 @@
reg = BHRegister(proc_buf.data(), proc_buf.size(), &reply, &reply_len, 2000);
if (reg) {
printf("register ok\n");
+ // bool r = BHUnregister(proc_buf.data(), proc_buf.size(), &reply, &reply_len, 2000);
+ // printf("unregister %s\n", r ? "ok" : "failed");
+ // reg = BHRegister(proc_buf.data(), proc_buf.size(), &reply, &reply_len, 2000);
+ // if (!reg) {
+ // int ec = 0;
+ // std::string msg;
+ // GetLastError(ec, msg);
+ // printf("reg error: %s\n", msg.c_str());
+ // }
} else {
int ec = 0;
std::string msg;
@@ -201,7 +210,7 @@
auto SyncRequest = [&](int idx) { // SyncRequest
MsgRequestTopic req;
- req.set_topic(topic_ + std::to_string(idx));
+ req.set_topic(topic_ + std::to_string(0));
req.set_data("request_data_" + std::to_string(idx));
std::string s(req.SerializeAsString());
// Sleep(10ms, false);
@@ -286,19 +295,30 @@
std::atomic<bool> run(true);
- BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc);
ThreadManager threads;
+
+#if 0
+ BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc);
+#else
+ BHStartWorker(FServerCallback(), &SubRecvProc, &ClientProc);
+ threads.Launch(ServerLoop, &run);
+#endif
+
boost::timer::auto_cpu_timer timer;
threads.Launch(hb, &run);
threads.Launch(showStatus, &run);
int ncli = 10;
const int64_t nreq = 1000 * 100;
+
+#if 1
for (int i = 0; i < ncli; ++i) {
threads.Launch(asyncRequest, nreq);
}
- // for (int i = 0; i < 100; ++i) {
- // SyncRequest(0);
- // }
+#else
+ for (int i = 0; i < 100; ++i) {
+ SyncRequest(i);
+ }
+#endif
int same = 0;
uint64_t last = 0;
diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp
index 4dea623..ef56678 100644
--- a/utest/speed_test.cpp
+++ b/utest/speed_test.cpp
@@ -189,36 +189,24 @@
Req();
}
};
+ auto onRequest = [&](ShmSocket &sock, MsgI &msg, BHMsgHead &head) {
+ if (head.type() == kMsgTypeRequestTopic) {
+ MQInfo src_mq = {head.route()[0].mq_id(), head.route()[0].abs_addr()};
- std::atomic<bool> stop(false);
- auto Server = [&]() {
- MsgI req;
- BHMsgHead req_head;
-
- while (!stop) {
- if (srv.SyncRecv(req, req_head, 10)) {
- DEFER1(req.Release());
-
- if (req.ParseHead(req_head) && req_head.type() == kMsgTypeRequestTopic) {
- MQInfo src_mq = {req_head.route()[0].mq_id(), req_head.route()[0].abs_addr()};
- auto Reply = [&]() {
- MsgRequestTopic reply_body;
- reply_body.set_topic("topic");
- reply_body.set_data(msg_content);
- auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, srv.id(), req_head.msg_id()));
- return srv.Send(src_mq, reply_head, reply_body);
- };
- Reply();
- }
- }
+ MsgRequestTopic reply_body;
+ reply_body.set_topic("topic");
+ reply_body.set_data(msg_content);
+ auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, srv.id(), head.msg_id()));
+ srv.Send(src_mq, reply_head, reply_body);
}
};
+ srv.Start(onRequest);
boost::timer::auto_cpu_timer timer;
DEFER1(printf("Request Reply Test:"););
- ThreadManager clients, servers;
- for (int i = 0; i < 2; ++i) { servers.Launch(Server); }
+ ThreadManager clients;
+
printf("client threads: %d, msgs : %ld, total msg: %ld\n", ncli, nmsg, ncli * nmsg);
for (int i = 0; i < ncli; ++i) { clients.Launch(Client, i, nmsg); }
clients.WaitAll();
@@ -227,7 +215,6 @@
std::this_thread::sleep_for(100ms);
} while (count.load() < ncli * nmsg);
PrintStatus(NowSec());
- stop = true;
- servers.WaitAll();
+ srv.Stop();
// BOOST_CHECK_THROW(reply.Count(), int);
}
--
Gitblit v1.8.0