From 1f3729698a131b3f701f67adb6a1258aa1235dce Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期二, 20 四月 2021 15:43:53 +0800 Subject: [PATCH] api server callback change tag to src; refactor. --- src/topic_node.cpp | 36 ++++++++++++++++++++++++++++-------- 1 files changed, 28 insertions(+), 8 deletions(-) diff --git a/src/topic_node.cpp b/src/topic_node.cpp index 9853f35..16c565b 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -54,7 +54,7 @@ } } -void TopicNode::Start(ServerCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker) +void TopicNode::Start(ServerAsyncCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker) { if (nworker < 1) { nworker = 1; @@ -178,7 +178,7 @@ } } -bool TopicNode::ServerStart(const ServerCB &rcb, int nworker) +bool TopicNode::ServerStart(const ServerSyncCB &rcb, int nworker) { auto onRecv = [this, rcb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) { return; } @@ -199,6 +199,23 @@ auto &sock = SockServer(); return rcb && sock.Start(onRecv, nworker); +} + +bool TopicNode::ServerStart(const ServerAsyncCB &acb, int nworker) +{ + auto onRecv = [this, acb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { + if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) { return; } + MsgRequestTopic req; + if (!imsg.ParseBody(req)) { return; } + + SrcInfo *p = new SrcInfo; + p->route.assign(head.route().begin(), head.route().end()); + p->msg_id = head.msg_id(); + acb(p, *head.mutable_proc_id(), req); + }; + + auto &sock = SockServer(); + return acb && sock.Start(onRecv, nworker); } bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms) @@ -296,13 +313,15 @@ }; try { - auto &sock = SockClient(); BHAddress addr; - - if (topic_query_cache_.Find(req.topic(), addr)) { +#if 1 + return (ClientQueryRPCTopic(req.topic(), addr, 3000)) && SendTo(addr, req, cb); +#else + if (topic_query_cache_.Pick(req.topic(), addr)) { return SendTo(addr, req, cb); } + auto &sock = SockClient(); MsgQueryTopic query; query.set_topic(req.topic()); BHMsgHead head(InitMsgHead(GetType(query), proc_id())); @@ -313,12 +332,13 @@ if (head.type() == kMsgTypeQueryTopicReply && imsg.ParseBody(rep)) { auto &addr = rep.address(); if (!addr.mq_id().empty()) { - topic_query_cache_.Update(req.topic(), addr); + topic_query_cache_.Store(req.topic(), addr); SendTo(addr, req, cb); } } }; - return sock.Send(&BHTopicCenterAddress(), head, query, onQueryResult); + return sock.Send(&BHTopicCenterAddress(), head, query, std::move(onQueryResult)); +#endif } catch (...) { SetLastError(eError, "internal error."); @@ -391,7 +411,7 @@ if (addr.mq_id().empty()) { return false; } else { - topic_query_cache_.Update(topic, addr); + topic_query_cache_.Store(topic, addr); return true; } } -- Gitblit v1.8.0