From ae17d1439b35b55212c3a30712e0a60b1d6a99c0 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期三, 30 六月 2021 11:15:53 +0800
Subject: [PATCH] support tcp pub/sub.

---
 box/tcp_connection.cpp    |    2 
 utest/api_test.cpp        |   91 ++++++++++++-----
 box/center_topic_node.cpp |   29 +++++
 src/bh_api.cc             |    4 
 box/center.cpp            |    2 
 api/bhsgo/bhome_node.go   |    5 +
 src/topic_node.h          |   10 +
 box/node_center.h         |   32 ++++++
 src/bh_api.h              |    5 +
 box/node_center.cpp       |   82 ++++++++++++---
 src/exported_symbols      |    1 
 src/topic_node.cpp        |    3 
 12 files changed, 210 insertions(+), 56 deletions(-)

diff --git a/api/bhsgo/bhome_node.go b/api/bhsgo/bhome_node.go
index 9d66814..09d571b 100644
--- a/api/bhsgo/bhome_node.go
+++ b/api/bhsgo/bhome_node.go
@@ -47,6 +47,11 @@
 	return bhApiIn1Out1(C.FBHApiIn1Out1(C.BHSubscribeTopics), data, reply, timeout_ms)
 }
 
+func SubscribeNet(topics *bh.MsgTopicList, reply *bh.MsgCommonReply, timeout_ms int) bool {
+	data, _ := topics.Marshal()
+	return bhApiIn1Out1(C.FBHApiIn1Out1(C.BHSubscribeNetTopics), data, reply, timeout_ms)
+}
+
 func Heartbeat(proc *bh.ProcInfo, reply *bh.MsgCommonReply, timeout_ms int) bool {
 	data, _ := proc.Marshal()
 	return bhApiIn1Out1(C.FBHApiIn1Out1(C.BHHeartbeat), data, reply, timeout_ms)
diff --git a/box/center.cpp b/box/center.cpp
index 78135d1..e0abbb3 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -146,7 +146,7 @@
 				replyer(reply);
 				auto hosts = center->FindRemoteSubClients(pub.topic());
 				for (auto &host : hosts) {
-					tcp_proxy.Publish(host, kBHCenterPort, pub.SerializeAsString());
+					tcp_proxy.Publish(host, kBHCenterPort, msg.content());
 				}
 			}
 		};
diff --git a/box/center_topic_node.cpp b/box/center_topic_node.cpp
index 3c4f369..1f4103f 100644
--- a/box/center_topic_node.cpp
+++ b/box/center_topic_node.cpp
@@ -30,6 +30,7 @@
 namespace
 {
 const std::string &kTopicQueryProc = "#center_query_procs";
+const std::string &kTopicNotifyRemoteInfo = "pub-allRegisterInfo-to-center";
 
 std::string ToJson(const MsgQueryProcReply &qpr)
 {
@@ -92,10 +93,16 @@
 		throw std::runtime_error("center node register failed.");
 	}
 
-	MsgTopicList topics;
-	topics.add_topic_list(kTopicQueryProc);
-	if (!pnode_->DoServerRegisterRPC(true, topics, reply, timeout)) {
+	MsgTopicList services;
+	services.add_topic_list(kTopicQueryProc);
+	if (!pnode_->DoServerRegisterRPC(true, services, reply, timeout)) {
 		throw std::runtime_error("center node register topics failed.");
+	}
+	MsgTopicList subs;
+
+	subs.add_topic_list(kTopicNotifyRemoteInfo);
+	if (!pnode_->Subscribe(subs, reply, timeout)) {
+		throw std::runtime_error("center node subscribe topics failed.");
 	}
 
 	auto onRequest = [this](void *src_info, std::string &client_proc_id, MsgRequestTopic &request) {
@@ -117,6 +124,20 @@
 		pnode_->ServerSendReply(src_info, reply);
 	};
 
+	auto OnSubRecv = [&](const std::string &proc_id, const MsgPublish &data) {
+		if (data.topic() == kTopicNotifyRemoteInfo) {
+			// parse other data.
+			// LOG_DEBUG() << "center got net info.";
+			ssjson::Json js;
+			if (js.parse(data.data())) {
+				if (js.is_array()) {
+					auto &center = *pscenter_;
+					center->ParseNetInfo(js);
+				}
+			}
+		}
+	};
+
 	bool cur = false;
 	if (run_.compare_exchange_strong(cur, true)) {
 		auto heartbeat = [this]() {
@@ -126,7 +147,7 @@
 			}
 		};
 		std::thread(heartbeat).swap(worker_);
-		return pnode_->ServerStart(onRequest);
+		return pnode_->ServerStart(onRequest) && pnode_->SubscribeStartWorker(OnSubRecv);
 	} else {
 		return false;
 	}
diff --git a/box/node_center.cpp b/box/node_center.cpp
index 068aa00..77bfac1 100644
--- a/box/node_center.cpp
+++ b/box/node_center.cpp
@@ -270,6 +270,7 @@
 
 bool NodeCenter::RemotePublish(BHMsgHead &head, const std::string &body_content)
 {
+	// LOG_FUNCTION;
 	auto &topic = head.topic();
 	auto clients = DoFindClients(topic, true);
 	if (clients.empty()) { return true; }
@@ -288,9 +289,10 @@
 				}
 			}
 			MsgI msg(shm);
