From 5b6ced44157b6e7fab519ae48f5cffcdc2b3cd7c Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期日, 25 四月 2021 19:28:57 +0800
Subject: [PATCH] use node mqid ssn id to index online nodes.

---
 utest/speed_test.cpp         |    6 +-
 src/proto.cpp                |    7 ++-
 box/center.cpp               |   42 ++++++++++++++-------
 src/proto.h                  |    4 +-
 proto/source/bhome_msg.proto |    5 +-
 src/topic_node.h             |    1 
 src/topic_node.cpp           |   22 +++++-----
 7 files changed, 52 insertions(+), 35 deletions(-)

diff --git a/box/center.cpp b/box/center.cpp
index d920ff7..4bb9ea1 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -52,6 +52,11 @@
 	struct ProcState {
 		int64_t timestamp_ = 0;
 		uint32_t flag_ = 0; // reserved
+		void PutOffline(const int64_t offline_time)
+		{
+			timestamp_ = NowSec() - offline_time;
+			flag_ = kStateOffline;
+		}
 		void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time)
 		{
 			auto diff = now - timestamp_;
@@ -106,6 +111,10 @@
 		}
 
 		try {
+			MQId ssn = head.ssn_id();
+			// use src_addr as session id.
+			// when node restart, src_addr will change,
+			// and old node will be removed after timeout.
 			auto UpdateRegInfo = [&](Node &node) {
 				node->addrs_.insert(SrcAddr(head));
 				for (auto &addr : msg.addrs()) {
@@ -116,19 +125,24 @@
 				node->state_.UpdateState(NowSec(), offline_time_, kill_time_);
 			};
 
-			auto pos = nodes_.find(head.proc_id());
-			if (pos != nodes_.end()) { // new client
+			auto pos = nodes_.find(ssn);
+			if (pos != nodes_.end()) { // update
 				Node &node = pos->second;
-				if (node->addrs_.find(SrcAddr(head)) == node->addrs_.end()) {
-					// node restarted, release old mq.
-					RemoveNode(node);
-					node.reset(new NodeInfo);
-				}
 				UpdateRegInfo(node);
 			} else {
 				Node node(new NodeInfo);
 				UpdateRegInfo(node);
-				nodes_[node->proc_.proc_id()] = node;
+				nodes_[ssn] = node;
+
+				auto old = node_addr_map_.find(head.proc_id());
+				if (old != node_addr_map_.end()) { // old session
+					auto &old_ssn = old->second;
+					nodes_[old_ssn]->state_.PutOffline(offline_time_);
+					printf("put %s %ld offline\n", nodes_[old_ssn]->proc_.proc_id().c_str(), old->second);
+					old_ssn = ssn;
+				} else {
+					node_addr_map_.emplace(head.proc_id(), ssn);
+				}
 			}
 			return MakeReply(eSuccess);
 		} catch (...) {
@@ -140,7 +154,7 @@
 	Reply HandleMsg(const BHMsgHead &head, Func const &op)
 	{
 		try {
-			auto pos = nodes_.find(head.proc_id());
+			auto pos = nodes_.find(head.ssn_id());
 			if (pos == nodes_.end()) {
 				return MakeReply<Reply>(eNotRegistered, "Node is not registered.");
 			} else {
@@ -171,9 +185,7 @@
 		return HandleMsg(
 		    head, [&](Node node) -> MsgCommonReply {
 			    NodeInfo &ni = *node;
-			    auto now = NowSec(); // just set to offline.
-			    ni.state_.timestamp_ = now - offline_time_;
-			    ni.state_.UpdateState(now, offline_time_, kill_time_);
+			    ni.state_.PutOffline(offline_time_);
 			    return MakeReply(eSuccess);
 		    });
 	}
@@ -375,6 +387,7 @@
 		};
 		EraseMapRec(service_map_, node->services_);
 		EraseMapRec(subscribe_map_, node->subscriptions_);
+		node_addr_map_.erase(node->proc_.proc_id());
 
 		for (auto &addr : node->addrs_) {
 			cleaner_(addr);
@@ -385,7 +398,8 @@
 
 	std::unordered_map<Topic, Clients> service_map_;
 	std::unordered_map<Topic, Clients> subscribe_map_;
-	std::unordered_map<ProcId, Node> nodes_;
+	std::unordered_map<Address, Node> nodes_;
+	std::unordered_map<std::string, Address> node_addr_map_;
 	Cleaner cleaner_; // remove mqs.
 	int64_t offline_time_;
 	int64_t kill_time_;
@@ -425,7 +439,7 @@
 	auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 60s, 60s * 2);
 	auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) {
 		return [&](auto &&rep_body) {
-			auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id()));
+			auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.ssn_id(), head.msg_id()));
 			auto remote = head.route(0).mq_id();
 			socket.Send(remote, reply_head, rep_body);
 		};
