From 34cd75f77d0ca94dbdba4e6cc9451fe4d33e78b3 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期三, 19 五月 2021 19:14:13 +0800 Subject: [PATCH] add api BHQueryProcs. --- src/topic_node.cpp | 126 ++++++++++++++++++++++++++++------------- 1 files changed, 85 insertions(+), 41 deletions(-) diff --git a/src/topic_node.cpp b/src/topic_node.cpp index 43d748f..8bbb929 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -107,6 +107,9 @@ } SetProcIndex(reply.proc_index()); this->state_ = eStateUnregistered; + + ServerStart(ServerAsyncCB(), 1); + SubscribeStartWorker(SubDataCB(), 1); } } break; default: break; @@ -289,6 +292,25 @@ reply.ParseBody(reply_body)); } +bool TopicNode::QueryProcs(BHAddress &dest, MsgQueryProc &query, MsgQueryProcReply &reply_body, const int timeout_ms) +{ + if (!IsOnline()) { + SetLastError(eNotRegistered, kErrMsgNotRegistered); + return false; + } + auto &sock = SockNode(); + + BHMsgHead head(InitMsgHead(GetType(query), proc_id(), ssn())); + AddRoute(head, sock); + + MsgI reply; + DEFER1(reply.Release()); + BHMsgHead reply_head; + return (sock.SendAndRecv(BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms) && + reply_head.type() == kMsgTypeQueryProcReply && + reply.ParseBody(reply_body)); +} + bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) { if (!IsOnline()) { @@ -341,26 +363,32 @@ bool TopicNode::ServerStart(const ServerAsyncCB &acb, int nworker) { - auto onRecv = [this, acb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { - if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) { return; } - MsgRequestTopic req; - if (!imsg.ParseBody(req)) { return; } + if (acb) { + auto onRecv = [this, acb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { + if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) { return; } + MsgRequestTopic req; + if (!imsg.ParseBody(req)) { return; } - try { - SrcInfo *p = new SrcInfo; - if (!p) { - throw std::runtime_error("no memory."); + try { + SrcInfo *p = new SrcInfo; + if (!p) { + throw std::runtime_error("no memory."); + } + p->route.assign(head.route().begin(), head.route().end()); + p->msg_id = head.msg_id(); + acb(p, *head.mutable_proc_id(), req); + } catch (std::exception &e) { + LOG_ERROR() << "error server handle msg:" << e.what(); } - p->route.assign(head.route().begin(), head.route().end()); - p->msg_id = head.msg_id(); - acb(p, *head.mutable_proc_id(), req); - } catch (std::exception &e) { - LOG_ERROR() << "error server handle msg:" << e.what(); - } - }; + }; - auto &sock = SockServer(); - return acb && sock.Start(onRecv, nworker); + return SockServer().Start(onRecv, nworker); + } else { + auto onRequest = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) { + server_buffer_->Write(std::move(head), msg.body()); + }; + return SockServer().Start(onRequest, nworker); + } } bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms) @@ -369,13 +397,19 @@ SetLastError(eNotRegistered, kErrMsgNotRegistered); return false; } - - auto &sock = SockServer(); - - MsgI imsg; BHMsgHead head; - if (sock.SyncRecv(imsg, head, timeout_ms) && head.type() == kMsgTypeRequestTopic) { - if (imsg.ParseBody(request)) { + std::string body; + auto end_time = steady_clock::now() + milliseconds(timeout_ms); + while (!server_buffer_->Read(head, body)) { + if (steady_clock::now() < end_time) { + robust::QuickSleep(); + } else { + return false; + } + } + + if (head.type() == kMsgTypeRequestTopic) { + if (request.ParseFromString(body)) { head.mutable_proc_id()->swap(proc_id); try { SrcInfo *p = new SrcInfo; @@ -614,20 +648,24 @@ bool TopicNode::SubscribeStartWorker(const SubDataCB &tdcb, int nworker) { - auto &sock = SockSub(); - - auto AsyncRecvProc = [this, tdcb](ShmSocket &, MsgI &imsg, BHMsgHead &head) { - if (head.type() == kMsgTypePublish) { - MsgPublish pub; - if (imsg.ParseBody(pub)) { - tdcb(head.proc_id(), pub); + if (tdcb) { + auto AsyncRecvProc = [this, tdcb](ShmSocket &, MsgI &imsg, BHMsgHead &head) { + if (head.type() == kMsgTypePublish) { + MsgPublish pub; + if (imsg.ParseBody(pub)) { + tdcb(head.proc_id(), pub); + } + } else { + // ignored, or dropped } - } else { - // ignored, or dropped - } - }; - - return tdcb && sock.Start(AsyncRecvProc, nworker); + }; + return SockSub().Start(AsyncRecvProc, nworker); + } else { + auto onSub = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) { + sub_buffer_->Write(std::move(head), msg.body()); + }; + return SockSub().Start(onSub, nworker); + } } bool TopicNode::RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms) @@ -637,13 +675,19 @@ return false; } - auto &sock = SockSub(); - MsgI msg; - DEFER1(msg.Release();); BHMsgHead head; + std::string body; + auto end_time = steady_clock::now() + milliseconds(timeout_ms); + while (!sub_buffer_->Read(head, body)) { + if (steady_clock::now() < end_time) { + robust::QuickSleep(); + } else { + return false; + } + } //TODO error msg. - if (sock.SyncRecv(msg, head, timeout_ms) && head.type() == kMsgTypePublish) { - if (msg.ParseBody(pub)) { + if (head.type() == kMsgTypePublish) { + if (pub.ParseFromString(body)) { head.mutable_proc_id()->swap(proc_id); return true; } -- Gitblit v1.8.0