From c1e39e20ca42b21eeac8b5068fa1f921bf9a070f Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期三, 23 六月 2021 19:43:29 +0800
Subject: [PATCH] refactor, start tcp pub/sub.

---
 box/tcp_connection.cpp           |   11 ++
 proto/source/bhome_msg_api.proto |    4 
 utest/api_test.cpp               |   15 ++
 box/center_topic_node.cpp        |   16 ++-
 box/center.cpp                   |   23 +---
 box/tcp_proxy.h                  |    1 
 proto/source/bhome_msg.proto     |    2 
 box/node_center.h                |   24 +++-
 box/tcp_proxy.cpp                |    5 +
 box/node_center.cpp              |  184 +++++++++++++++++++++++++++---------
 src/topic_node.cpp               |    1 
 11 files changed, 205 insertions(+), 81 deletions(-)

diff --git a/box/center.cpp b/box/center.cpp
index 0e4c40b..78135d1 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -135,27 +135,18 @@
 
 	auto OnBusIdle = [=](ShmSocket &socket) {};
 	auto OnBusCmd = [=](ShmSocket &socket, ShmMsgQueue::RawData &val) { return false; };
-	auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
+	auto OnPubSub = [=, &tcp_proxy](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
 		auto &center = *center_ptr;
 		auto replyer = MakeReplyer(socket, head, center);
 		auto OnPublish = [&]() {
 			MsgPublish pub;
-			NodeCenter::Clients clients;
-			MsgCommonReply reply;
-			if (head.route_size() != 1 || !msg.ParseBody(pub)) {
-				return;
-			} else if (!center->FindClients(head, pub, clients, reply)) {
+			if (head.route_size() == 1 && msg.ParseBody(pub)) {
+				// replyer(center->Publish(head, pub.topic(), msg)); // dead lock?
+				auto reply(center->Publish(head, pub.topic(), msg));
 				replyer(reply);
-			} else {
-				replyer(MakeReply(eSuccess));
-				if (clients.empty()) { return; }
-				for (auto &cli : clients) {
-					auto node = cli.weak_node_.lock();
-					if (node) {
-						// should also make sure that mq is not killed before msg expires.
-						// it would be ok if (kill_time - offline_time) is longer than expire time.
-						socket.Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
-					}
+				auto hosts = center->FindRemoteSubClients(pub.topic());
+				for (auto &host : hosts) {
+					tcp_proxy.Publish(host, kBHCenterPort, pub.SerializeAsString());
 				}
 			}
 		};
diff --git a/box/center_topic_node.cpp b/box/center_topic_node.cpp
index 8228992..3c4f369 100644
--- a/box/center_topic_node.cpp
+++ b/box/center_topic_node.cpp
@@ -43,11 +43,17 @@
 		proc.put("name", info.proc().name());
 		proc.put("publicInfo", info.proc().public_info());
 		proc.put("online", info.online());
-		Json topics = Json::Array();
-		for (auto &t : info.topics().topic_list()) {
-			topics.push_back(t);
-		}
-		proc.put("topics", topics);
+		auto AddTopics = [&](auto &name, auto &topic_list) {
+			Json topics = Json::Array();
+			for (auto &t : topic_list) {
+				topics.push_back(t);
+			}
+			proc.put(name, topics);
+		};
+		AddTopics("service", info.service().topic_list());
+		AddTopics("local_sub", info.local_sub().topic_list());
+		AddTopics("net_sub", info.net_sub().topic_list());
+
 		list.push_back(proc);
 	}
 	return json.dump(0);
diff --git a/box/node_center.cpp b/box/node_center.cpp
index 5c24409..c32b197 100644
--- a/box/node_center.cpp
+++ b/box/node_center.cpp
@@ -267,6 +267,43 @@
 	return sender.Send(dest, msg, head.msg_id(), std::move(cb));
 }
 
+bool NodeCenter::RemotePublish(BHMsgHead &head, const std::string &body_content)
+{
+	auto &topic = head.topic();
+	auto clients = DoFindClients(topic, true);
+	if (clients.empty()) { return true; }
+
+	std::vector<MsgI> msgs;
+	auto ReleaseAll = [&]() {for (auto &msg : msgs) { msg.Release(); } };
+	DEFER1(ReleaseAll(););
+
+	for (auto &cli : clients) {
+		auto Send1 = [&](Node node) {
+			auto &shm = node->shm_;
+			for (auto &msg : msgs) {
+				if (msg.shm().name() == shm.name()) {
+					DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
+					return;
+				}
+			}
+			MsgI msg(shm);
+			if (msg.Make(body_content)) {
+				RecordMsg(msg);
+				msgs.push_back(msg);
+				DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
+			}
+		};
+		auto node = cli.weak_node_.lock();
+		if (node) {
+			Send1(node);
+			// should also make sure that mq is not killed before msg expires.
+			// it would be ok if (kill_time - offline_time) is longer than expire time.
+		}
+	}
+
+	return true;
+}
+
 bool NodeCenter::PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content)
 {
 	Node node(GetNode(dest.id_));
@@ -469,11 +506,16 @@
 		*info->mutable_proc() = node->proc_;
 		info->mutable_proc()->clear_private_info();
 		info->set_online(node->state_.flag_ == kStateNormal);
-		for (auto &addr_topics : node->services_) {
-			for (auto &topic : addr_topics.second) {
-				info->mutable_topics()->add_topic_list(topic);
+		auto AddTopics = [](auto &dst, auto &src) {
+			for (auto &addr_topics : src) {
+				for (auto &topic : addr_topics.second) {
+					dst.add_topic_list(topic);
+				}
 			}
-		}
+		};
+		AddTopics(*info->mutable_service(), node->services_);
+		AddTopics(*info->mutable_local_sub(), node->local_sub_);
+		AddTopics(*info->mutable_net_sub(), node->net_sub_);
 	};
 
 	if (!proc_id.empty()) {
@@ -532,35 +574,50 @@
 	return HandleMsg<Reply>(head, query);
 }
 
+void NodeCenter::NodeInfo::Subscribe(const BHMsgHead &head, const MsgSubscribe &msg, Node node)
+{
+	auto src = SrcAddr(head);
+	auto Sub = [&](auto &sub, auto &sub_map) {
+		auto &topics = msg.topics().topic_list();
+		sub[src].insert(topics.begin(), topics.end());
+		const TopicDest &dest = {src, SrcAbsAddr(head), node};
+		for (auto &topic : topics) {
+			sub_map[topic].insert(dest);
+		}
+	};
+	LOG_DEBUG() << "subscribe net : " << msg.network();
+	if (msg.network()) {
+		Sub(net_sub_, center_.net_sub_map_);
+	} else {
+		Sub(local_sub_, center_.local_sub_map_);
+	}
+}
+
 MsgCommonReply NodeCenter::Subscribe(const BHMsgHead &head, const MsgSubscribe &msg)
 {
 	return HandleMsg(head, [&](Node node) {
-		auto src = SrcAddr(head);
-		auto &topics = msg.topics().topic_list();
-		node->subscriptions_[src].insert(topics.begin(), topics.end());
-		TopicDest dest = {src, SrcAbsAddr(head), node};
-		for (auto &topic : topics) {
-			subscribe_map_[topic].insert(dest);
-		}
+		node->Subscribe(head, msg, node);
 		return MakeReply(eSuccess);
 	});
 }
-MsgCommonReply NodeCenter::Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg)
-{
-	return HandleMsg(head, [&](Node node) {
-		auto src = SrcAddr(head);
-		auto pos = node->subscriptions_.find(src);
 
-		auto RemoveSubTopicDestRecord = [this](const Topic &topic, const TopicDest &dest) {
-			auto pos = subscribe_map_.find(topic);
-			if (pos != subscribe_map_.end() &&
+void NodeCenter::NodeInfo::Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg, Node node)
+{
+	auto src = SrcAddr(head);
+
+	auto Unsub = [&](auto &sub, auto &sub_map) {
+		auto pos = sub.find(src);
+
+		auto RemoveSubTopicDestRecord = [&sub_map](const Topic &topic, const TopicDest &dest) {
+			auto pos = sub_map.find(topic);
+			if (pos != sub_map.end() &&
 			    pos->second.erase(dest) != 0 &&
 			    pos->second.empty()) {
-				subscribe_map_.erase(pos);
+				sub_map.erase(pos);
 			}
 		};
 
-		if (pos != node->subscriptions_.end()) {
+		if (pos != sub.end()) {
 			const TopicDest &dest = {src, SrcAbsAddr(head), node};
 			auto &topics = msg.topics().topic_list();
 			// clear node sub records;
@@ -569,26 +626,44 @@
 				RemoveSubTopicDestRecord(topic, dest);
 			}
 			if (pos->second.empty()) {
-				node->subscriptions_.erase(pos);
+				sub.erase(pos);
 			}
 		}
+	};
+	if (msg.network()) {
+		Unsub(net_sub_, center_.net_sub_map_);
+	} else {
+		Unsub(local_sub_, center_.local_sub_map_);
+	}
+}
+
+MsgCommonReply NodeCenter::Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg)
+{
+	return HandleMsg(head, [&](Node node) {
+		node->Unsubscribe(head, msg, node);
 		return MakeReply(eSuccess);
 	});
 }
 
-NodeCenter::Clients NodeCenter::DoFindClients(const std::string &topic)
+NodeCenter::Clients NodeCenter::DoFindClients(const std::string &topic, bool from_remote)
 {
 	Clients dests;
 	auto Find1 = [&](const std::string &exact) {
-		auto pos = subscribe_map_.find(exact);
-		if (pos != subscribe_map_.end()) {
-			auto &clients = pos->second;
-			for (auto &cli : clients) {
-				if (Valid(cli.weak_node_)) {
-					dests.insert(cli);
+		auto FindIn = [&](auto &sub_map) {
+			auto pos = sub_map.find(exact);
+			if (pos != sub_map.end()) {
+				auto &clients = pos->second;
+				for (auto &cli : clients) {
+					if (Valid(cli.weak_node_)) {
+						dests.insert(cli);
+					}
 				}
 			}
+		};
+		if (!from_remote) {
+			FindIn(local_sub_map_);
 		}
+		FindIn(net_sub_map_);
 	};
 	Find1(topic);
 
@@ -605,15 +680,31 @@
 	return dests;
 }
 
-bool NodeCenter::FindClients(const BHMsgHead &head, const MsgPublish &msg, Clients &out, MsgCommonReply &reply)
+MsgCommonReply NodeCenter::Publish(const BHMsgHead &head, const Topic &topic, MsgI &msg)
 {
-	bool ret = false;
-	HandleMsg(head, [&](Node node) {
-		DoFindClients(msg.topic()).swap(out);
-		ret = true;
+	return HandleMsg(head, [&](Node node) {
+		DoPublish(DefaultSender(node->shm_), topic, msg);
 		return MakeReply(eSuccess);
-	}).Swap(&reply);
-	return ret;
+	});
+}
+
+void NodeCenter::DoPublish(ShmSocket &sock, const Topic &topic, MsgI &msg)
+{
+	try {
+		auto clients = DoFindClients(topic, false);
+		if (clients.empty()) { return; }
+
+		for (auto &cli : clients) {
+			auto node = cli.weak_node_.lock();
+			if (node) {
+				// should also make sure that mq is not killed before msg expires.
+				// it would be ok if (kill_time - offline_time) is longer than expire time.
+				sock.Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
+			}
+		}
+	} catch (...) {
+		LOG_ERROR() << "DoPublish error.";
+	}
 }
 
 void NodeCenter::OnTimer()
@@ -659,7 +750,8 @@
 		}
 	};
 	EraseMapRec(service_map_, node->services_);
-	EraseMapRec(subscribe_map_, node->subscriptions_);
+	EraseMapRec(local_sub_map_, node->local_sub_);
+	EraseMapRec(net_sub_map_, node->net_sub_);
 
 	// remove online record.
 	auto pos = online_node_addr_map_.find(node->proc_.proc_id());
@@ -681,10 +773,6 @@
 void NodeCenter::Publish(SharedMemory &shm, const Topic &topic, const std::string &content)
 {
 	try {
-		// LOG_DEBUG() << "center publish: " << topic << ": " << content;
-		Clients clients(DoFindClients(topic));
-		if (clients.empty()) { return; }
-
 		MsgPublish pub;
 		pub.set_topic(topic);
 		pub.set_data(content);
@@ -693,16 +781,16 @@
 		if (msg.Make(head, pub)) {
 			DEFER1(msg.Release());
 			RecordMsg(msg);
-
-			for (auto &cli : clients) {
-				auto node = cli.weak_node_.lock();
-				if (node && node->state_.flag_ == kStateNormal) {
-					DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
-				}
-			}
+			DoPublish(DefaultSender(shm), topic, msg);
 		}
 
 	} catch (...) {
 		LOG_ERROR() << "center publish error.";
 	}
+}
+
+std::vector<std::string> NodeCenter::FindRemoteSubClients(const Topic &topic)
+{
+	//TODO search synced full list;
+	return std::vector<std::string>();
 }
\ No newline at end of file
diff --git a/box/node_center.h b/box/node_center.h
index 74dd52f..54f84c0 100644
--- a/box/node_center.h
+++ b/box/node_center.h
@@ -82,6 +82,10 @@
 	};
 	typedef std::unordered_map<Address, std::set<Topic>> AddressTopics;
 
+	struct NodeInfo;
+	typedef std::shared_ptr<NodeInfo> Node;
+	typedef std::weak_ptr<NodeInfo> WeakNode;
+
 	struct NodeInfo {
 		NodeCenter &center_;
 		SharedMemory &shm_;
@@ -89,14 +93,15 @@
 		std::map<MQId, int64_t> addrs_; // registered mqs
 		ProcInfo proc_;                 //
 		AddressTopics services_;        // address: topics
-		AddressTopics subscriptions_;   // address: topics
+		AddressTopics local_sub_;       // address: topics
+		AddressTopics net_sub_;         // address: topics
 		NodeInfo(NodeCenter &center, SharedMemory &shm) :
 		    center_(center), shm_(shm) {}
 		void PutOffline(const int64_t offline_time);
 		void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time);
+		void Subscribe(const BHMsgHead &head, const MsgSubscribe &msg, Node node);
+		void Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg, Node node);
 	};
-	typedef std::shared_ptr<NodeInfo> Node;
-	typedef std::weak_ptr<NodeInfo> WeakNode;
 
 	struct TopicDest {
 		MQId mq_id_;
@@ -121,7 +126,9 @@
 	void RecordMsg(const MsgI &msg);
 	bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg);
 	bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg);
