From 5b6ced44157b6e7fab519ae48f5cffcdc2b3cd7c Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期日, 25 四月 2021 19:28:57 +0800
Subject: [PATCH] use node mqid ssn id to index online nodes.

---
 box/center.cpp |   42 ++++++++++++++++++++++++++++--------------
 1 files changed, 28 insertions(+), 14 deletions(-)

diff --git a/box/center.cpp b/box/center.cpp
index d920ff7..4bb9ea1 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -52,6 +52,11 @@
 	struct ProcState {
 		int64_t timestamp_ = 0;
 		uint32_t flag_ = 0; // reserved
+		void PutOffline(const int64_t offline_time)
+		{
+			timestamp_ = NowSec() - offline_time;
+			flag_ = kStateOffline;
+		}
 		void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time)
 		{
 			auto diff = now - timestamp_;
@@ -106,6 +111,10 @@
 		}
 
 		try {
+			MQId ssn = head.ssn_id();
+			// use src_addr as session id.
+			// when node restart, src_addr will change,
+			// and old node will be removed after timeout.
 			auto UpdateRegInfo = [&](Node &node) {
 				node->addrs_.insert(SrcAddr(head));
 				for (auto &addr : msg.addrs()) {
@@ -116,19 +125,24 @@
 				node->state_.UpdateState(NowSec(), offline_time_, kill_time_);
 			};
 
-			auto pos = nodes_.find(head.proc_id());
-			if (pos != nodes_.end()) { // new client
+			auto pos = nodes_.find(ssn);
+			if (pos != nodes_.end()) { // update
 				Node &node = pos->second;
-				if (node->addrs_.find(SrcAddr(head)) == node->addrs_.end()) {
-					// node restarted, release old mq.
-					RemoveNode(node);
-					node.reset(new NodeInfo);
-				}
 				UpdateRegInfo(node);
 			} else {
 				Node node(new NodeInfo);
 				UpdateRegInfo(node);
-				nodes_[node->proc_.proc_id()] = node;
+				nodes_[ssn] = node;
+
+				auto old = node_addr_map_.find(head.proc_id());
+				if (old != node_addr_map_.end()) { // old session
+					auto &old_ssn = old->second;
+					nodes_[old_ssn]->state_.PutOffline(offline_time_);
+					printf("put %s %ld offline\n", nodes_[old_ssn]->proc_.proc_id().c_str(), old->second);
+					old_ssn = ssn;
+				} else {
+					node_addr_map_.emplace(head.proc_id(), ssn);
+				}
 			}
 			return MakeReply(eSuccess);
 		} catch (...) {
@@ -140,7 +154,7 @@
 	Reply HandleMsg(const BHMsgHead &head, Func const &op)
 	{
 		try {
-			auto pos = nodes_.find(head.proc_id());
+			auto pos = nodes_.find(head.ssn_id());
 			if (pos == nodes_.end()) {
 				return MakeReply<Reply>(eNotRegistered, "Node is not registered.");
 			} else {
@@ -171,9 +185,7 @@
 		return HandleMsg(
 		    head, [&](Node node) -> MsgCommonReply {
 			    NodeInfo &ni = *node;
-			    auto now = NowSec(); // just set to offline.
-			    ni.state_.timestamp_ = now - offline_time_;
-			    ni.state_.UpdateState(now, offline_time_, kill_time_);
+			    ni.state_.PutOffline(offline_time_);
 			    return MakeReply(eSuccess);
 		    });
 	}
@@ -375,6 +387,7 @@
 		};
 		EraseMapRec(service_map_, node->services_);
 		EraseMapRec(subscribe_map_, node->subscriptions_);
+		node_addr_map_.erase(node->proc_.proc_id());
 
 		for (auto &addr : node->addrs_) {
 			cleaner_(addr);
@@ -385,7 +398,8 @@
 
 	std::unordered_map<Topic, Clients> service_map_;
 	std::unordered_map<Topic, Clients> subscribe_map_;
-	std::unordered_map<ProcId, Node> nodes_;
+	std::unordered_map<Address, Node> nodes_;
+	std::unordered_map<std::string, Address> node_addr_map_;
 	Cleaner cleaner_; // remove mqs.
 	int64_t offline_time_;
 	int64_t kill_time_;
@@ -425,7 +439,7 @@
 	auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 60s, 60s * 2);
 	auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) {
 		return [&](auto &&rep_body) {
-			auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id()));
+			auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.ssn_id(), head.msg_id()));
 			auto remote = head.route(0).mq_id();
 			socket.Send(remote, reply_head, rep_body);
 		};

--
Gitblit v1.8.0