From 5b6ced44157b6e7fab519ae48f5cffcdc2b3cd7c Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期日, 25 四月 2021 19:28:57 +0800 Subject: [PATCH] use node mqid ssn id to index online nodes. --- box/center.cpp | 42 ++++++++++++++++++++++++++++-------------- 1 files changed, 28 insertions(+), 14 deletions(-) diff --git a/box/center.cpp b/box/center.cpp index d920ff7..4bb9ea1 100644 --- a/box/center.cpp +++ b/box/center.cpp @@ -52,6 +52,11 @@ struct ProcState { int64_t timestamp_ = 0; uint32_t flag_ = 0; // reserved + void PutOffline(const int64_t offline_time) + { + timestamp_ = NowSec() - offline_time; + flag_ = kStateOffline; + } void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time) { auto diff = now - timestamp_; @@ -106,6 +111,10 @@ } try { + MQId ssn = head.ssn_id(); + // use src_addr as session id. + // when node restart, src_addr will change, + // and old node will be removed after timeout. auto UpdateRegInfo = [&](Node &node) { node->addrs_.insert(SrcAddr(head)); for (auto &addr : msg.addrs()) { @@ -116,19 +125,24 @@ node->state_.UpdateState(NowSec(), offline_time_, kill_time_); }; - auto pos = nodes_.find(head.proc_id()); - if (pos != nodes_.end()) { // new client + auto pos = nodes_.find(ssn); + if (pos != nodes_.end()) { // update Node &node = pos->second; - if (node->addrs_.find(SrcAddr(head)) == node->addrs_.end()) { - // node restarted, release old mq. - RemoveNode(node); - node.reset(new NodeInfo); - } UpdateRegInfo(node); } else { Node node(new NodeInfo); UpdateRegInfo(node); - nodes_[node->proc_.proc_id()] = node; + nodes_[ssn] = node; + + auto old = node_addr_map_.find(head.proc_id()); + if (old != node_addr_map_.end()) { // old session + auto &old_ssn = old->second; + nodes_[old_ssn]->state_.PutOffline(offline_time_); + printf("put %s %ld offline\n", nodes_[old_ssn]->proc_.proc_id().c_str(), old->second); + old_ssn = ssn; + } else { + node_addr_map_.emplace(head.proc_id(), ssn); + } } return MakeReply(eSuccess); } catch (...) { @@ -140,7 +154,7 @@ Reply HandleMsg(const BHMsgHead &head, Func const &op) { try { - auto pos = nodes_.find(head.proc_id()); + auto pos = nodes_.find(head.ssn_id()); if (pos == nodes_.end()) { return MakeReply<Reply>(eNotRegistered, "Node is not registered."); } else { @@ -171,9 +185,7 @@ return HandleMsg( head, [&](Node node) -> MsgCommonReply { NodeInfo &ni = *node; - auto now = NowSec(); // just set to offline. - ni.state_.timestamp_ = now - offline_time_; - ni.state_.UpdateState(now, offline_time_, kill_time_); + ni.state_.PutOffline(offline_time_); return MakeReply(eSuccess); }); } @@ -375,6 +387,7 @@ }; EraseMapRec(service_map_, node->services_); EraseMapRec(subscribe_map_, node->subscriptions_); + node_addr_map_.erase(node->proc_.proc_id()); for (auto &addr : node->addrs_) { cleaner_(addr); @@ -385,7 +398,8 @@ std::unordered_map<Topic, Clients> service_map_; std::unordered_map<Topic, Clients> subscribe_map_; - std::unordered_map<ProcId, Node> nodes_; + std::unordered_map<Address, Node> nodes_; + std::unordered_map<std::string, Address> node_addr_map_; Cleaner cleaner_; // remove mqs. int64_t offline_time_; int64_t kill_time_; @@ -425,7 +439,7 @@ auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 60s, 60s * 2); auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) { return [&](auto &&rep_body) { - auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id())); + auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.ssn_id(), head.msg_id())); auto remote = head.route(0).mq_id(); socket.Send(remote, reply_head, rep_body); }; -- Gitblit v1.8.0