From c1e39e20ca42b21eeac8b5068fa1f921bf9a070f Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期三, 23 六月 2021 19:43:29 +0800
Subject: [PATCH] refactor, start tcp pub/sub.

---
 box/center.cpp |   23 +++++++----------------
 1 files changed, 7 insertions(+), 16 deletions(-)

diff --git a/box/center.cpp b/box/center.cpp
index 0e4c40b..78135d1 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -135,27 +135,18 @@
 
 	auto OnBusIdle = [=](ShmSocket &socket) {};
 	auto OnBusCmd = [=](ShmSocket &socket, ShmMsgQueue::RawData &val) { return false; };
-	auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
+	auto OnPubSub = [=, &tcp_proxy](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
 		auto &center = *center_ptr;
 		auto replyer = MakeReplyer(socket, head, center);
 		auto OnPublish = [&]() {
 			MsgPublish pub;
-			NodeCenter::Clients clients;
-			MsgCommonReply reply;
-			if (head.route_size() != 1 || !msg.ParseBody(pub)) {
-				return;
-			} else if (!center->FindClients(head, pub, clients, reply)) {
+			if (head.route_size() == 1 && msg.ParseBody(pub)) {
+				// replyer(center->Publish(head, pub.topic(), msg)); // dead lock?
+				auto reply(center->Publish(head, pub.topic(), msg));
 				replyer(reply);
-			} else {
-				replyer(MakeReply(eSuccess));
-				if (clients.empty()) { return; }
-				for (auto &cli : clients) {
-					auto node = cli.weak_node_.lock();
-					if (node) {
-						// should also make sure that mq is not killed before msg expires.
-						// it would be ok if (kill_time - offline_time) is longer than expire time.
-						socket.Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
-					}
+				auto hosts = center->FindRemoteSubClients(pub.topic());
+				for (auto &host : hosts) {
+					tcp_proxy.Publish(host, kBHCenterPort, pub.SerializeAsString());
 				}
 			}
 		};

--
Gitblit v1.8.0