From b2484c8bd77a9d21bcf1827f554444535196953d Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 21 五月 2021 10:47:18 +0800
Subject: [PATCH] center save shm on each node, no bind to shm.

---
 box/node_center.cpp |   18 ++++++++++--------
 1 files changed, 10 insertions(+), 8 deletions(-)

diff --git a/box/node_center.cpp b/box/node_center.cpp
index dbf6ee8..4e228a7 100644
--- a/box/node_center.cpp
+++ b/box/node_center.cpp
@@ -116,7 +116,7 @@
 
 	Json json;
 	json.put("proc_id", proc_.proc_id());
-	center_.Publish(kTopicNodeOffline, json.dump());
+	center_.Publish(shm_, kTopicNodeOffline, json.dump());
 }
 
 void NodeCenter::NodeInfo::UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time)
@@ -127,7 +127,7 @@
 		if (proc_.proc_id().empty()) { return; } // node init, ignore.
 		Json json;
 		json.put("proc_id", proc_.proc_id());
-		center_.Publish(topic, json.dump());
+		center_.Publish(shm_, topic, json.dump());
 	};
 
 	LOG_TRACE() << "node " << proc_.proc_id() << " timeout count: " << diff;
@@ -182,7 +182,7 @@
 		       SendAllocMsg(socket, {ssn, node->addrs_[ssn]}, init_msg);
 	};
 
-	Node node(new NodeInfo(*this));
+	Node node(new NodeInfo(*this, shm));
 	if (UpdateRegInfo(node) && PrepareProcInit(node)) {
 		reply |= (node->addrs_[ssn] << 4);
 		nodes_[ssn] = node;
@@ -281,7 +281,7 @@
 	auto &node = pos->second;
 	try {
 		for (int i = 0; i < msg.extra_mq_num(); ++i) {
-			ShmSocket tmp(BHomeShm(), true, head.ssn_id() + i + 1, 16);
+			ShmSocket tmp(node->shm_, true, head.ssn_id() + i + 1, 16);
 			node->addrs_.emplace(tmp.id(), tmp.AbsAddr());
 			auto addr = reply.add_extra_mqs();
 			addr->set_mq_id(tmp.id());
@@ -593,13 +593,15 @@
 	}
 
 	for (auto &addr : node->addrs_) {
-		cleaner_(addr.first);
+		auto &id = addr.first;
+		auto r = ShmSocket::Remove(node->shm_, id);
+		LOG_DEBUG() << "remove mq " << id << (r ? " ok" : " failed");
 	}
 
 	node->addrs_.clear();
 }
 
-void NodeCenter::Publish(const Topic &topic, const std::string &content)
+void NodeCenter::Publish(SharedMemory &shm, const Topic &topic, const std::string &content)
 {
 	try {
 		// LOG_DEBUG() << "center publish: " << topic << ": " << content;
@@ -615,8 +617,8 @@
 			DEFER1(msg.Release());
 			RecordMsg(msg);
 
-			auto &mq = GetCenterInfo(BHomeShm())->mq_sender_;
-			ShmSocket sender(mq.offset_, BHomeShm(), mq.id_);
+			auto &mq = GetCenterInfo(shm)->mq_sender_;
+			ShmSocket sender(mq.offset_, shm, mq.id_);
 
 			for (auto &cli : clients) {
 				auto node = cli.weak_node_.lock();

--
Gitblit v1.8.0