From b3b9d91eccd3f54be112ac5389b49969fea93b4c Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期三, 21 四月 2021 13:22:55 +0800 Subject: [PATCH] trivial. --- src/topic_node.cpp | 41 ++++++++++++++++++++++++++++++++--------- 1 files changed, 32 insertions(+), 9 deletions(-) diff --git a/src/topic_node.cpp b/src/topic_node.cpp index 2cc5483..bbf88eb 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -40,6 +40,9 @@ // recv msgs to avoid memory leak. auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; }; SockNode().Start(default_ignore_msg); + SockClient().Start(default_ignore_msg); + SockServer().Start(default_ignore_msg); + SockSub().Start(default_ignore_msg); } TopicNode::~TopicNode() @@ -54,7 +57,7 @@ } } -void TopicNode::Start(ServerCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker) +void TopicNode::Start(ServerAsyncCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker) { if (nworker < 1) { nworker = 1; @@ -178,7 +181,7 @@ } } -bool TopicNode::ServerStart(const ServerCB &rcb, int nworker) +bool TopicNode::ServerStart(const ServerSyncCB &rcb, int nworker) { auto onRecv = [this, rcb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) { return; } @@ -199,6 +202,23 @@ auto &sock = SockServer(); return rcb && sock.Start(onRecv, nworker); +} + +bool TopicNode::ServerStart(const ServerAsyncCB &acb, int nworker) +{ + auto onRecv = [this, acb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { + if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) { return; } + MsgRequestTopic req; + if (!imsg.ParseBody(req)) { return; } + + SrcInfo *p = new SrcInfo; + p->route.assign(head.route().begin(), head.route().end()); + p->msg_id = head.msg_id(); + acb(p, *head.mutable_proc_id(), req); + }; + + auto &sock = SockServer(); + return acb && sock.Start(onRecv, nworker); } bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms) @@ -296,13 +316,15 @@ }; try { - auto &sock = SockClient(); BHAddress addr; - - if (topic_query_cache_.Find(req.topic(), addr)) { +#if 1 + return (ClientQueryRPCTopic(req.topic(), addr, 3000)) && SendTo(addr, req, cb); +#else + if (topic_query_cache_.Pick(req.topic(), addr)) { return SendTo(addr, req, cb); } + auto &sock = SockClient(); MsgQueryTopic query; query.set_topic(req.topic()); BHMsgHead head(InitMsgHead(GetType(query), proc_id())); @@ -313,12 +335,13 @@ if (head.type() == kMsgTypeQueryTopicReply && imsg.ParseBody(rep)) { auto &addr = rep.address(); if (!addr.mq_id().empty()) { - topic_query_cache_.Update(req.topic(), addr); + topic_query_cache_.Store(req.topic(), addr); SendTo(addr, req, cb); } } }; - return sock.Send(&BHTopicCenterAddress(), head, query, onQueryResult); + return sock.Send(&BHTopicCenterAddress(), head, query, std::move(onQueryResult)); +#endif } catch (...) { SetLastError(eError, "internal error."); @@ -361,7 +384,7 @@ return false; } -bool TopicNode::ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms) +bool TopicNode::ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms) { if (!IsRegistered()) { SetLastError(eNotRegistered, "Not Registered."); @@ -391,7 +414,7 @@ if (addr.mq_id().empty()) { return false; } else { - topic_query_cache_.Update(topic, addr); + topic_query_cache_.Store(topic, addr); return true; } } -- Gitblit v1.8.0