From e54b8e58780c7d9f37b06cc4e1dc88badb2129c9 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期二, 18 五月 2021 17:02:21 +0800 Subject: [PATCH] remove sync recv, node cache msgs for sync recv. --- utest/speed_test.cpp | 37 +++------ src/socket.h | 4 utest/api_test.cpp | 30 ++++++- src/msg.h | 11 ++ src/socket.cpp | 19 ---- src/topic_node.h | 26 ++++++ src/topic_node.cpp | 112 +++++++++++++++++---------- 7 files changed, 146 insertions(+), 93 deletions(-) diff --git a/src/msg.h b/src/msg.h index e8af3c5..42a753e 100644 --- a/src/msg.h +++ b/src/msg.h @@ -209,6 +209,17 @@ p += 4; return head.ParseFromArray(p, msg_size); } + std::string body() const + { + auto p = get<char>(); + assert(p); + uint32_t size = Get32(p); + p += 4; + p += size; + size = Get32(p); + p += 4; + return std::string(p, size); + } template <class Body> bool ParseBody(Body &body) const { diff --git a/src/socket.cpp b/src/socket.cpp index c450e65..19be201 100644 --- a/src/socket.cpp +++ b/src/socket.cpp @@ -139,25 +139,6 @@ return false; } -bool ShmSocket::SyncRecv(int64_t &cmd, const int timeout_ms) -{ - return (timeout_ms == 0) ? mq().TryRecv(cmd) : mq().Recv(cmd, timeout_ms); -} -//maybe reimplment, using async cbs? -bool ShmSocket::SyncRecv(bhome_msg::MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms) -{ - // std::lock_guard<std::mutex> lock(mutex_); // seems no need to lock mutex_. - bool got = (timeout_ms == 0) ? mq().TryRecv(msg) : mq().Recv(msg, timeout_ms); - if (got) { - if (msg.ParseHead(head)) { - return true; - } else { - msg.Release(); - } - } - return false; -} - bool ShmSocket::Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb) { size_t size = content.size(); diff --git a/src/socket.h b/src/socket.h index dea106c..7557034 100644 --- a/src/socket.h +++ b/src/socket.h @@ -99,8 +99,6 @@ { return SendImpl(remote, cmd, std::forward<decltype(t)>(t)...); } - bool SyncRecv(int64_t &cmd, const int timeout_ms); - bool SyncRecv(MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms); template <class Body> bool SendAndRecv(const MQInfo &remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms) @@ -190,8 +188,8 @@ Synced<CallbackRecords<std::string, RecvCB>> per_msg_cbs_; Synced<CallbackRecords<int, RawRecvCB>> alloc_cbs_; - SendQ send_buffer_; + // node request center alloc memory. int node_proc_index_ = -1; int socket_index_ = -1; diff --git a/src/topic_node.cpp b/src/topic_node.cpp index 43d748f..6be65be 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -107,6 +107,14 @@ } SetProcIndex(reply.proc_index()); this->state_ = eStateUnregistered; + auto onRequest = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) { + server_buffer_->Write(std::move(head), msg.body()); + }; + SockServer().Start(onRequest); + auto onSub = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) { + sub_buffer_->Write(std::move(head), msg.body()); + }; + SockSub().Start(onSub); } } break; default: break; @@ -341,26 +349,32 @@ bool TopicNode::ServerStart(const ServerAsyncCB &acb, int nworker) { - auto onRecv = [this, acb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { - if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) { return; } - MsgRequestTopic req; - if (!imsg.ParseBody(req)) { return; } + if (acb) { + auto onRecv = [this, acb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { + if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) { return; } + MsgRequestTopic req; + if (!imsg.ParseBody(req)) { return; } - try { - SrcInfo *p = new SrcInfo; - if (!p) { - throw std::runtime_error("no memory."); + try { + SrcInfo *p = new SrcInfo; + if (!p) { + throw std::runtime_error("no memory."); + } + p->route.assign(head.route().begin(), head.route().end()); + p->msg_id = head.msg_id(); + acb(p, *head.mutable_proc_id(), req); + } catch (std::exception &e) { + LOG_ERROR() << "error server handle msg:" << e.what(); } - p->route.assign(head.route().begin(), head.route().end()); - p->msg_id = head.msg_id(); - acb(p, *head.mutable_proc_id(), req); - } catch (std::exception &e) { - LOG_ERROR() << "error server handle msg:" << e.what(); - } - }; + }; - auto &sock = SockServer(); - return acb && sock.Start(onRecv, nworker); + return SockServer().Start(onRecv, nworker); + } else { + auto onRequest = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) { + server_buffer_->Write(std::move(head), msg.body()); + }; + return SockServer().Start(onRequest, nworker); + } } bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms) @@ -369,13 +383,19 @@ SetLastError(eNotRegistered, kErrMsgNotRegistered); return false; } - - auto &sock = SockServer(); - - MsgI imsg; BHMsgHead head; - if (sock.SyncRecv(imsg, head, timeout_ms) && head.type() == kMsgTypeRequestTopic) { - if (imsg.ParseBody(request)) { + std::string body; + auto end_time = steady_clock::now() + milliseconds(timeout_ms); + while (!server_buffer_->Read(head, body)) { + if (steady_clock::now() < end_time) { + robust::QuickSleep(); + } else { + return false; + } + } + + if (head.type() == kMsgTypeRequestTopic) { + if (request.ParseFromString(body)) { head.mutable_proc_id()->swap(proc_id); try { SrcInfo *p = new SrcInfo; @@ -614,20 +634,24 @@ bool TopicNode::SubscribeStartWorker(const SubDataCB &tdcb, int nworker) { - auto &sock = SockSub(); - - auto AsyncRecvProc = [this, tdcb](ShmSocket &, MsgI &imsg, BHMsgHead &head) { - if (head.type() == kMsgTypePublish) { - MsgPublish pub; - if (imsg.ParseBody(pub)) { - tdcb(head.proc_id(), pub); + if (tdcb) { + auto AsyncRecvProc = [this, tdcb](ShmSocket &, MsgI &imsg, BHMsgHead &head) { + if (head.type() == kMsgTypePublish) { + MsgPublish pub; + if (imsg.ParseBody(pub)) { + tdcb(head.proc_id(), pub); + } + } else { + // ignored, or dropped } - } else { - // ignored, or dropped - } - }; - - return tdcb && sock.Start(AsyncRecvProc, nworker); + }; + return SockSub().Start(AsyncRecvProc, nworker); + } else { + auto onSub = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) { + sub_buffer_->Write(std::move(head), msg.body()); + }; + return SockSub().Start(onSub, nworker); + } } bool TopicNode::RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms) @@ -637,13 +661,19 @@ return false; } - auto &sock = SockSub(); - MsgI msg; - DEFER1(msg.Release();); BHMsgHead head; + std::string body; + auto end_time = steady_clock::now() + milliseconds(timeout_ms); + while (!sub_buffer_->Read(head, body)) { + if (steady_clock::now() < end_time) { + robust::QuickSleep(); + } else { + return false; + } + } //TODO error msg. - if (sock.SyncRecv(msg, head, timeout_ms) && head.type() == kMsgTypePublish) { - if (msg.ParseBody(pub)) { + if (head.type() == kMsgTypePublish) { + if (pub.ParseFromString(body)) { head.mutable_proc_id()->swap(proc_id); return true; } diff --git a/src/topic_node.h b/src/topic_node.h index 1dfbf43..81bf718 100644 --- a/src/topic_node.h +++ b/src/topic_node.h @@ -163,6 +163,32 @@ int proc_index_ = -1; TopicQueryCache topic_query_cache_; + + class RecvQ + { + public: + void Write(BHMsgHead &&head, std::string &&body) { q_.push_back({std::move(head), std::move(body)}); } + bool Read(BHMsgHead &head, std::string &body) + { + if (q_.empty()) { + return false; + } else { + head = std::move(q_.front().head); + body = std::move(q_.front().body); + q_.pop_front(); + return true; + } + } + + private: + struct MsgData { + BHMsgHead head; + std::string body; + }; + std::deque<MsgData> q_; + }; + Synced<RecvQ> server_buffer_; + Synced<RecvQ> sub_buffer_; }; #endif // end of include guard: TOPIC_NODE_YVKWA6TF diff --git a/utest/api_test.cpp b/utest/api_test.cpp index 44c809d..533c399 100644 --- a/utest/api_test.cpp +++ b/utest/api_test.cpp @@ -131,6 +131,15 @@ reg = BHRegister(proc_buf.data(), proc_buf.size(), &reply, &reply_len, 2000); if (reg) { printf("register ok\n"); + // bool r = BHUnregister(proc_buf.data(), proc_buf.size(), &reply, &reply_len, 2000); + // printf("unregister %s\n", r ? "ok" : "failed"); + // reg = BHRegister(proc_buf.data(), proc_buf.size(), &reply, &reply_len, 2000); + // if (!reg) { + // int ec = 0; + // std::string msg; + // GetLastError(ec, msg); + // printf("reg error: %s\n", msg.c_str()); + // } } else { int ec = 0; std::string msg; @@ -201,7 +210,7 @@ auto SyncRequest = [&](int idx) { // SyncRequest MsgRequestTopic req; - req.set_topic(topic_ + std::to_string(idx)); + req.set_topic(topic_ + std::to_string(0)); req.set_data("request_data_" + std::to_string(idx)); std::string s(req.SerializeAsString()); // Sleep(10ms, false); @@ -286,19 +295,30 @@ std::atomic<bool> run(true); - BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc); ThreadManager threads; + +#if 0 + BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc); +#else + BHStartWorker(FServerCallback(), &SubRecvProc, &ClientProc); + threads.Launch(ServerLoop, &run); +#endif + boost::timer::auto_cpu_timer timer; threads.Launch(hb, &run); threads.Launch(showStatus, &run); int ncli = 10; const int64_t nreq = 1000 * 100; + +#if 1 for (int i = 0; i < ncli; ++i) { threads.Launch(asyncRequest, nreq); } - // for (int i = 0; i < 100; ++i) { - // SyncRequest(0); - // } +#else + for (int i = 0; i < 100; ++i) { + SyncRequest(i); + } +#endif int same = 0; uint64_t last = 0; diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp index 4dea623..ef56678 100644 --- a/utest/speed_test.cpp +++ b/utest/speed_test.cpp @@ -189,36 +189,24 @@ Req(); } }; + auto onRequest = [&](ShmSocket &sock, MsgI &msg, BHMsgHead &head) { + if (head.type() == kMsgTypeRequestTopic) { + MQInfo src_mq = {head.route()[0].mq_id(), head.route()[0].abs_addr()}; - std::atomic<bool> stop(false); - auto Server = [&]() { - MsgI req; - BHMsgHead req_head; - - while (!stop) { - if (srv.SyncRecv(req, req_head, 10)) { - DEFER1(req.Release()); - - if (req.ParseHead(req_head) && req_head.type() == kMsgTypeRequestTopic) { - MQInfo src_mq = {req_head.route()[0].mq_id(), req_head.route()[0].abs_addr()}; - auto Reply = [&]() { - MsgRequestTopic reply_body; - reply_body.set_topic("topic"); - reply_body.set_data(msg_content); - auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, srv.id(), req_head.msg_id())); - return srv.Send(src_mq, reply_head, reply_body); - }; - Reply(); - } - } + MsgRequestTopic reply_body; + reply_body.set_topic("topic"); + reply_body.set_data(msg_content); + auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, srv.id(), head.msg_id())); + srv.Send(src_mq, reply_head, reply_body); } }; + srv.Start(onRequest); boost::timer::auto_cpu_timer timer; DEFER1(printf("Request Reply Test:");); - ThreadManager clients, servers; - for (int i = 0; i < 2; ++i) { servers.Launch(Server); } + ThreadManager clients; + printf("client threads: %d, msgs : %ld, total msg: %ld\n", ncli, nmsg, ncli * nmsg); for (int i = 0; i < ncli; ++i) { clients.Launch(Client, i, nmsg); } clients.WaitAll(); @@ -227,7 +215,6 @@ std::this_thread::sleep_for(100ms); } while (count.load() < ncli * nmsg); PrintStatus(NowSec()); - stop = true; - servers.WaitAll(); + srv.Stop(); // BOOST_CHECK_THROW(reply.Count(), int); } -- Gitblit v1.8.0