From 58d904a328c0d849769b483e901a0be9426b8209 Mon Sep 17 00:00:00 2001
From: liuxiaolong <liuxiaolong@aiotlink.com>
Date: 星期二, 20 七月 2021 20:20:44 +0800
Subject: [PATCH] 调整Request C.BHFree的位置

---
 box/node_center.cpp |  385 +++++++++++++++++++++++++++++++++++++++++++-----------
 1 files changed, 303 insertions(+), 82 deletions(-)

diff --git a/box/node_center.cpp b/box/node_center.cpp
index cbaef0e..76407a8 100644
--- a/box/node_center.cpp
+++ b/box/node_center.cpp
@@ -31,6 +31,9 @@
 const std::string kTopicNode = Join(kTopicCenterRoot, "node");
 const std::string kTopicNodeOnline = Join(kTopicNode, "online");
 const std::string kTopicNodeOffline = Join(kTopicNode, "offline");
+const std::string kTopicNodeService = Join(kTopicNode, "service");
+const std::string kTopicNodeSub = Join(kTopicNode, "subscribe");
+const std::string kTopicNodeUnsub = Join(kTopicNode, "unsubscribe");
 } // namespace
 
 ProcIndex ProcRecords::Put(const ProcId &proc_id, const MQId ssn)
@@ -70,8 +73,9 @@
 		return;
 	}
 	// LOG_FUNCTION;
+	const size_t total = msgs_.size();
 	time_to_clean_ = now + 1;
-	int64_t limit = std::max(10000ul, msgs_.size() / 10);
+	int64_t limit = std::max(10000ul, total / 10);
 	int64_t n = 0;
 	auto it = msgs_.begin();
 	while (it != msgs_.end() && --limit > 0) {
@@ -82,16 +86,16 @@
 			++n;
 		};
 		int n = now - msg.timestamp();
-		if (n < 10) {
+		if (msg.Count() == 0) {
+			Free();
+		} else if (n > NodeTimeoutSec()) {
+			Free();
+		} else {
 			++it;
-		} else if (msg.Count() == 0) {
-			Free();
-		} else if (n > 60) {
-			Free();
 		}
 	}
 	if (n > 0) {
-		LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n;
+		LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n << '/' << total;
 	}
 }
 
@@ -113,33 +117,31 @@
 {
 	state_.timestamp_ = NowSec() - offline_time;
 	state_.flag_ = kStateOffline;
-
-	Json json;
-	json.put("proc_id", proc_.proc_id());
-	center_.Publish(shm_, kTopicNodeOffline, json.dump());
+	center_.Notify(kTopicNodeOffline, *this);
 }
 
