From 6c07fe29a5185835f28059f627a1d30e462da28b Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期二, 29 六月 2021 14:01:19 +0800 Subject: [PATCH] add notify node change. --- box/node_center.h | 1 + box/node_center.cpp | 28 ++++++++++++++++------------ 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/box/node_center.cpp b/box/node_center.cpp index c32b197..068aa00 100644 --- a/box/node_center.cpp +++ b/box/node_center.cpp @@ -31,6 +31,9 @@ const std::string kTopicNode = Join(kTopicCenterRoot, "node"); const std::string kTopicNodeOnline = Join(kTopicNode, "online"); const std::string kTopicNodeOffline = Join(kTopicNode, "offline"); +const std::string kTopicNodeService = Join(kTopicNode, "service"); +const std::string kTopicNodeSub = Join(kTopicNode, "subscribe"); +const std::string kTopicNodeUnsub = Join(kTopicNode, "unsubscribe"); } // namespace ProcIndex ProcRecords::Put(const ProcId &proc_id, const MQId ssn) @@ -114,33 +117,31 @@ { state_.timestamp_ = NowSec() - offline_time; state_.flag_ = kStateOffline; - - Json json; - json.put("proc_id", proc_.proc_id()); - center_.Publish(shm_, kTopicNodeOffline, json.dump()); + center_.Notify(kTopicNodeOffline, *this); } +void NodeCenter::Notify(const Topic &topic, NodeInfo &node) +{ + if (node.proc_.proc_id().empty()) { return; } // node init, ignore. + Json json; + json.put("proc_id", node.proc_.proc_id()); + Publish(node.shm_, topic, json.dump()); +} void NodeCenter::NodeInfo::UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time) { auto old = state_.flag_; auto diff = now - state_.timestamp_; - auto publish = [this](const std::string &topic) { - if (proc_.proc_id().empty()) { return; } // node init, ignore. - Json json; - json.put("proc_id", proc_.proc_id()); - center_.Publish(shm_, topic, json.dump()); - }; LOG_TRACE() << "node " << proc_.proc_id() << " timeout count: " << diff; if (diff < offline_time) { state_.flag_ = kStateNormal; if (old != state_.flag_) { - publish(kTopicNodeOnline); + center_.Notify(kTopicNodeOnline, *this); } } else if (diff < kill_time) { state_.flag_ = kStateOffline; if (old != state_.flag_) { - publish(kTopicNodeOffline); + center_.Notify(kTopicNodeOffline, *this); } } else { state_.flag_ = kStateKillme; @@ -476,6 +477,7 @@ for (auto &topic : topics) { LOG_DEBUG() << "\t" << topic; } + Notify(kTopicNodeService, *node); return MakeReply(eSuccess); }); } @@ -588,6 +590,7 @@ LOG_DEBUG() << "subscribe net : " << msg.network(); if (msg.network()) { Sub(net_sub_, center_.net_sub_map_); + center_.Notify(kTopicNodeSub, *this); } else { Sub(local_sub_, center_.local_sub_map_); } @@ -632,6 +635,7 @@ }; if (msg.network()) { Unsub(net_sub_, center_.net_sub_map_); + center_.Notify(kTopicNodeUnsub, *this); } else { Unsub(local_sub_, center_.local_sub_map_); } diff --git a/box/node_center.h b/box/node_center.h index 54f84c0..ae5b075 100644 --- a/box/node_center.h +++ b/box/node_center.h @@ -194,6 +194,7 @@ void CheckNodes(); bool CanHeartbeat(const NodeInfo &node) { return Valid(node) || node.state_.flag_ == kStateOffline; } void Publish(SharedMemory &shm, const Topic &topic, const std::string &content); + void Notify(const Topic &topic, NodeInfo &node); void DoPublish(ShmSocket &sock, const Topic &topic, MsgI &msg); Clients DoFindClients(const std::string &topic, bool from_remote); bool Valid(const NodeInfo &node) { return node.state_.flag_ == kStateNormal; } -- Gitblit v1.8.0