From 056f71f24cefaf88f2a93714c6678c03ed5f1e0e Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期五, 02 七月 2021 16:54:33 +0800
Subject: [PATCH] fixed to adapt gcc-5.4 & glibc-2.25

---
 box/node_center.cpp |  120 ++++++++++++++++++++++++++++++++++++++++++++----------------
 1 files changed, 88 insertions(+), 32 deletions(-)

diff --git a/box/node_center.cpp b/box/node_center.cpp
index c32b197..76407a8 100644
--- a/box/node_center.cpp
+++ b/box/node_center.cpp
@@ -31,6 +31,9 @@
 const std::string kTopicNode = Join(kTopicCenterRoot, "node");
 const std::string kTopicNodeOnline = Join(kTopicNode, "online");
 const std::string kTopicNodeOffline = Join(kTopicNode, "offline");
+const std::string kTopicNodeService = Join(kTopicNode, "service");
+const std::string kTopicNodeSub = Join(kTopicNode, "subscribe");
+const std::string kTopicNodeUnsub = Join(kTopicNode, "unsubscribe");
 } // namespace
 
 ProcIndex ProcRecords::Put(const ProcId &proc_id, const MQId ssn)
@@ -114,33 +117,31 @@
 {
 	state_.timestamp_ = NowSec() - offline_time;
 	state_.flag_ = kStateOffline;
-
-	Json json;
-	json.put("proc_id", proc_.proc_id());
-	center_.Publish(shm_, kTopicNodeOffline, json.dump());
+	center_.Notify(kTopicNodeOffline, *this);
 }
 
+void NodeCenter::Notify(const Topic &topic, NodeInfo &node)
+{
+	if (node.proc_.proc_id().empty()) { return; } // node init, ignore.
+	Json json;
+	json.put("proc_id", node.proc_.proc_id());
+	Publish(node.shm_, topic, json.dump());
+}
 void NodeCenter::NodeInfo::UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time)
 {
 	auto old = state_.flag_;
 	auto diff = now - state_.timestamp_;
-	auto publish = [this](const std::string &topic) {
-		if (proc_.proc_id().empty()) { return; } // node init, ignore.
-		Json json;
-		json.put("proc_id", proc_.proc_id());
-		center_.Publish(shm_, topic, json.dump());
-	};
 
 	LOG_TRACE() << "node " << proc_.proc_id() << " timeout count: " << diff;
 	if (diff < offline_time) {
 		state_.flag_ = kStateNormal;
 		if (old != state_.flag_) {
-			publish(kTopicNodeOnline);
+			center_.Notify(kTopicNodeOnline, *this);
 		}
 	} else if (diff < kill_time) {
 		state_.flag_ = kStateOffline;
 		if (old != state_.flag_) {
-			publish(kTopicNodeOffline);
+			center_.Notify(kTopicNodeOffline, *this);
 		}
 	} else {
 		state_.flag_ = kStateKillme;
@@ -269,6 +270,7 @@
 
 bool NodeCenter::RemotePublish(BHMsgHead &head, const std::string &body_content)
 {
+	// LOG_FUNCTION;
 	auto &topic = head.topic();
 	auto clients = DoFindClients(topic, true);
 	if (clients.empty()) { return true; }
@@ -287,9 +289,10 @@
 				}
 			}
 			MsgI msg(shm);
-			if (msg.Make(body_content)) {
+			if (msg.Make(head, body_content)) {
 				RecordMsg(msg);
 				msgs.push_back(msg);
+				// LOG_DEBUG() << "remote publish to local." << cli.mq_id_ << ", " << cli.mq_abs_addr_;
 				DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
 			}
 		};
@@ -476,6 +479,7 @@
 		    for (auto &topic : topics) {
 			    LOG_DEBUG() << "\t" << topic;
 		    }
+		    Notify(kTopicNodeService, *node);
 		    return MakeReply(eSuccess);
 	    });
 }
