From 330f78f3334bcdcdb4cc2ab2dbf66604e0224d71 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 21 五月 2021 16:21:45 +0800
Subject: [PATCH] Merge branch 'master' of http://192.168.5.5:10010/r/valib/bhshmq

---
 box/center.cpp |   32 ++++++++++----------------------
 1 files changed, 10 insertions(+), 22 deletions(-)

diff --git a/box/center.cpp b/box/center.cpp
index e745be8..53c1f42 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -65,7 +65,7 @@
 	return [&](auto &&rep_body) {
 		auto reply_head(InitMsgHead(GetType(rep_body), center->id(), head.ssn_id(), head.msg_id()));
 		MQInfo remote = {head.route(0).mq_id(), head.route(0).abs_addr()};
-		MsgI msg;
+		MsgI msg(socket.shm());
 		if (msg.Make(reply_head, rep_body)) {
 			DEFER1(msg.Release(););
 			center->SendAllocMsg(socket, remote, msg);
@@ -73,7 +73,7 @@
 	};
 }
 
-bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr)
+bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr, SharedMemory &shm)
 {
 	// command
 	auto OnCommand = [center_ptr](ShmSocket &socket, ShmMsgQueue::RawData &cmd) -> bool {
@@ -87,7 +87,7 @@
 		auto onInit = [&](const int64_t request) {
 			return center->OnNodeInit(socket, request);
 		};
-		BHCenterHandleInit(onInit);
+		BHCenterHandleInit(socket.shm(), onInit);
 		center->OnTimer();
 	};
 
@@ -106,7 +106,7 @@
 		default: return false;
 		}
 	};
-	BHCenter::Install("#center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(), 1000);
+	BHCenter::Install("#center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(shm), 1000);
 
 	auto OnBusIdle = [=](ShmSocket &socket) {};
 	auto OnBusCmd = [=](ShmSocket &socket, ShmMsgQueue::RawData &val) { return false; };
@@ -124,20 +124,14 @@
 			} else {
 				replyer(MakeReply(eSuccess));
 				if (clients.empty()) { return; }
-
-				auto it = clients.begin();
-				do {
-					auto &cli = *it;
+				for (auto &cli : clients) {
 					auto node = cli.weak_node_.lock();
 					if (node) {
 						// should also make sure that mq is not killed before msg expires.
 						// it would be ok if (kill_time - offline_time) is longer than expire time.
 						socket.Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
-						++it;
-					} else {
-						it = clients.erase(it);
 					}
-				} while (it != clients.end());
+				}
 			}
 		};
 		switch (head.type()) {
@@ -148,7 +142,7 @@
 		}
 	};
 
-	BHCenter::Install("#center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(), 1000);
+	BHCenter::Install("#center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(shm), 1000);
 
 	return true;
 }
@@ -171,16 +165,9 @@
 
 BHCenter::BHCenter(Socket::Shm &shm)
 {
-	auto gc = [&](const MQId id) {
-		auto r = ShmSocket::Remove(shm, id);
-		if (r) {
-			LOG_DEBUG() << "remove mq " << id << " ok\n";
-		}
-	};
-
 	auto nsec = NodeTimeoutSec();
-	auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", gc, nsec, nsec * 3); // *3 to allow other clients to finish sending msgs.
-	AddCenter(center_ptr);
+	auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs.
+	AddCenter(center_ptr, shm);
 
 	for (auto &kv : Centers()) {
 		auto &info = kv.second;
@@ -189,6 +176,7 @@
 
 	topic_node_.reset(new CenterTopicNode(center_ptr, shm));
 }
+
 BHCenter::~BHCenter() { Stop(); }
 
 bool BHCenter::Start()

--
Gitblit v1.8.0