From 6c07fe29a5185835f28059f627a1d30e462da28b Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期二, 29 六月 2021 14:01:19 +0800
Subject: [PATCH] add notify node change.
---
box/node_center.h | 1 +
box/node_center.cpp | 28 ++++++++++++++++------------
2 files changed, 17 insertions(+), 12 deletions(-)
diff --git a/box/node_center.cpp b/box/node_center.cpp
index c32b197..068aa00 100644
--- a/box/node_center.cpp
+++ b/box/node_center.cpp
@@ -31,6 +31,9 @@
const std::string kTopicNode = Join(kTopicCenterRoot, "node");
const std::string kTopicNodeOnline = Join(kTopicNode, "online");
const std::string kTopicNodeOffline = Join(kTopicNode, "offline");
+const std::string kTopicNodeService = Join(kTopicNode, "service");
+const std::string kTopicNodeSub = Join(kTopicNode, "subscribe");
+const std::string kTopicNodeUnsub = Join(kTopicNode, "unsubscribe");
} // namespace
ProcIndex ProcRecords::Put(const ProcId &proc_id, const MQId ssn)
@@ -114,33 +117,31 @@
{
state_.timestamp_ = NowSec() - offline_time;
state_.flag_ = kStateOffline;
-
- Json json;
- json.put("proc_id", proc_.proc_id());
- center_.Publish(shm_, kTopicNodeOffline, json.dump());
+ center_.Notify(kTopicNodeOffline, *this);
}
+void NodeCenter::Notify(const Topic &topic, NodeInfo &node)
+{
+ if (node.proc_.proc_id().empty()) { return; } // node init, ignore.
+ Json json;
+ json.put("proc_id", node.proc_.proc_id());
+ Publish(node.shm_, topic, json.dump());
+}
void NodeCenter::NodeInfo::UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time)
{
auto old = state_.flag_;
auto diff = now - state_.timestamp_;
- auto publish = [this](const std::string &topic) {
- if (proc_.proc_id().empty()) { return; } // node init, ignore.
- Json json;
- json.put("proc_id", proc_.proc_id());
- center_.Publish(shm_, topic, json.dump());
- };
LOG_TRACE() << "node " << proc_.proc_id() << " timeout count: " << diff;
if (diff < offline_time) {
state_.flag_ = kStateNormal;
if (old != state_.flag_) {
- publish(kTopicNodeOnline);
+ center_.Notify(kTopicNodeOnline, *this);
}
} else if (diff < kill_time) {
state_.flag_ = kStateOffline;
if (old != state_.flag_) {
- publish(kTopicNodeOffline);
+ center_.Notify(kTopicNodeOffline, *this);
}
} else {
state_.flag_ = kStateKillme;
@@ -476,6 +477,7 @@
for (auto &topic : topics) {
LOG_DEBUG() << "\t" << topic;
}
+ Notify(kTopicNodeService, *node);
return MakeReply(eSuccess);
});
}
@@ -588,6 +590,7 @@
LOG_DEBUG() << "subscribe net : " << msg.network();
if (msg.network()) {
Sub(net_sub_, center_.net_sub_map_);
+ center_.Notify(kTopicNodeSub, *this);
} else {
Sub(local_sub_, center_.local_sub_map_);
}
@@ -632,6 +635,7 @@
};
if (msg.network()) {
Unsub(net_sub_, center_.net_sub_map_);
+ center_.Notify(kTopicNodeUnsub, *this);
} else {
Unsub(local_sub_, center_.local_sub_map_);
}
diff --git a/box/node_center.h b/box/node_center.h
index 54f84c0..ae5b075 100644
--- a/box/node_center.h
+++ b/box/node_center.h
@@ -194,6 +194,7 @@
void CheckNodes();
bool CanHeartbeat(const NodeInfo &node) { return Valid(node) || node.state_.flag_ == kStateOffline; }
void Publish(SharedMemory &shm, const Topic &topic, const std::string &content);
+ void Notify(const Topic &topic, NodeInfo &node);
void DoPublish(ShmSocket &sock, const Topic &topic, MsgI &msg);
Clients DoFindClients(const std::string &topic, bool from_remote);
bool Valid(const NodeInfo &node) { return node.state_.flag_ == kStateNormal; }
--
Gitblit v1.8.0