-			if (msg.Make(body_content)) {
+			if (msg.Make(head, body_content)) {
 				RecordMsg(msg);
 				msgs.push_back(msg);
+				// LOG_DEBUG() << "remote publish to local." << cli.mq_id_ << ", " << cli.mq_abs_addr_;
 				DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
 			}
 		};
@@ -554,22 +556,43 @@
 	typedef MsgQueryTopicReply Reply;
 
 	auto query = [&](Node self) -> Reply {
-		auto pos = service_map_.find(req.topic());
-		if (pos != service_map_.end() && !pos->second.empty()) {
-			auto &clients = pos->second;
-			Reply reply = MakeReply<Reply>(eSuccess);
-			for (auto &dest : clients) {
-				Node dest_node(dest.weak_node_.lock());
-				if (dest_node && Valid(*dest_node)) {
-					auto node_addr = reply.add_node_address();
-					node_addr->set_proc_id(dest_node->proc_.proc_id());
-					node_addr->mutable_addr()->set_mq_id(dest.mq_id_);
-					node_addr->mutable_addr()->set_abs_addr(dest.mq_abs_addr_);
+		Reply reply = MakeReply<Reply>(eSuccess);
+		auto local = [&]() {
+			auto pos = service_map_.find(req.topic());
+			if (pos != service_map_.end() && !pos->second.empty()) {
+				auto &clients = pos->second;
+				for (auto &dest : clients) {
+					Node dest_node(dest.weak_node_.lock());
+					if (dest_node && Valid(*dest_node)) {
+						auto node_addr = reply.add_node_address();
+						node_addr->set_proc_id(dest_node->proc_.proc_id());
+						node_addr->mutable_addr()->set_mq_id(dest.mq_id_);
+						node_addr->mutable_addr()->set_abs_addr(dest.mq_abs_addr_);
+					}
 				}
+				return true;
+			} else {
+				return false;
 			}
-			return reply;
-		} else {
+		};
+		auto net = [&]() {
+			auto hosts(FindRemoteRPCServers(req.topic()));
+			if (hosts.empty()) {
+				return false;
+			} else {
+				for (auto &ip : hosts) {
+					auto node_addr = reply.add_node_address();
+					node_addr->mutable_addr()->set_ip(ip);
+				}
+				return true;
+			}
+		};
+		local();
+		net();
+		if (reply.node_address_size() == 0) {
 			return MakeReply<Reply>(eNotFound, "topic server not found.");
+		} else {
+			return reply;
 		}
 	};
 
@@ -587,7 +610,6 @@
 			sub_map[topic].insert(dest);
 		}
 	};
-	LOG_DEBUG() << "subscribe net : " << msg.network();
 	if (msg.network()) {
 		Sub(net_sub_, center_.net_sub_map_);
 		center_.Notify(kTopicNodeSub, *this);
@@ -651,6 +673,7 @@
 
 NodeCenter::Clients NodeCenter::DoFindClients(const std::string &topic, bool from_remote)
 {
+	// LOG_FUNCTION;
 	Clients dests;
 	auto Find1 = [&](const std::string &exact) {
 		auto FindIn = [&](auto &sub_map) {
@@ -666,8 +689,11 @@
 		};
 		if (!from_remote) {
 			FindIn(local_sub_map_);
+			// LOG_DEBUG() << "topic '" << topic << "' local clients: " << dests.size();
 		}
+		// net subscripitions also work in local mode.
 		FindIn(net_sub_map_);
+		// LOG_DEBUG() << "topic '" << topic << "' + remote clients: " << dests.size();
 	};
 	Find1(topic);
 
@@ -793,8 +819,28 @@
 	}
 }
 
-std::vector<std::string> NodeCenter::FindRemoteSubClients(const Topic &topic)
+void NodeCenter::NetRecords::ParseData(const ssjson::Json &info)
 {
-	//TODO search synced full list;
-	return std::vector<std::string>();
+	// LOG_FUNCTION;
+	sub_hosts_.clear();
+	rpc_hosts_.clear();
+	for (auto &host : info.array()) {
+		if (host.get("isLocal", false)) {
+			host_id_ = host.get("serverId", "");
+			ip_ = host.get("ip", "");
+		} else {
+			auto ip = host.get("ip", "");
+			auto UpdateRec = [&](const ssjson::Json::array_type &lot, auto &rec) {
+				for (auto &topic : lot) {
+					auto t = topic.get_value<std::string>();
+					rec[t].insert(ip);
+					// LOG_DEBUG() << "net topic: " << t << ", " << ip;
+				}
+			};
+			// LOG_DEBUG() << "serives:";
+			UpdateRec(host.child("pubTopics").array(), rpc_hosts_);
+			// LOG_DEBUG() << "net sub:";
+			UpdateRec(host.child("netSubTopics").array(), sub_hosts_);
+		}
+	}
 }
\ No newline at end of file
diff --git a/box/node_center.h b/box/node_center.h
index ae5b075..b6ceab5 100644
--- a/box/node_center.h
+++ b/box/node_center.h
@@ -18,6 +18,7 @@
 #ifndef NODE_CENTER_KY67RJ1Q
 #define NODE_CENTER_KY67RJ1Q
 
+#include "json.h"
 #include "shm_socket.h"
 #include <unordered_map>
 
@@ -188,7 +189,9 @@
 	void OnTimer();
 
 	// remote hosts records
-	std::vector<std::string> FindRemoteSubClients(const Topic &topic);
+	std::set<std::string> FindRemoteSubClients(const Topic &topic) { return net_records_.FindSubHosts(topic); }
+	std::set<std::string> FindRemoteRPCServers(const Topic &topic) { return net_records_.FindRPCHosts(topic); }
+	void ParseNetInfo(ssjson::Json &info) { net_records_.ParseData(info); }
 
 private:
 	void CheckNodes();
@@ -219,6 +222,33 @@
 	int64_t offline_time_;
 	int64_t kill_time_;
 	int64_t last_check_time_;
+
+	// net hosts info
+	class NetRecords
+	{
+	public:
+		typedef std::set<std::string> Hosts;
+		void ParseData(const ssjson::Json &input);
+		Hosts FindRPCHosts(const Topic &topic) { return FindHosts(topic, rpc_hosts_); }
+		Hosts FindSubHosts(const Topic &topic) { return FindHosts(topic, sub_hosts_); }
+
+	private:
+		typedef std::unordered_map<Topic, Hosts> TopicMap;
+		TopicMap sub_hosts_;
+		TopicMap rpc_hosts_;
+		Hosts FindHosts(const Topic &topic, const TopicMap &tmap)
+		{
+			auto pos = tmap.find(topic);
+			if (pos != tmap.end()) {
+				return pos->second;
+			} else {
+				return Hosts();
+			}
+		}
+		std::string host_id_;
+		std::string ip_;
+	};
+	NetRecords net_records_;
 };
 
 #endif // end of include guard: NODE_CENTER_KY67RJ1Q