+void NodeCenter::Notify(const Topic &topic, NodeInfo &node)
+{
+	if (node.proc_.proc_id().empty()) { return; } // node init, ignore.
+	Json json;
+	json.put("proc_id", node.proc_.proc_id());
+	Publish(node.shm_, topic, json.dump());
+}
 void NodeCenter::NodeInfo::UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time)
 {
 	auto old = state_.flag_;
 	auto diff = now - state_.timestamp_;
-	auto publish = [this](const std::string &topic) {
-		if (proc_.proc_id().empty()) { return; } // node init, ignore.
-		Json json;
-		json.put("proc_id", proc_.proc_id());
-		center_.Publish(shm_, topic, json.dump());
-	};
 
 	LOG_TRACE() << "node " << proc_.proc_id() << " timeout count: " << diff;
 	if (diff < offline_time) {
 		state_.flag_ = kStateNormal;
 		if (old != state_.flag_) {
-			publish(kTopicNodeOnline);
+			center_.Notify(kTopicNodeOnline, *this);
 		}
 	} else if (diff < kill_time) {
 		state_.flag_ = kStateOffline;
 		if (old != state_.flag_) {
-			publish(kTopicNodeOffline);
+			center_.Notify(kTopicNodeOffline, *this);
 		}
 	} else {
 		state_.flag_ = kStateKillme;
@@ -163,7 +165,7 @@
 
 		// create sockets.
 		try {
-			ShmSocket tmp(shm, true, ssn, 16);
+			ShmSocket tmp(shm, ssn, eCreate);
 			node->addrs_.emplace(ssn, tmp.AbsAddr());
 			return true;
 		} catch (...) {
@@ -208,6 +210,121 @@
 {
 	RecordMsg(msg);
 	return socket.Send(dest, msg);
+}
+
+NodeCenter::Node NodeCenter::GetNode(const MQId mq_id)
+{
+	Node node;
+	auto ssn = mq_id - (mq_id % 10);
+	auto pos = nodes_.find(ssn);
+	if (pos != nodes_.end()) {
+		node = pos->second;
+	}
+	return node;
+}
+
+bool NodeCenter::PassRemoteRequestToLocal(MQInfo dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb)
+{
+	Node node;
+
+	auto FindDest = [&]() {
+		auto pos = service_map_.find(head.topic());
+		if (pos != service_map_.end() && !pos->second.empty()) {
+			auto &clients = pos->second;
+			for (auto &cli : clients) {
+				node = cli.weak_node_.lock();
+				if (node && Valid(*node)) {
+					dest.id_ = cli.mq_id_;
+					dest.offset_ = cli.mq_abs_addr_;
+					return true;
+				}
+			}
+		}
+		return false;
+	};
+
+	if (dest.id_ == 0) {
+		if (!FindDest()) {
+			LOG_ERROR() << id() << " pass remote request, topic dest not found.";
+			return false;
+		}
+	} else {
+		node = GetNode(dest.id_);
+		if (!node || !Valid(*node)) {
+			LOG_ERROR() << id() << " pass remote request, dest not found.";
+			return false;
+		}
+	}
+
+	ShmSocket &sender(DefaultSender(node->shm_));
+	auto route = head.add_route();
+	route->set_mq_id(sender.id());
+	route->set_abs_addr(sender.AbsAddr());
+
+	ShmMsg msg(node->shm_);
+	if (!msg.Make(head, body_content)) { return false; }
+	DEFER1(msg.Release(););
+	RecordMsg(msg);
+	return sender.Send(dest, msg, head.msg_id(), std::move(cb));
+}
+
+bool NodeCenter::RemotePublish(BHMsgHead &head, const std::string &body_content)
+{
+	// LOG_FUNCTION;
+	auto &topic = head.topic();
+	auto clients = DoFindClients(topic, true);
+	if (clients.empty()) { return true; }
+
+	std::vector<MsgI> msgs;
+	auto ReleaseAll = [&]() {for (auto &msg : msgs) { msg.Release(); } };
+	DEFER1(ReleaseAll(););
+
+	for (auto &cli : clients) {
+		auto Send1 = [&](Node node) {
+			auto &shm = node->shm_;
+			for (auto &msg : msgs) {
+				if (msg.shm().name() == shm.name()) {
+					DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
+					return;
+				}
+			}
+			MsgI msg(shm);
+			if (msg.Make(head, body_content)) {
+				RecordMsg(msg);
+				msgs.push_back(msg);
+				// LOG_DEBUG() << "remote publish to local." << cli.mq_id_ << ", " << cli.mq_abs_addr_;
+				DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
+			}
+		};
+		auto node = cli.weak_node_.lock();
+		if (node) {
+			Send1(node);
+			// should also make sure that mq is not killed before msg expires.
+			// it would be ok if (kill_time - offline_time) is longer than expire time.
+		}
+	}
+
+	return true;
+}
+
+bool NodeCenter::PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content)
+{
+	Node node(GetNode(dest.id_));
+	if (!node) {
+		LOG_ERROR() << id() << " pass remote reply , ssn not found.";
+		return false;
+	}
+	auto offset = node->addrs_[dest.id_];
+	if (offset != dest.offset_) {
+		LOG_ERROR() << id() << " pass remote reply, dest address not match";
+		return false;
+	}
+
+	ShmMsg msg(node->shm_);
+	if (!msg.Make(head, body_content)) { return false; }
+	DEFER1(msg.Release(););
+	RecordMsg(msg);
+	return DefaultSender(node->shm_).Send(dest, msg);
 }
 
 void NodeCenter::OnAlloc(ShmSocket &socket, const int64_t val)
@@ -281,7 +398,7 @@
 	auto &node = pos->second;
 	try {
 		for (int i = 0; i < msg.extra_mq_num(); ++i) {
-			ShmSocket tmp(node->shm_, true, head.ssn_id() + i + 1, 16);
+			ShmSocket tmp(node->shm_, head.ssn_id() + i + 1, eCreate);
 			node->addrs_.emplace(tmp.id(), tmp.AbsAddr());
 			auto addr = reply.add_extra_mqs();
 			addr->set_mq_id(tmp.id());
@@ -362,6 +479,7 @@
 		    for (auto &topic : topics) {
 			    LOG_DEBUG() << "\t" << topic;
 		    }
+		    Notify(kTopicNodeService, *node);
 		    return MakeReply(eSuccess);
 	    });
 }
@@ -392,11 +510,16 @@
 		*info->mutable_proc() = node->proc_;
 		info->mutable_proc()->clear_private_info();
 		info->set_online(node->state_.flag_ == kStateNormal);
-		for (auto &addr_topics : node->services_) {
-			for (auto &topic : addr_topics.second) {
-				info->mutable_topics()->add_topic_list(topic);
+		auto AddTopics = [](auto &dst, auto &src) {
+			for (auto &addr_topics : src) {
+				for (auto &topic : addr_topics.second) {
+					dst.add_topic_list(topic);
+				}
 			}
-		}
+		};
+		AddTopics(*info->mutable_service(), node->services_);
+		AddTopics(*info->mutable_local_sub(), node->local_sub_);
+		AddTopics(*info->mutable_net_sub(), node->net_sub_);
 	};
 
 	if (!proc_id.empty()) {
@@ -433,57 +556,93 @@
 	typedef MsgQueryTopicReply Reply;
 
 	auto query = [&](Node self) -> Reply {
-		auto pos = service_map_.find(req.topic());
-		if (pos != service_map_.end() && !pos->second.empty()) {
-			auto &clients = pos->second;
-			Reply reply = MakeReply<Reply>(eSuccess);
-			for (auto &dest : clients) {
-				Node dest_node(dest.weak_node_.lock());
-				if (dest_node && Valid(*dest_node)) {
-					auto node_addr = reply.add_node_address();
-					node_addr->set_proc_id(dest_node->proc_.proc_id());
-					node_addr->mutable_addr()->set_mq_id(dest.mq_id_);
-					node_addr->mutable_addr()->set_abs_addr(dest.mq_abs_addr_);
+		Reply reply = MakeReply<Reply>(eSuccess);
+		auto local = [&]() {
+			auto pos = service_map_.find(req.topic());
+			if (pos != service_map_.end() && !pos->second.empty()) {
+				auto &clients = pos->second;
+				for (auto &dest : clients) {
+					Node dest_node(dest.weak_node_.lock());
+					if (dest_node && Valid(*dest_node)) {
+						auto node_addr = reply.add_node_address();
+						node_addr->set_proc_id(dest_node->proc_.proc_id());
+						node_addr->mutable_addr()->set_mq_id(dest.mq_id_);
+						node_addr->mutable_addr()->set_abs_addr(dest.mq_abs_addr_);
+					}
 				}
+				return true;
+			} else {
+				return false;
 			}
-			return reply;
-		} else {
+		};
+		auto net = [&]() {
+			auto hosts(FindRemoteRPCServers(req.topic()));
+			if (hosts.empty()) {
+				return false;
+			} else {
+				for (auto &ip : hosts) {
+					auto node_addr = reply.add_node_address();
+					node_addr->mutable_addr()->set_ip(ip);
+				}
+				return true;
+			}
+		};
+		local();
+		net();
+		if (reply.node_address_size() == 0) {
 			return MakeReply<Reply>(eNotFound, "topic server not found.");
+		} else {
+			return reply;
 		}
 	};
 
 	return HandleMsg<Reply>(head, query);
 }
 
+void NodeCenter::NodeInfo::Subscribe(const BHMsgHead &head, const MsgSubscribe &msg, Node node)
+{
+	auto src = SrcAddr(head);
+	auto Sub = [&](auto &sub, auto &sub_map) {
+		auto &topics = msg.topics().topic_list();
+		sub[src].insert(topics.begin(), topics.end());
+		const TopicDest &dest = {src, SrcAbsAddr(head), node};
+		for (auto &topic : topics) {
+			sub_map[topic].insert(dest);
+		}
+	};
+	if (msg.network()) {
+		Sub(net_sub_, center_.net_sub_map_);
+		center_.Notify(kTopicNodeSub, *this);
+	} else {
+		Sub(local_sub_, center_.local_sub_map_);
+	}
+}
+
 MsgCommonReply NodeCenter::Subscribe(const BHMsgHead &head, const MsgSubscribe &msg)
 {
 	return HandleMsg(head, [&](Node node) {
-		auto src = SrcAddr(head);
-		auto &topics = msg.topics().topic_list();
-		node->subscriptions_[src].insert(topics.begin(), topics.end());
-		TopicDest dest = {src, SrcAbsAddr(head), node};
-		for (auto &topic : topics) {
-			subscribe_map_[topic].insert(dest);
-		}
+		node->Subscribe(head, msg, node);
 		return MakeReply(eSuccess);
 	});
 }
-MsgCommonReply NodeCenter::Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg)
-{
-	return HandleMsg(head, [&](Node node) {
-		auto src = SrcAddr(head);
-		auto pos = node->subscriptions_.find(src);
 
-		auto RemoveSubTopicDestRecord = [this](const Topic &topic, const TopicDest &dest) {
-			auto pos = subscribe_map_.find(topic);
-			if (pos != subscribe_map_.end() &&
+void NodeCenter::NodeInfo::Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg, Node node)
+{
+	auto src = SrcAddr(head);
+
+	auto Unsub = [&](auto &sub, auto &sub_map) {
+		auto pos = sub.find(src);
+
+		auto RemoveSubTopicDestRecord = [&sub_map](const Topic &topic, const TopicDest &dest) {
+			auto pos = sub_map.find(topic);
+			if (pos != sub_map.end() &&
 			    pos->second.erase(dest) != 0 &&
 			    pos->second.empty()) {
-				subscribe_map_.erase(pos);
+				sub_map.erase(pos);
 			}
 		};
 
-		if (pos != node->subscriptions_.end()) {
+		if (pos != sub.end()) {
 			const TopicDest &dest = {src, SrcAbsAddr(head), node};
 			auto &topics = msg.topics().topic_list();
 			// clear node sub records;
@@ -492,26 +651,55 @@
 				RemoveSubTopicDestRecord(topic, dest);
 			}
 			if (pos->second.empty()) {
-				node->subscriptions_.erase(pos);
+				sub.erase(pos);
 			}
 		}
+	};
+	if (msg.network()) {
+		Unsub(net_sub_, center_.net_sub_map_);
+		center_.Notify(kTopicNodeUnsub, *this);
+	} else {
+		Unsub(local_sub_, center_.local_sub_map_);
+	}
+}
+
+MsgCommonReply NodeCenter::Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg)
+{
+	return HandleMsg(head, [&](Node node) {
+		node->Unsubscribe(head, msg, node);
 		return MakeReply(eSuccess);
 	});
 }
 
-NodeCenter::Clients NodeCenter::DoFindClients(const std::string &topic)
+NodeCenter::Clients NodeCenter::DoFindClients(const std::string &topic, bool from_remote)
 {
+	// LOG_FUNCTION;
 	Clients dests;
 	auto Find1 = [&](const std::string &exact) {
-		auto pos = subscribe_map_.find(exact);
-		if (pos != subscribe_map_.end()) {
-			auto &clients = pos->second;
-			for (auto &cli : clients) {
-				if (Valid(cli.weak_node_)) {
-					dests.insert(cli);
+		auto FindIn = [&](auto &sub_map) {
+			auto pos = sub_map.find(exact);
+			if (pos != sub_map.end()) {
+				auto &clients = pos->second;
+				for (auto &cli : clients) {
+					auto node = cli.weak_node_.lock();
+					if (node) {
+						if (node->state_.flag_ == kStateNormal)
+							dests.insert(cli);
+					}
+
+					// if (Valid(cli.weak_node_)) {
+					// 	dests.insert(cli);
+					// }
 				}
 			}
+		};
+		if (!from_remote) {
+			FindIn(local_sub_map_);
+			// LOG_DEBUG() << "topic '" << topic << "' local clients: " << dests.size();
 		}
+		// net subscripitions also work in local mode.
+		FindIn(net_sub_map_);
+		// LOG_DEBUG() << "topic '" << topic << "' + remote clients: " << dests.size();
 	};
 	Find1(topic);
 
@@ -528,15 +716,31 @@
 	return dests;
 }
 
-bool NodeCenter::FindClients(const BHMsgHead &head, const MsgPublish &msg, Clients &out, MsgCommonReply &reply)
+MsgCommonReply NodeCenter::Publish(const BHMsgHead &head, const Topic &topic, MsgI &msg)
 {
-	bool ret = false;
-	HandleMsg(head, [&](Node node) {
-		DoFindClients(msg.topic()).swap(out);
-		ret = true;
+	return HandleMsg(head, [&](Node node) {
+		DoPublish(DefaultSender(node->shm_), topic, msg);
 		return MakeReply(eSuccess);
-	}).Swap(&reply);
-	return ret;
+	});
+}
+
+void NodeCenter::DoPublish(ShmSocket &sock, const Topic &topic, MsgI &msg)
+{
+	try {
+		auto clients = DoFindClients(topic, false);
+		if (clients.empty()) { return; }
+
+		for (auto &cli : clients) {
+			auto node = cli.weak_node_.lock();
+			if (node) {
+				// should also make sure that mq is not killed before msg expires.
+				// it would be ok if (kill_time - offline_time) is longer than expire time.
+				sock.Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
+			}
+		}
+	} catch (...) {
+		LOG_ERROR() << "DoPublish error.";
+	}
 }
 
 void NodeCenter::OnTimer()
@@ -582,7 +786,8 @@
 		}
 	};
 	EraseMapRec(service_map_, node->services_);
-	EraseMapRec(subscribe_map_, node->subscriptions_);
+	EraseMapRec(local_sub_map_, node->local_sub_);
+	EraseMapRec(net_sub_map_, node->net_sub_);
 
 	// remove online record.
 	auto pos = online_node_addr_map_.find(node->proc_.proc_id());
@@ -604,10 +809,6 @@
 void NodeCenter::Publish(SharedMemory &shm, const Topic &topic, const std::string &content)
 {
 	try {
-		// LOG_DEBUG() << "center publish: " << topic << ": " << content;
-		Clients clients(DoFindClients(topic));
-		if (clients.empty()) { return; }
-
 		MsgPublish pub;
 		pub.set_topic(topic);
 		pub.set_data(content);
@@ -616,16 +817,36 @@
 		if (msg.Make(head, pub)) {
 			DEFER1(msg.Release());
 			RecordMsg(msg);
-
-			for (auto &cli : clients) {
-				auto node = cli.weak_node_.lock();
-				if (node && node->state_.flag_ == kStateNormal) {
-					DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
-				}
-			}
+			DoPublish(DefaultSender(shm), topic, msg);
 		}
 
 	} catch (...) {
 		LOG_ERROR() << "center publish error.";
 	}
+}
+
+void NodeCenter::NetRecords::ParseData(const ssjson::Json &info)
+{
+	// LOG_FUNCTION;
+	sub_hosts_.clear();
+	rpc_hosts_.clear();
+	for (auto &host : info.array()) {
+		if (host.get("isLocal", false)) {
+			host_id_ = host.get("serverId", "");
+			ip_ = host.get("ip", "");
+		} else {
+			auto ip = host.get("ip", "");
+			auto UpdateRec = [&](const ssjson::Json::array_type &lot, auto &rec) {
+				for (auto &topic : lot) {
+					auto t = topic.get_value<std::string>();
+					rec[t].insert(ip);
+					// LOG_DEBUG() << "net topic: " << t << ", " << ip;
+				}
+			};
+			// LOG_DEBUG() << "serives:";
+			UpdateRec(host.child("pubTopics").array(), rpc_hosts_);
+			// LOG_DEBUG() << "net sub:";
+			UpdateRec(host.child("netSubTopics").array(), sub_hosts_);
+		}
+	}
 }
\ No newline at end of file

--
Gitblit v1.8.0