From e54b8e58780c7d9f37b06cc4e1dc88badb2129c9 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期二, 18 五月 2021 17:02:21 +0800 Subject: [PATCH] remove sync recv, node cache msgs for sync recv. --- src/topic_node.cpp | 112 +++++++++++++++++++++++++++++++++++-------------------- 1 files changed, 71 insertions(+), 41 deletions(-) diff --git a/src/topic_node.cpp b/src/topic_node.cpp index 43d748f..6be65be 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -107,6 +107,14 @@ } SetProcIndex(reply.proc_index()); this->state_ = eStateUnregistered; + auto onRequest = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) { + server_buffer_->Write(std::move(head), msg.body()); + }; + SockServer().Start(onRequest); + auto onSub = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) { + sub_buffer_->Write(std::move(head), msg.body()); + }; + SockSub().Start(onSub); } } break; default: break; @@ -341,26 +349,32 @@ bool TopicNode::ServerStart(const ServerAsyncCB &acb, int nworker) { - auto onRecv = [this, acb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { - if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) { return; } - MsgRequestTopic req; - if (!imsg.ParseBody(req)) { return; } + if (acb) { + auto onRecv = [this, acb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { + if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) { return; } + MsgRequestTopic req; + if (!imsg.ParseBody(req)) { return; } - try { - SrcInfo *p = new SrcInfo; - if (!p) { - throw std::runtime_error("no memory."); + try { + SrcInfo *p = new SrcInfo; + if (!p) { + throw std::runtime_error("no memory."); + } + p->route.assign(head.route().begin(), head.route().end()); + p->msg_id = head.msg_id(); + acb(p, *head.mutable_proc_id(), req); + } catch (std::exception &e) { + LOG_ERROR() << "error server handle msg:" << e.what(); } - p->route.assign(head.route().begin(), head.route().end()); - p->msg_id = head.msg_id(); - acb(p, *head.mutable_proc_id(), req); - } catch (std::exception &e) { - LOG_ERROR() << "error server handle msg:" << e.what(); - } - }; + }; - auto &sock = SockServer(); - return acb && sock.Start(onRecv, nworker); + return SockServer().Start(onRecv, nworker); + } else { + auto onRequest = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) { + server_buffer_->Write(std::move(head), msg.body()); + }; + return SockServer().Start(onRequest, nworker); + } } bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms) @@ -369,13 +383,19 @@ SetLastError(eNotRegistered, kErrMsgNotRegistered); return false; } - - auto &sock = SockServer(); - - MsgI imsg; BHMsgHead head; - if (sock.SyncRecv(imsg, head, timeout_ms) && head.type() == kMsgTypeRequestTopic) { - if (imsg.ParseBody(request)) { + std::string body; + auto end_time = steady_clock::now() + milliseconds(timeout_ms); + while (!server_buffer_->Read(head, body)) { + if (steady_clock::now() < end_time) { + robust::QuickSleep(); + } else { + return false; + } + } + + if (head.type() == kMsgTypeRequestTopic) { + if (request.ParseFromString(body)) { head.mutable_proc_id()->swap(proc_id); try { SrcInfo *p = new SrcInfo; @@ -614,20 +634,24 @@ bool TopicNode::SubscribeStartWorker(const SubDataCB &tdcb, int nworker) { - auto &sock = SockSub(); - - auto AsyncRecvProc = [this, tdcb](ShmSocket &, MsgI &imsg, BHMsgHead &head) { - if (head.type() == kMsgTypePublish) { - MsgPublish pub; - if (imsg.ParseBody(pub)) { - tdcb(head.proc_id(), pub); + if (tdcb) { + auto AsyncRecvProc = [this, tdcb](ShmSocket &, MsgI &imsg, BHMsgHead &head) { + if (head.type() == kMsgTypePublish) { + MsgPublish pub; + if (imsg.ParseBody(pub)) { + tdcb(head.proc_id(), pub); + } + } else { + // ignored, or dropped } - } else { - // ignored, or dropped - } - }; - - return tdcb && sock.Start(AsyncRecvProc, nworker); + }; + return SockSub().Start(AsyncRecvProc, nworker); + } else { + auto onSub = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) { + sub_buffer_->Write(std::move(head), msg.body()); + }; + return SockSub().Start(onSub, nworker); + } } bool TopicNode::RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms) @@ -637,13 +661,19 @@ return false; } - auto &sock = SockSub(); - MsgI msg; - DEFER1(msg.Release();); BHMsgHead head; + std::string body; + auto end_time = steady_clock::now() + milliseconds(timeout_ms); + while (!sub_buffer_->Read(head, body)) { + if (steady_clock::now() < end_time) { + robust::QuickSleep(); + } else { + return false; + } + } //TODO error msg. - if (sock.SyncRecv(msg, head, timeout_ms) && head.type() == kMsgTypePublish) { - if (msg.ParseBody(pub)) { + if (head.type() == kMsgTypePublish) { + if (pub.ParseFromString(body)) { head.mutable_proc_id()->swap(proc_id); return true; } -- Gitblit v1.8.0