From 993c556000a414011626770540678948f16eaa9e Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期三, 02 六月 2021 17:40:50 +0800
Subject: [PATCH] center restart with new shm; set center node ssn.

---
 box/node_center.cpp |   70 ++++++++++++++++++++++++++++++----
 1 files changed, 61 insertions(+), 9 deletions(-)

diff --git a/box/node_center.cpp b/box/node_center.cpp
index cbaef0e..b285c9f 100644
--- a/box/node_center.cpp
+++ b/box/node_center.cpp
@@ -70,8 +70,9 @@
 		return;
 	}
 	// LOG_FUNCTION;
+	const size_t total = msgs_.size();
 	time_to_clean_ = now + 1;
-	int64_t limit = std::max(10000ul, msgs_.size() / 10);
+	int64_t limit = std::max(10000ul, total / 10);
 	int64_t n = 0;
 	auto it = msgs_.begin();
 	while (it != msgs_.end() && --limit > 0) {
@@ -82,16 +83,16 @@
 			++n;
 		};
 		int n = now - msg.timestamp();
-		if (n < 10) {
+		if (msg.Count() == 0) {
+			Free();
+		} else if (n > NodeTimeoutSec()) {
+			Free();
+		} else {
 			++it;
-		} else if (msg.Count() == 0) {
-			Free();
-		} else if (n > 60) {
-			Free();
 		}
 	}
 	if (n > 0) {
-		LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n;
+		LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n << '/' << total;
 	}
 }
 
@@ -163,7 +164,7 @@
 
 		// create sockets.
 		try {
-			ShmSocket tmp(shm, true, ssn, 16);
+			ShmSocket tmp(shm, ssn, eCreate);
 			node->addrs_.emplace(ssn, tmp.AbsAddr());
 			return true;
 		} catch (...) {
@@ -208,6 +209,57 @@
 {
 	RecordMsg(msg);
 	return socket.Send(dest, msg);
+}
+
+NodeCenter::Node NodeCenter::GetNode(const MQId mq_id)
+{
+	Node node;
+	auto ssn = mq_id - (mq_id % 10);
+	auto pos = nodes_.find(ssn);
+	if (pos != nodes_.end()) {
+		node = pos->second;
+	}
+	return node;
+}
+
+bool NodeCenter::PassRemoteRequestToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb)
+{
+	Node node(GetNode(dest.id_));
+	if (!node || !Valid(*node)) {
+		LOG_ERROR() << id() << " pass remote request, dest not found.";
+		return false;
+	}
+
+	ShmSocket &sender(DefaultSender(node->shm_));
+	auto route = head.add_route();
+	route->set_mq_id(sender.id());
+	route->set_abs_addr(sender.AbsAddr());
+
+	ShmMsg msg(node->shm_);
+	if (!msg.Make(head, body_content)) { return false; }
+	DEFER1(msg.Release(););
+	RecordMsg(msg);
+	return sender.Send(dest, msg, head.msg_id(), std::move(cb));
+}
+
+bool NodeCenter::PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content)
+{
+	Node node(GetNode(dest.id_));
+	if (!node) {
+		LOG_ERROR() << id() << " pass remote reply , ssn not found.";
+		return false;
+	}
+	auto offset = node->addrs_[dest.id_];
+	if (offset != dest.offset_) {
+		LOG_ERROR() << id() << " pass remote reply, dest address not match";
+		return false;
+	}
+
+	ShmMsg msg(node->shm_);
+	if (!msg.Make(head, body_content)) { return false; }
+	DEFER1(msg.Release(););
+	RecordMsg(msg);
+	return DefaultSender(node->shm_).Send(dest, msg);
 }
 
 void NodeCenter::OnAlloc(ShmSocket &socket, const int64_t val)
@@ -281,7 +333,7 @@
 	auto &node = pos->second;
 	try {
 		for (int i = 0; i < msg.extra_mq_num(); ++i) {
-			ShmSocket tmp(node->shm_, true, head.ssn_id() + i + 1, 16);
+			ShmSocket tmp(node->shm_, head.ssn_id() + i + 1, eCreate);
 			node->addrs_.emplace(tmp.id(), tmp.AbsAddr());
 			auto addr = reply.add_extra_mqs();
 			addr->set_mq_id(tmp.id());

--
Gitblit v1.8.0