From 36e6a35a886252516f168b90f7a9a7c1c5177312 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期六, 08 五月 2021 15:57:01 +0800
Subject: [PATCH] center alloc node queue; node just find them.

---
 box/center.cpp |   31 ++++++++++++++++++++++++++-----
 1 files changed, 26 insertions(+), 5 deletions(-)

diff --git a/box/center.cpp b/box/center.cpp
index c57d34d..d6ac804 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -102,22 +102,43 @@
 
 	// center name, no relative to shm.
 	const std::string &id() const { return id_; }
-	void OnNodeInit(const int64_t msg)
+	void OnNodeInit(SharedMemory &shm, const int64_t msg)
 	{
 		MQId ssn = msg;
+		if (nodes_.find(ssn) != nodes_.end()) {
+			return; // ignore in exists.
+		}
+
 		auto UpdateRegInfo = [&](Node &node) {
 			for (int i = 0; i < 10; ++i) {
 				node->addrs_.insert(ssn + i);
 			}
 			node->state_.timestamp_ = NowSec() - offline_time_;
 			node->state_.UpdateState(NowSec(), offline_time_, kill_time_);
+
+			// create sockets.
+			try {
+				auto CreateSocket = [](SharedMemory &shm, const MQId id) {
+					ShmSocket tmp(shm, true, id, 16);
+				};
+				// alloc(-1), node, server, sub, request,
+				for (int i = -1; i < 4; ++i) {
+					CreateSocket(shm, ssn + i);
+					node->addrs_.insert(ssn + i);
+				}
+				return true;
+			} catch (...) {
+				return false;
+			}
 		};
 
 		Node node(new NodeInfo);
-		UpdateRegInfo(node);
-		nodes_[ssn] = node;
-		LOG_INFO() << "new node ssn (" << ssn << ") init";
+		if (UpdateRegInfo(node)) {
+			nodes_[ssn] = node;
+			LOG_INFO() << "new node ssn (" << ssn << ") init";
+		}
 	}
+
 	MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg)
 	{
 		if (msg.proc().proc_id() != head.proc_id()) {
@@ -475,7 +496,7 @@
 {
 	auto OnNodeInit = [center_ptr](ShmSocket &socket, MsgI &msg) {
 		auto &center = *center_ptr;
-		center->OnNodeInit(msg.Offset());
+		center->OnNodeInit(socket.shm(), msg.Offset());
 	};
 	auto Nothing = [](ShmSocket &socket) {};
 

--
Gitblit v1.8.0