From 6c07fe29a5185835f28059f627a1d30e462da28b Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期二, 29 六月 2021 14:01:19 +0800
Subject: [PATCH] add notify node change.

---
 box/node_center.h   |    1 +
 box/node_center.cpp |   28 ++++++++++++++++------------
 2 files changed, 17 insertions(+), 12 deletions(-)

diff --git a/box/node_center.cpp b/box/node_center.cpp
index c32b197..068aa00 100644
--- a/box/node_center.cpp
+++ b/box/node_center.cpp
@@ -31,6 +31,9 @@
 const std::string kTopicNode = Join(kTopicCenterRoot, "node");
 const std::string kTopicNodeOnline = Join(kTopicNode, "online");
 const std::string kTopicNodeOffline = Join(kTopicNode, "offline");
+const std::string kTopicNodeService = Join(kTopicNode, "service");
+const std::string kTopicNodeSub = Join(kTopicNode, "subscribe");
+const std::string kTopicNodeUnsub = Join(kTopicNode, "unsubscribe");
 } // namespace
 
 ProcIndex ProcRecords::Put(const ProcId &proc_id, const MQId ssn)
@@ -114,33 +117,31 @@
 {
 	state_.timestamp_ = NowSec() - offline_time;
 	state_.flag_ = kStateOffline;
-
-	Json json;
-	json.put("proc_id", proc_.proc_id());
-	center_.Publish(shm_, kTopicNodeOffline, json.dump());
+	center_.Notify(kTopicNodeOffline, *this);
 }
 
+void NodeCenter::Notify(const Topic &topic, NodeInfo &node)
+{
+	if (node.proc_.proc_id().empty()) { return; } // node init, ignore.
+	Json json;
+	json.put("proc_id", node.proc_.proc_id());
+	Publish(node.shm_, topic, json.dump());
+}
 void NodeCenter::NodeInfo::UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time)
 {
 	auto old = state_.flag_;
 	auto diff = now - state_.timestamp_;
-	auto publish = [this](const std::string &topic) {
-		if (proc_.proc_id().empty()) { return; } // node init, ignore.
-		Json json;
-		json.put("proc_id", proc_.proc_id());
-		center_.Publish(shm_, topic, json.dump());
-	};
 
 	LOG_TRACE() << "node " << proc_.proc_id() << " timeout count: " << diff;
 	if (diff < offline_time) {
 		state_.flag_ = kStateNormal;
 		if (old != state_.flag_) {
-			publish(kTopicNodeOnline);
+			center_.Notify(kTopicNodeOnline, *this);
 		}
 	} else if (diff < kill_time) {
 		state_.flag_ = kStateOffline;
 		if (old != state_.flag_) {
-			publish(kTopicNodeOffline);
+			center_.Notify(kTopicNodeOffline, *this);
 		}
 	} else {
 		state_.flag_ = kStateKillme;
@@ -476,6 +477,7 @@
 		    for (auto &topic : topics) {
 			    LOG_DEBUG() << "\t" << topic;
 		    }
+		    Notify(kTopicNodeService, *node);
 		    return MakeReply(eSuccess);
 	    });
 }
@@ -588,6 +590,7 @@
 	LOG_DEBUG() << "subscribe net : " << msg.network();
 	if (msg.network()) {
 		Sub(net_sub_, center_.net_sub_map_);
+		center_.Notify(kTopicNodeSub, *this);
 	} else {
 		Sub(local_sub_, center_.local_sub_map_);
 	}
@@ -632,6 +635,7 @@
 	};
 	if (msg.network()) {
 		Unsub(net_sub_, center_.net_sub_map_);
+		center_.Notify(kTopicNodeUnsub, *this);
 	} else {
 		Unsub(local_sub_, center_.local_sub_map_);
 	}
diff --git a/box/node_center.h b/box/node_center.h
index 54f84c0..ae5b075 100644
--- a/box/node_center.h
+++ b/box/node_center.h
@@ -194,6 +194,7 @@
 	void CheckNodes();
 	bool CanHeartbeat(const NodeInfo &node) { return Valid(node) || node.state_.flag_ == kStateOffline; }
 	void Publish(SharedMemory &shm, const Topic &topic, const std::string &content);
+	void Notify(const Topic &topic, NodeInfo &node);
 	void DoPublish(ShmSocket &sock, const Topic &topic, MsgI &msg);
 	Clients DoFindClients(const std::string &topic, bool from_remote);
 	bool Valid(const NodeInfo &node) { return node.state_.flag_ == kStateNormal; }

--
Gitblit v1.8.0