From 36e6a35a886252516f168b90f7a9a7c1c5177312 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期六, 08 五月 2021 15:57:01 +0800 Subject: [PATCH] center alloc node queue; node just find them. --- box/center.cpp | 31 ++++++++++++++++++++++++++----- 1 files changed, 26 insertions(+), 5 deletions(-) diff --git a/box/center.cpp b/box/center.cpp index c57d34d..d6ac804 100644 --- a/box/center.cpp +++ b/box/center.cpp @@ -102,22 +102,43 @@ // center name, no relative to shm. const std::string &id() const { return id_; } - void OnNodeInit(const int64_t msg) + void OnNodeInit(SharedMemory &shm, const int64_t msg) { MQId ssn = msg; + if (nodes_.find(ssn) != nodes_.end()) { + return; // ignore in exists. + } + auto UpdateRegInfo = [&](Node &node) { for (int i = 0; i < 10; ++i) { node->addrs_.insert(ssn + i); } node->state_.timestamp_ = NowSec() - offline_time_; node->state_.UpdateState(NowSec(), offline_time_, kill_time_); + + // create sockets. + try { + auto CreateSocket = [](SharedMemory &shm, const MQId id) { + ShmSocket tmp(shm, true, id, 16); + }; + // alloc(-1), node, server, sub, request, + for (int i = -1; i < 4; ++i) { + CreateSocket(shm, ssn + i); + node->addrs_.insert(ssn + i); + } + return true; + } catch (...) { + return false; + } }; Node node(new NodeInfo); - UpdateRegInfo(node); - nodes_[ssn] = node; - LOG_INFO() << "new node ssn (" << ssn << ") init"; + if (UpdateRegInfo(node)) { + nodes_[ssn] = node; + LOG_INFO() << "new node ssn (" << ssn << ") init"; + } } + MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg) { if (msg.proc().proc_id() != head.proc_id()) { @@ -475,7 +496,7 @@ { auto OnNodeInit = [center_ptr](ShmSocket &socket, MsgI &msg) { auto ¢er = *center_ptr; - center->OnNodeInit(msg.Offset()); + center->OnNodeInit(socket.shm(), msg.Offset()); }; auto Nothing = [](ShmSocket &socket) {}; -- Gitblit v1.8.0