From 65a230ec6ccb61c3ce6816730da2106f07f40b4a Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 23 四月 2021 18:42:41 +0800
Subject: [PATCH] add api, Unregister, QueryTopicAddress.

---
 box/center.cpp |   36 +++++++++++++++++++++++-------------
 1 files changed, 23 insertions(+), 13 deletions(-)

diff --git a/box/center.cpp b/box/center.cpp
index 0f547e9..badfbfe 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -166,6 +166,18 @@
 		return HandleMsg<MsgCommonReply, Func>(head, op);
 	}
 
+	MsgCommonReply Unregister(const BHMsgHead &head, MsgUnregister &msg)
+	{
+		return HandleMsg(
+		    head, [&](Node node) -> MsgCommonReply {
+			    NodeInfo &ni = *node;
+			    auto now = NowSec(); // just set to offline.
+			    ni.state_.timestamp_ = now - offline_time_;
+			    ni.state_.UpdateState(now, offline_time_, kill_time_);
+			    return MakeReply(eSuccess);
+		    });
+	}
+
 	MsgCommonReply RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg)
 	{
 		return HandleMsg(
@@ -206,20 +218,17 @@
 		auto query = [&](Node self) -> MsgQueryTopicReply {
 			auto pos = service_map_.find(req.topic());
 			if (pos != service_map_.end() && !pos->second.empty()) {
-				// now just find first one.
-				const TopicDest &dest = *(pos->second.begin());
-				Node dest_node(dest.weak_node_.lock());
-				if (!dest_node) {
-					service_map_.erase(pos);
-					return MakeReply<Reply>(eOffline, "topic server offline.");
-				} else if (!Valid(*dest_node)) {
-					return MakeReply<Reply>(eNoRespond, "topic server not responding.");
-				} else {
-					MsgQueryTopicReply reply = MakeReply<Reply>(eSuccess);
-					reply.mutable_address()->set_mq_id(dest.mq_);
-					return reply;
+				auto &clients = pos->second;
+				Reply reply = MakeReply<Reply>(eSuccess);
+				for (auto &dest : clients) {
+					Node dest_node(dest.weak_node_.lock());
+					if (dest_node && Valid(*dest_node)) {
+						auto node_addr = reply.add_node_address();
+						node_addr->set_proc_id(dest_node->proc_.proc_id());
+						node_addr->mutable_addr()->set_mq_id(dest.mq_);
+					}
 				}
-
+				return reply;
 			} else {
 				return MakeReply<Reply>(eNotFound, "topic server not found.");
 			}
@@ -433,6 +442,7 @@
 		switch (head.type()) {
 			CASE_ON_MSG_TYPE(Register);
 			CASE_ON_MSG_TYPE(Heartbeat);
+			CASE_ON_MSG_TYPE(Unregister);
 
 			CASE_ON_MSG_TYPE(RegisterRPC);
 			CASE_ON_MSG_TYPE(QueryTopic);

--
Gitblit v1.8.0