From 34cd75f77d0ca94dbdba4e6cc9451fe4d33e78b3 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期三, 19 五月 2021 19:14:13 +0800 Subject: [PATCH] add api BHQueryProcs. --- src/topic_node.cpp | 30 ++++++++++++++++++++++-------- 1 files changed, 22 insertions(+), 8 deletions(-) diff --git a/src/topic_node.cpp b/src/topic_node.cpp index 6be65be..8bbb929 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -107,14 +107,9 @@ } SetProcIndex(reply.proc_index()); this->state_ = eStateUnregistered; - auto onRequest = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) { - server_buffer_->Write(std::move(head), msg.body()); - }; - SockServer().Start(onRequest); - auto onSub = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) { - sub_buffer_->Write(std::move(head), msg.body()); - }; - SockSub().Start(onSub); + + ServerStart(ServerAsyncCB(), 1); + SubscribeStartWorker(SubDataCB(), 1); } } break; default: break; @@ -297,6 +292,25 @@ reply.ParseBody(reply_body)); } +bool TopicNode::QueryProcs(BHAddress &dest, MsgQueryProc &query, MsgQueryProcReply &reply_body, const int timeout_ms) +{ + if (!IsOnline()) { + SetLastError(eNotRegistered, kErrMsgNotRegistered); + return false; + } + auto &sock = SockNode(); + + BHMsgHead head(InitMsgHead(GetType(query), proc_id(), ssn())); + AddRoute(head, sock); + + MsgI reply; + DEFER1(reply.Release()); + BHMsgHead reply_head; + return (sock.SendAndRecv(BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms) && + reply_head.type() == kMsgTypeQueryProcReply && + reply.ParseBody(reply_body)); +} + bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) { if (!IsOnline()) { -- Gitblit v1.8.0