diff --git a/box/tcp_connection.cpp b/box/tcp_connection.cpp
index 6506369..8f0fe86 100644
--- a/box/tcp_connection.cpp
+++ b/box/tcp_connection.cpp
@@ -63,7 +63,7 @@
 	if (4 > len) { return false; }
 	uint32_t head_len = Get32(p);
 	if (head_len > 1024 * 4) {
-		throw std::runtime_error("unexpected tcp reply data.");
+		throw std::runtime_error("unexpected tcp data head.");
 	}
 	auto before_body = 4 + head_len + 4;
 	if (before_body > len) {
diff --git a/src/bh_api.cc b/src/bh_api.cc
index 3dafe7a..6bcf45c 100644
--- a/src/bh_api.cc
+++ b/src/bh_api.cc
@@ -226,6 +226,10 @@
 {
 	return BHApi_In1_Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms);
 }
+int BHSubscribeNetTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
+{
+	return BHApi_In1_Out1<MsgTopicList>(&TopicNode::SubscribeNet, topics, topics_len, reply, reply_len, timeout_ms);
+}
 
 int BHPublish(const void *msgpub,
               const int msgpub_len,
diff --git a/src/bh_api.h b/src/bh_api.h
index 3b77da5..8178e55 100644
--- a/src/bh_api.h
+++ b/src/bh_api.h
@@ -57,6 +57,11 @@
                       void **reply,
                       int *reply_len,
                       const int timeout_ms);