diff --git a/proto/source/bhome_msg.proto b/proto/source/bhome_msg.proto
index 6a4942d..51e9b6e 100644
--- a/proto/source/bhome_msg.proto
+++ b/proto/source/bhome_msg.proto
@@ -16,8 +16,9 @@
 	repeated BHAddress route = 2; // for reply and proxy.
 	int64 timestamp = 3;
 	int32 type = 4;
-	bytes proc_id = 5;
-	bytes topic = 6; // for request route
+	uint64 ssn_id = 5; // node mq id
+	bytes proc_id = 6;
+	bytes topic = 7; // for request route
 }
 
 
diff --git a/src/proto.cpp b/src/proto.cpp
index b1e8207..c8a5052 100644
--- a/src/proto.cpp
+++ b/src/proto.cpp
@@ -32,17 +32,18 @@
 
 std::string NewMsgId() { return RandId(); }
 
-BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id)
+BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const uint64_t ssn_id)
 {
-	return InitMsgHead(type, proc_id, RandId());
+	return InitMsgHead(type, proc_id, ssn_id, RandId());
 }
 
-BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const std::string &msgid)
+BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const uint64_t ssn_id, const std::string &msgid)
 {
 	BHMsgHead msg;
 	msg.set_msg_id(msgid);
 	msg.set_type(type);
 	msg.set_proc_id(proc_id);
+	msg.set_ssn_id(ssn_id);
 	msg.set_timestamp(NowSec());
 	return msg;
 }
diff --git a/src/proto.h b/src/proto.h
index c30b4fd..94a438c 100644
--- a/src/proto.h
+++ b/src/proto.h
@@ -74,8 +74,8 @@
 	return msg;
 }
 std::string NewMsgId();
-BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const std::string &msgid);
-BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id);
+BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const uint64_t ssn_id, const std::string &msgid);
+BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const uint64_t ssn_id);
 // inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); }
 inline bool IsSuccess(const ErrorCode ec) { return ec == eSuccess; }
 bool IsMsgExpired(const BHMsgHead &head);
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index 9398318..13bb8ee 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -89,7 +89,7 @@
 	AddId(SockSub().id());
 	AddId(SockPub().id());
 