+
 	bool PassRemoteRequestToLocal(MQInfo dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb);
+	bool RemotePublish(BHMsgHead &head, const std::string &body_content);
 	bool PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content);
 	void OnAlloc(ShmSocket &socket, const int64_t val);
 	void OnFree(ShmSocket &socket, const int64_t val);
@@ -176,15 +183,19 @@
 	MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req);
 	MsgCommonReply Subscribe(const BHMsgHead &head, const MsgSubscribe &msg);
 	MsgCommonReply Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg);
-	Clients DoFindClients(const std::string &topic);
-	bool FindClients(const BHMsgHead &head, const MsgPublish &msg, Clients &out, MsgCommonReply &reply);
+	MsgCommonReply Publish(const BHMsgHead &head, const Topic &topic, MsgI &msg);
 
 	void OnTimer();
+
+	// remote hosts records
+	std::vector<std::string> FindRemoteSubClients(const Topic &topic);
 
 private:
 	void CheckNodes();
 	bool CanHeartbeat(const NodeInfo &node) { return Valid(node) || node.state_.flag_ == kStateOffline; }
 	void Publish(SharedMemory &shm, const Topic &topic, const std::string &content);
+	void DoPublish(ShmSocket &sock, const Topic &topic, MsgI &msg);
+	Clients DoFindClients(const std::string &topic, bool from_remote);
 	bool Valid(const NodeInfo &node) { return node.state_.flag_ == kStateNormal; }
 	bool Valid(const WeakNode &weak)
 	{
@@ -197,7 +208,8 @@
 	std::string id_; // center proc id;
 
 	std::unordered_map<Topic, Clients> service_map_;
-	std::unordered_map<Topic, Clients> subscribe_map_;
+	std::unordered_map<Topic, Clients> local_sub_map_;
+	std::unordered_map<Topic, Clients> net_sub_map_;
 	std::unordered_map<Address, Node> nodes_;
 	std::unordered_map<ProcId, Address> online_node_addr_map_;
 	ProcRecords procs_; // To get a short index for msg alloc.
diff --git a/box/tcp_connection.cpp b/box/tcp_connection.cpp
index 85ed4ed..6506369 100644
--- a/box/tcp_connection.cpp
+++ b/box/tcp_connection.cpp
@@ -178,8 +178,17 @@
 			send_buffer_ = imsg.content();
 			async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
 		};