+int BHSubscribeNetTopics(const void *topics,
+                         const int topics_len,
+                         void **reply,
+                         int *reply_len,
+                         const int timeout_ms);
 
 typedef void (*FSubDataCallback)(const void *proc_id,
                                  int proc_id_len,
diff --git a/src/exported_symbols b/src/exported_symbols
index addfadc..4867769 100644
--- a/src/exported_symbols
+++ b/src/exported_symbols
@@ -7,6 +7,7 @@
 	BHQueryTopicAddress;
 	BHQueryProcs;
 	BHSubscribeTopics;
+	BHSubscribeNetTopics;
 	BHStartWorker;
 	BHHeartbeatEasy;
 	BHHeartbeat;
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index 6f98694..6096fbb 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -649,7 +649,7 @@
 
 // subscribe
 
-bool TopicNode::Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
+bool TopicNode::DoSubscribe(MsgTopicList &topics, const bool net, MsgCommonReply &reply_body, const int timeout_ms)
 {
 	if (!IsOnline()) {
 		SetLastError(eNotRegistered, kErrMsgNotRegistered);
@@ -659,6 +659,7 @@
 	try {
 		auto &sock = SockSub();
 		MsgSubscribe sub;
+		sub.set_network(net);
 		sub.mutable_topics()->Swap(&topics);
 
 		BHMsgHead head(InitMsgHead(GetType(sub), proc_id(), ssn()));
diff --git a/src/topic_node.h b/src/topic_node.h
index 1425844..9e7eed2 100644
--- a/src/topic_node.h
+++ b/src/topic_node.h
@@ -73,7 +73,15 @@
 	// subscribe
 	typedef std::function<void(const std::string &proc_id, const MsgPublish &data)> SubDataCB;
 	bool SubscribeStartWorker(const SubDataCB &tdcb, int nworker = 2);
-	bool Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms);
+	bool Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
+	{
+		return DoSubscribe(topics, false, reply_body, timeout_ms);
+	}
+	bool SubscribeNet(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
+	{
+		return DoSubscribe(topics, true, reply_body, timeout_ms);
+	}
+	bool DoSubscribe(MsgTopicList &topics, const bool net, MsgCommonReply &reply_body, const int timeout_ms);
 	bool RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms);
 
 	void Start(ServerAsyncCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker = 2);
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index 239ea8b..13a552d 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -16,6 +16,7 @@
  * =====================================================================================
  */
 #include "bh_api.h"
+#include "json.h"
 #include "robust.h"
 #include "util.h"
 #include <atomic>
@@ -330,17 +331,27 @@
 	}
 
 	{ // Subscribe
-		MsgTopicList topics;
-		topics.add_topic_list("#center.node");
-		for (int i = 0; i < 10; ++i) {
-			topics.add_topic_list(topic_ + std::to_string(i * 2));
-		}
-		std::string s = topics.SerializeAsString();
-		void *reply = 0;
-		int reply_len = 0;
-		bool r = BHSubscribeTopics(s.data(), s.size(), &reply, &reply_len, 1000);
-		BHFree(reply, reply_len);
-		printf("subscribe topic : %s\n", r ? "ok" : "failed");
+		auto Subscribe = [&](std::string topic, bool net) {
+			MsgTopicList topics;
+			topics.add_topic_list(topic);
+			for (int i = 0; i < 10; ++i) {
+				topics.add_topic_list(topic_ + std::to_string(i * 2));
+			}
+			std::string s = topics.SerializeAsString();
+			void *reply = 0;
+			int reply_len = 0;
+			bool r = false;
+			if (net) {
+				r = BHSubscribeNetTopics(s.data(), s.size(), &reply, &reply_len, 1000);
+			} else {
+				r = BHSubscribeTopics(s.data(), s.size(), &reply, &reply_len, 1000);
+			}
+			BHFree(reply, reply_len);
+			printf("subscribe topic %s: %s\n", topic.c_str(), (r ? "ok" : "failed"));
+		};
+		Subscribe("#center.node", false);
+		Subscribe("local0", false);
+		Subscribe("net0", true);
 	}
 
 	auto ServerLoop = [&](std::atomic<bool> *run) {
@@ -368,14 +379,47 @@
 		}
 	};
 
+	std::atomic<bool> run(true);
+	ThreadManager threads;
+#if 1
+	BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc);
+#else
+	BHStartWorker(FServerCallback(), &SubRecvProc, &ClientProc);
+	threads.Launch(ServerLoop, &run);
+#endif
+
+	auto Publish = [&](const std::string &topic, const std::string &data) {
+		MsgPublish pub;
+		pub.set_topic(topic);
+		pub.set_data(data);
+		std::string s(pub.SerializeAsString());
+		BHPublish(s.data(), s.size(), 0);
+	};
+
 	{
+		// publish
+		Publish(topic_ + std::to_string(0), "pub_data_" + std::string(104 * 1, 'a'));
 		for (int i = 0; i < 1; ++i) {
-			MsgPublish pub;
-			pub.set_topic(topic_ + std::to_string(i));
-			pub.set_data("pub_data_" + std::string(104 * 1, 'a'));
-			std::string s(pub.SerializeAsString());
-			BHPublish(s.data(), s.size(), 0);
-			// Sleep(1s);
+
+			ssjson::Json net = ssjson::Json::Array();
+			ssjson::Json host;
+			host.put("serverId", "test_host");
+			host.put("ip", "127.0.0.1");
+			ssjson::Json topics = ssjson::Json::Array();
+			topics.push_back("aaaaa");
+			topics.push_back("bbbbb");
+			host.put("pubTopics", topics);
+			topics.push_back("net0");
+			topics.push_back("net1");
+			host.put("netSubTopics", topics);
+			net.push_back(host);
+
+			Publish("pub-allRegisterInfo-to-center", net.dump());
+			Sleep(1s);
+			Publish("local0", "local-abcd0");
+			Publish("net0", "net-abcd0");
+			Publish("local0", "local-abcd1");
+			Sleep(1s);
 		}
 	}
 
@@ -428,22 +472,11 @@
 		}
 	};
 
-	std::atomic<bool> run(true);
-
-	ThreadManager threads;
-
-#if 1
-	BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc);
-#else
-	BHStartWorker(FServerCallback(), &SubRecvProc, &ClientProc);
-	threads.Launch(ServerLoop, &run);
-#endif
-
 	boost::timer::auto_cpu_timer timer;
 	threads.Launch(hb, &run);
 	threads.Launch(showStatus, &run);
 	int ncli = 10;
-	const int64_t nreq = 1000 * 100;
+	const int64_t nreq = 1000; //* 100;
 
 	for (int i = 0; i < 10; ++i) {
 		SyncRequest(topic_ + std::to_string(0), "request_data_" + std::to_string(i));

--
Gitblit v1.8.0