From 377e395a5fdc6ad44bdd5a2d41d2930f45fc4384 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 30 四月 2021 18:25:33 +0800 Subject: [PATCH] add node init msg, alloc msgq on success. --- box/center.cpp | 75 ++++++++++++++++++++++++++++--------- 1 files changed, 56 insertions(+), 19 deletions(-) diff --git a/box/center.cpp b/box/center.cpp index 0952ca7..aa6f285 100644 --- a/box/center.cpp +++ b/box/center.cpp @@ -103,7 +103,22 @@ // center name, no relative to shm. const std::string &id() const { return id_; } + void OnNodeInit(const int64_t msg) + { + MQId ssn = msg; + auto UpdateRegInfo = [&](Node &node) { + for (int i = 0; i < 10; ++i) { + node->addrs_.insert(ssn + i); + } + node->state_.timestamp_ = NowSec() - offline_time_; + node->state_.UpdateState(NowSec(), offline_time_, kill_time_); + }; + Node node(new NodeInfo); + UpdateRegInfo(node); + nodes_[ssn] = node; + printf("new node ssn (%ld) init\n", ssn); + } MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg) { if (msg.proc().proc_id() != head.proc_id()) { @@ -132,17 +147,19 @@ Node node(new NodeInfo); UpdateRegInfo(node); nodes_[ssn] = node; + } + printf("node (%s) ssn (%ld)\n", head.proc_id().c_str(), ssn); - printf("new node (%s) ssn (%ld)\n", head.proc_id().c_str(), ssn); - auto old = online_node_addr_map_.find(head.proc_id()); - if (old != online_node_addr_map_.end()) { // old session - auto &old_ssn = old->second; + auto old = online_node_addr_map_.find(head.proc_id()); + if (old != online_node_addr_map_.end()) { // old session + auto &old_ssn = old->second; + if (old_ssn != ssn) { nodes_[old_ssn]->state_.PutOffline(offline_time_); printf("put node (%s) ssn (%ld) offline\n", nodes_[old_ssn]->proc_.proc_id().c_str(), old->second); old_ssn = ssn; - } else { - online_node_addr_map_.emplace(head.proc_id(), ssn); } + } else { + online_node_addr_map_.emplace(head.proc_id(), ssn); } return MakeReply(eSuccess); } catch (...) { @@ -446,16 +463,24 @@ msg, head, [&](auto &body) { return center->MsgTag(head, body); }, replyer); \ return true; -bool AddCenter(const std::string &id, const NodeCenter::Cleaner &cleaner) +auto MakeReplyer(ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) { - auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 60s, 60s * 2); - auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) { - return [&](auto &&rep_body) { - auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.ssn_id(), head.msg_id())); - auto remote = head.route(0).mq_id(); - socket.Send(remote, reply_head, rep_body); - }; + return [&](auto &&rep_body) { + auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.ssn_id(), head.msg_id())); + auto remote = head.route(0).mq_id(); + socket.Send(remote, reply_head, rep_body); }; +} + +bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr) +{ + auto OnNodeInit = [center_ptr](ShmSocket &socket, MsgI &msg) { + auto ¢er = *center_ptr; + center->OnNodeInit(msg.Offset()); + }; + auto Nothing = [](ShmSocket &socket) {}; + + BHCenter::Install("#centetr.Init", OnNodeInit, Nothing, BHInitAddress(), 16); auto OnCenterIdle = [center_ptr](ShmSocket &socket) { auto ¢er = *center_ptr; @@ -475,6 +500,7 @@ default: return false; } }; + BHCenter::Install("#center.main", OnCenter, OnCenterIdle, BHTopicCenterAddress(), 1000); auto OnBusIdle = [=](ShmSocket &socket) {}; auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { @@ -515,7 +541,6 @@ } }; - BHCenter::Install("#center.reg", OnCenter, OnCenterIdle, BHTopicCenterAddress(), 1000); BHCenter::Install("#center.bus", OnPubSub, OnBusIdle, BHTopicBusAddress(), 1000); return true; @@ -533,7 +558,12 @@ bool BHCenter::Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId mqid, const int mq_len) { - Centers()[name] = CenterInfo{name, handler, idle, mqid, mq_len}; + Centers()[name] = CenterInfo{name, handler, MsgIHandler(), idle, mqid, mq_len}; + return true; +} +bool BHCenter::Install(const std::string &name, MsgIHandler handler, IdleHandler idle, const MQId mqid, const int mq_len) +{ + Centers()[name] = CenterInfo{name, MsgHandler(), handler, idle, mqid, mq_len}; return true; } @@ -541,10 +571,13 @@ { auto gc = [&](const MQId id) { auto r = ShmSocket::Remove(shm, id); - printf("remove mq %ld : %s\n", id, (r ? "ok" : "failed")); + if (r) { + printf("remove mq %ld ok\n", id); + } }; - AddCenter("#bhome_center", gc); + auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", gc, 6s, 6s * 2); + AddCenter(center_ptr); for (auto &kv : Centers()) { auto &info = kv.second; @@ -556,7 +589,11 @@ { for (auto &kv : Centers()) { auto &info = kv.second; - sockets_[info.name_]->Start(info.handler_, info.idle_); + if (info.handler_) { + sockets_[info.name_]->Start(info.handler_, info.idle_); + } else { + sockets_[info.name_]->Start(info.raw_handler_, info.idle_); + } } return true; -- Gitblit v1.8.0