+
 		auto &scenter = *pscenter_;
-		if (scenter->PassRemoteRequestToLocal(remote, head, body_content, onRecv)) {
+		if (head.type() == kMsgTypePublish) {
+			auto reply = MakeReply(eSuccess);
+			auto rep_head = InitMsgHead(GetType(reply), scenter->id(), 0, head.msg_id());
+			send_buffer_ = MsgI::Serialize(rep_head, reply);
+			async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
+
+			scenter->RemotePublish(head, body_content);
+			return;
+		} else if (scenter->PassRemoteRequestToLocal(remote, head, body_content, onRecv)) {
 			return;
 		} else {
 			Close();
diff --git a/box/tcp_proxy.cpp b/box/tcp_proxy.cpp
index b4ec497..803874d 100644
--- a/box/tcp_proxy.cpp
+++ b/box/tcp_proxy.cpp
@@ -31,3 +31,8 @@
 		return false;
 	}
 }
+
+bool TcpProxy::Publish(const std::string &ip, int port, std::string &&content)
+{
+	return Request(ip, port, std::move(content), ReplyCB());
+}
\ No newline at end of file
diff --git a/box/tcp_proxy.h b/box/tcp_proxy.h
index 69c3f03..09febe5 100644
--- a/box/tcp_proxy.h
+++ b/box/tcp_proxy.h
@@ -33,6 +33,7 @@
 	TcpProxy(io_service_t &io) :
 	    io_(io) {}
 	bool Request(const std::string &ip, int port, std::string &&content, ReplyCB const &cb);
