From 65a230ec6ccb61c3ce6816730da2106f07f40b4a Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 23 四月 2021 18:42:41 +0800 Subject: [PATCH] add api, Unregister, QueryTopicAddress. --- box/center.cpp | 36 +++++++++++++++++++++++------------- 1 files changed, 23 insertions(+), 13 deletions(-) diff --git a/box/center.cpp b/box/center.cpp index 0f547e9..badfbfe 100644 --- a/box/center.cpp +++ b/box/center.cpp @@ -166,6 +166,18 @@ return HandleMsg<MsgCommonReply, Func>(head, op); } + MsgCommonReply Unregister(const BHMsgHead &head, MsgUnregister &msg) + { + return HandleMsg( + head, [&](Node node) -> MsgCommonReply { + NodeInfo &ni = *node; + auto now = NowSec(); // just set to offline. + ni.state_.timestamp_ = now - offline_time_; + ni.state_.UpdateState(now, offline_time_, kill_time_); + return MakeReply(eSuccess); + }); + } + MsgCommonReply RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg) { return HandleMsg( @@ -206,20 +218,17 @@ auto query = [&](Node self) -> MsgQueryTopicReply { auto pos = service_map_.find(req.topic()); if (pos != service_map_.end() && !pos->second.empty()) { - // now just find first one. - const TopicDest &dest = *(pos->second.begin()); - Node dest_node(dest.weak_node_.lock()); - if (!dest_node) { - service_map_.erase(pos); - return MakeReply<Reply>(eOffline, "topic server offline."); - } else if (!Valid(*dest_node)) { - return MakeReply<Reply>(eNoRespond, "topic server not responding."); - } else { - MsgQueryTopicReply reply = MakeReply<Reply>(eSuccess); - reply.mutable_address()->set_mq_id(dest.mq_); - return reply; + auto &clients = pos->second; + Reply reply = MakeReply<Reply>(eSuccess); + for (auto &dest : clients) { + Node dest_node(dest.weak_node_.lock()); + if (dest_node && Valid(*dest_node)) { + auto node_addr = reply.add_node_address(); + node_addr->set_proc_id(dest_node->proc_.proc_id()); + node_addr->mutable_addr()->set_mq_id(dest.mq_); + } } - + return reply; } else { return MakeReply<Reply>(eNotFound, "topic server not found."); } @@ -433,6 +442,7 @@ switch (head.type()) { CASE_ON_MSG_TYPE(Register); CASE_ON_MSG_TYPE(Heartbeat); + CASE_ON_MSG_TYPE(Unregister); CASE_ON_MSG_TYPE(RegisterRPC); CASE_ON_MSG_TYPE(QueryTopic); -- Gitblit v1.8.0