From 68c7bef33e74f23aa0136ccd6f7faa654d671ebc Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 21 五月 2021 09:23:01 +0800 Subject: [PATCH] center publish notify; fix topic partial match. --- src/topic_node.cpp | 40 +++++++++++++++++++++++++++------------- 1 files changed, 27 insertions(+), 13 deletions(-) diff --git a/src/topic_node.cpp b/src/topic_node.cpp index 6be65be..6ed7713 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -17,6 +17,7 @@ */ #include "topic_node.h" #include "bh_util.h" +#include "sleeper.h" #include <chrono> #include <list> @@ -51,7 +52,6 @@ TopicNode::~TopicNode() { - LOG_DEBUG() << "~TopicNode()"; Stop(); } @@ -107,14 +107,9 @@ } SetProcIndex(reply.proc_index()); this->state_ = eStateUnregistered; - auto onRequest = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) { - server_buffer_->Write(std::move(head), msg.body()); - }; - SockServer().Start(onRequest); - auto onSub = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) { - sub_buffer_->Write(std::move(head), msg.body()); - }; - SockSub().Start(onSub); + + ServerStart(ServerAsyncCB(), 1); + SubscribeStartWorker(SubDataCB(), 1); } } break; default: break; @@ -145,9 +140,7 @@ } void TopicNode::Stop() { - LOG_DEBUG() << "Node Stopping"; for (auto &p : sockets_) { p->Stop(); } - LOG_INFO() << "Node Stopped"; } bool TopicNode::Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) @@ -297,6 +290,25 @@ reply.ParseBody(reply_body)); } +bool TopicNode::QueryProcs(BHAddress &dest, MsgQueryProc &query, MsgQueryProcReply &reply_body, const int timeout_ms) +{ + if (!IsOnline()) { + SetLastError(eNotRegistered, kErrMsgNotRegistered); + return false; + } + auto &sock = SockNode(); + + BHMsgHead head(InitMsgHead(GetType(query), proc_id(), ssn())); + AddRoute(head, sock); + + MsgI reply; + DEFER1(reply.Release()); + BHMsgHead reply_head; + return (sock.SendAndRecv(BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms) && + reply_head.type() == kMsgTypeQueryProcReply && + reply.ParseBody(reply_body)); +} + bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) { if (!IsOnline()) { @@ -385,10 +397,11 @@ } BHMsgHead head; std::string body; + FibUSleeper sleeper(1000 * 10); auto end_time = steady_clock::now() + milliseconds(timeout_ms); while (!server_buffer_->Read(head, body)) { if (steady_clock::now() < end_time) { - robust::QuickSleep(); + sleeper.Sleep(); } else { return false; } @@ -663,10 +676,11 @@ BHMsgHead head; std::string body; + FibUSleeper sleeper(1000 * 10); auto end_time = steady_clock::now() + milliseconds(timeout_ms); while (!sub_buffer_->Read(head, body)) { if (steady_clock::now() < end_time) { - robust::QuickSleep(); + sleeper.Sleep(); } else { return false; } -- Gitblit v1.8.0