@@ -552,22 +556,43 @@
 	typedef MsgQueryTopicReply Reply;
 
 	auto query = [&](Node self) -> Reply {
-		auto pos = service_map_.find(req.topic());
-		if (pos != service_map_.end() && !pos->second.empty()) {
-			auto &clients = pos->second;
-			Reply reply = MakeReply<Reply>(eSuccess);
-			for (auto &dest : clients) {
-				Node dest_node(dest.weak_node_.lock());
-				if (dest_node && Valid(*dest_node)) {
-					auto node_addr = reply.add_node_address();
-					node_addr->set_proc_id(dest_node->proc_.proc_id());
-					node_addr->mutable_addr()->set_mq_id(dest.mq_id_);
-					node_addr->mutable_addr()->set_abs_addr(dest.mq_abs_addr_);
+		Reply reply = MakeReply<Reply>(eSuccess);
+		auto local = [&]() {
+			auto pos = service_map_.find(req.topic());
+			if (pos != service_map_.end() && !pos->second.empty()) {
+				auto &clients = pos->second;
+				for (auto &dest : clients) {
+					Node dest_node(dest.weak_node_.lock());
+					if (dest_node && Valid(*dest_node)) {
+						auto node_addr = reply.add_node_address();
+						node_addr->set_proc_id(dest_node->proc_.proc_id());
+						node_addr->mutable_addr()->set_mq_id(dest.mq_id_);
+						node_addr->mutable_addr()->set_abs_addr(dest.mq_abs_addr_);
+					}
 				}
+				return true;
+			} else {
+				return false;
 			}
-			return reply;
-		} else {
+		};
+		auto net = [&]() {
+			auto hosts(FindRemoteRPCServers(req.topic()));
+			if (hosts.empty()) {
+				return false;
+			} else {
+				for (auto &ip : hosts) {
+					auto node_addr = reply.add_node_address();
+					node_addr->mutable_addr()->set_ip(ip);
+				}
+				return true;
+			}
+		};
+		local();
+		net();
+		if (reply.node_address_size() == 0) {
 			return MakeReply<Reply>(eNotFound, "topic server not found.");
+		} else {
+			return reply;
 		}
 	};
 
@@ -585,9 +610,9 @@
 			sub_map[topic].insert(dest);
 		}
 	};
-	LOG_DEBUG() << "subscribe net : " << msg.network();
 	if (msg.network()) {
 		Sub(net_sub_, center_.net_sub_map_);
+		center_.Notify(kTopicNodeSub, *this);
 	} else {
 		Sub(local_sub_, center_.local_sub_map_);
 	}
@@ -632,6 +657,7 @@
 	};
 	if (msg.network()) {
 		Unsub(net_sub_, center_.net_sub_map_);
+		center_.Notify(kTopicNodeUnsub, *this);
 	} else {
 		Unsub(local_sub_, center_.local_sub_map_);
 	}
@@ -647,6 +673,7 @@
 
 NodeCenter::Clients NodeCenter::DoFindClients(const std::string &topic, bool from_remote)
 {
+	// LOG_FUNCTION;
 	Clients dests;
 	auto Find1 = [&](const std::string &exact) {
 		auto FindIn = [&](auto &sub_map) {
@@ -654,16 +681,25 @@
 			if (pos != sub_map.end()) {
 				auto &clients = pos->second;
 				for (auto &cli : clients) {
-					if (Valid(cli.weak_node_)) {
-						dests.insert(cli);
+					auto node = cli.weak_node_.lock();
+					if (node) {
+						if (node->state_.flag_ == kStateNormal)
+							dests.insert(cli);
 					}
+
+					// if (Valid(cli.weak_node_)) {
+					// 	dests.insert(cli);
+					// }
 				}
 			}
 		};
 		if (!from_remote) {
 			FindIn(local_sub_map_);
+			// LOG_DEBUG() << "topic '" << topic << "' local clients: " << dests.size();
 		}
+		// net subscripitions also work in local mode.
 		FindIn(net_sub_map_);
+		// LOG_DEBUG() << "topic '" << topic << "' + remote clients: " << dests.size();
 	};
 	Find1(topic);
 
@@ -789,8 +825,28 @@
 	}
 }
 
-std::vector<std::string> NodeCenter::FindRemoteSubClients(const Topic &topic)
+void NodeCenter::NetRecords::ParseData(const ssjson::Json &info)
 {
-	//TODO search synced full list;
-	return std::vector<std::string>();
+	// LOG_FUNCTION;
+	sub_hosts_.clear();
+	rpc_hosts_.clear();
+	for (auto &host : info.array()) {
+		if (host.get("isLocal", false)) {
+			host_id_ = host.get("serverId", "");
+			ip_ = host.get("ip", "");
+		} else {
+			auto ip = host.get("ip", "");
+			auto UpdateRec = [&](const ssjson::Json::array_type &lot, auto &rec) {
+				for (auto &topic : lot) {
+					auto t = topic.get_value<std::string>();
+					rec[t].insert(ip);
+					// LOG_DEBUG() << "net topic: " << t << ", " << ip;
+				}
+			};
+			// LOG_DEBUG() << "serives:";
+			UpdateRec(host.child("pubTopics").array(), rpc_hosts_);
+			// LOG_DEBUG() << "net sub:";
+			UpdateRec(host.child("netSubTopics").array(), sub_hosts_);
+		}
+	}
 }
\ No newline at end of file

--
Gitblit v1.8.0