-	auto head(InitMsgHead(GetType(body), body.proc().proc_id()));
+	auto head(InitMsgHead(GetType(body), body.proc().proc_id(), ssn()));
 	AddRoute(head, sock.id());
 
 	auto CheckResult = [this](MsgI &msg, BHMsgHead &head, MsgCommonReply &rbody) {
@@ -129,7 +129,7 @@
 	MsgUnregister body;
 	body.mutable_proc()->Swap(&proc);
 
-	auto head(InitMsgHead(GetType(body), body.proc().proc_id()));
+	auto head(InitMsgHead(GetType(body), body.proc().proc_id(), ssn()));
 	AddRoute(head, sock.id());
 
 	auto CheckResult = [this](MsgI &msg, BHMsgHead &head, MsgCommonReply &rbody) {
@@ -165,7 +165,7 @@
 	MsgHeartbeat body;
 	body.mutable_proc()->Swap(&proc);
 
-	auto head(InitMsgHead(GetType(body), body.proc().proc_id()));
+	auto head(InitMsgHead(GetType(body), body.proc().proc_id(), ssn()));
 	AddRoute(head, sock.id());
 
 	if (timeout_ms == 0) {
@@ -195,7 +195,7 @@
 	}
 	auto &sock = SockNode();
 
-	BHMsgHead head(InitMsgHead(GetType(query), proc_id()));
+	BHMsgHead head(InitMsgHead(GetType(query), proc_id(), ssn()));
 	AddRoute(head, sock.id());
 
 	MsgI reply;
@@ -217,7 +217,7 @@
 	MsgRegisterRPC body;
 	body.mutable_topics()->Swap(&topics);
 
-	auto head(InitMsgHead(GetType(body), proc_id()));
+	auto head(InitMsgHead(GetType(body), proc_id(), ssn()));
 	AddRoute(head, sock.id());
 
 	if (timeout_ms == 0) {
@@ -242,7 +242,7 @@
 
 		MsgRequestTopicReply reply_body;
 		if (rcb(head.proc_id(), req, reply_body)) {
-			BHMsgHead reply_head(InitMsgHead(GetType(reply_body), proc_id(), head.msg_id()));
+			BHMsgHead reply_head(InitMsgHead(GetType(reply_body), proc_id(), ssn(), head.msg_id()));
 
 			for (int i = 0; i < head.route_size() - 1; ++i) {
 				reply_head.add_route()->Swap(head.mutable_route(i));
@@ -311,7 +311,7 @@
 	if (!p || p->route.empty()) {
 		return false;
 	}
-	BHMsgHead head(InitMsgHead(GetType(body), proc_id(), p->msg_id));
+	BHMsgHead head(InitMsgHead(GetType(body), proc_id(), ssn(), p->msg_id));
 	for (unsigned i = 0; i < p->route.size() - 1; ++i) {
 		head.add_route()->Swap(&p->route[i]);
 	}
@@ -348,7 +348,7 @@
 
 	auto SendTo = [this, msg_id](const BHAddress &addr, const MsgRequestTopic &req, const RequestResultCB &cb) {
 		auto &sock = SockClient();
-		BHMsgHead head(InitMsgHead(GetType(req), proc_id(), msg_id));
+		BHMsgHead head(InitMsgHead(GetType(req), proc_id(), ssn(), msg_id));
 		AddRoute(head, sock.id());
 		head.set_topic(req.topic());
 
@@ -388,7 +388,7 @@
 
 		BHAddress addr;
 		if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) {
-			BHMsgHead head(InitMsgHead(GetType(request), proc_id()));
+			BHMsgHead head(InitMsgHead(GetType(request), proc_id(), ssn()));
 			AddRoute(head, sock.id());
 			head.set_topic(request.topic());
 
@@ -460,7 +460,7 @@
 
 	try {
 		auto &sock = SockPub();
-		BHMsgHead head(InitMsgHead(GetType(pub), proc_id()));
+		BHMsgHead head(InitMsgHead(GetType(pub), proc_id(), ssn()));
 		AddRoute(head, sock.id());
 
 		if (timeout_ms == 0) {
@@ -494,7 +494,7 @@
 		MsgSubscribe sub;
 		sub.mutable_topics()->Swap(&topics);
 
-		BHMsgHead head(InitMsgHead(GetType(sub), proc_id()));
+		BHMsgHead head(InitMsgHead(GetType(sub), proc_id(), ssn()));
 		AddRoute(head, sock.id());
 		if (timeout_ms == 0) {
 			return sock.Send(BHTopicBusAddress(), head, sub);
diff --git a/src/topic_node.h b/src/topic_node.h
index 76bd608..3c90e5b 100644
--- a/src/topic_node.h
+++ b/src/topic_node.h
@@ -74,6 +74,7 @@
 	void Stop();
 
 private:
+	MQId ssn() { return SockNode().id(); }
 	bool ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms);
 	typedef MsgQueryTopicReply::BHNodeAddress NodeAddress;
 	int QueryRPCTopics(const Topic &topic, std::vector<NodeAddress> &addr, const int timeout_ms);
diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp
index d145ab4..302d4bd 100644
--- a/utest/speed_test.cpp
+++ b/utest/speed_test.cpp
@@ -38,7 +38,7 @@
 		MsgRequestTopic body;
 		body.set_topic("topic");
 		body.set_data(str);
-		auto head(InitMsgHead(GetType(body), proc_id));
+		auto head(InitMsgHead(GetType(body), proc_id, mq.Id()));
 		msg.Make(head, body);
 		assert(msg.valid());
 		DEFER1(msg.Release(););
@@ -156,7 +156,7 @@
 				MsgRequestTopic req_body;
 				req_body.set_topic("topic");
 				req_body.set_data(msg_content);
-				auto req_head(InitMsgHead(GetType(req_body), client_proc_id));
+				auto req_head(InitMsgHead(GetType(req_body), client_proc_id, cli.id()));
 				req_head.add_route()->set_mq_id(cli.id());
 				return cli.Send(srv.id(), req_head, req_body);
 			};
@@ -180,7 +180,7 @@
 						MsgRequestTopic reply_body;
 						reply_body.set_topic("topic");
 						reply_body.set_data(msg_content);
-						auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, req_head.msg_id()));
+						auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, srv.id(), req_head.msg_id()));
 						return srv.Send(src_id, reply_head, reply_body);
 					};
 					Reply();

--
Gitblit v1.8.0