+	bool Publish(const std::string &ip, int port, std::string &&content);
 
 private:
 	io_service_t &io_;
diff --git a/proto/source/bhome_msg.proto b/proto/source/bhome_msg.proto
index f34aebb..b1f9772 100644
--- a/proto/source/bhome_msg.proto
+++ b/proto/source/bhome_msg.proto
@@ -57,9 +57,11 @@
 
 message MsgSubscribe {
 	MsgTopicList topics = 1;
+	bool network = 2;
 }
 message MsgUnsubscribe {
 	MsgTopicList topics = 1;
+	bool network = 2;
 }
 message MsgRegisterRPC {
 	MsgTopicList topics = 1;
diff --git a/proto/source/bhome_msg_api.proto b/proto/source/bhome_msg_api.proto
index 6a20aa7..0ef3451 100644
--- a/proto/source/bhome_msg_api.proto
+++ b/proto/source/bhome_msg_api.proto
@@ -84,7 +84,9 @@
 	message Info {
 		ProcInfo proc  = 1;
 		bool online = 2;
-		MsgTopicList topics = 3;
+		MsgTopicList service = 3;
+		MsgTopicList local_sub = 4;
+		MsgTopicList net_sub = 5;
 	}
 	repeated Info proc_list = 2;
 }
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index 5362318..6f98694 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -628,6 +628,7 @@
 		auto &sock = SockPub();
 		BHMsgHead head(InitMsgHead(GetType(pub), proc_id(), ssn()));
 		AddRoute(head, sock);
+		head.set_topic(pub.topic());
 
 		if (timeout_ms == 0) {
 			return sock.Send(BusAddr(), head, pub);
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index 5363d6e..239ea8b 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -256,14 +256,21 @@
 			printf("proc [%d] %s, %s, %s\n\ttopics\n", i,
 			       (info.online() ? "online" : "offline"),
 			       info.proc().proc_id().c_str(), info.proc().name().c_str());
-			for (auto &t : info.topics().topic_list()) {
-				printf("\t\t %s\n", t.c_str());
-			}
+			auto PrintTopics = [](std::string const &name, auto &topic_list) {
+				printf("%s:[", name.c_str());
+				for (auto &t : topic_list) {
+					printf("%s,", t.c_str());
+				}
+				printf("]\n");
+			};
+			PrintTopics("service", info.service().topic_list());
+			PrintTopics("local_sub", info.local_sub().topic_list());
+			PrintTopics("net_sub", info.net_sub().topic_list());
 			printf("\n");
 		}
 		printf("\n");
 	};
-	if (0) {
+	if (1) {
 		// query procs
 		std::string dest(BHAddress().SerializeAsString());
 		MsgQueryProc query;

--
Gitblit v1.8.0