From ae17d1439b35b55212c3a30712e0a60b1d6a99c0 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期三, 30 六月 2021 11:15:53 +0800
Subject: [PATCH] support tcp pub/sub.

---
 box/center.cpp |  107 +++++++++++++++++++++++++++++++----------------------
 1 files changed, 62 insertions(+), 45 deletions(-)

diff --git a/box/center.cpp b/box/center.cpp
index 3f565b1..e0abbb3 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -17,7 +17,10 @@
  */
 #include "center.h"
 #include "center_topic_node.h"
+#include "io_service.h"
 #include "node_center.h"
+#include "tcp_proxy.h"
+#include "tcp_server.h"
 #include <chrono>
 
 using namespace std::chrono;
@@ -30,8 +33,6 @@
 namespace
 {
 
-//TODO check proc_id
-
 template <class Body, class OnMsg, class Replyer>
 inline void Dispatch(MsgI &msg, BHMsgHead &head, OnMsg const &onmsg, Replyer const &replyer)
 {
@@ -40,18 +41,6 @@
 	if (msg.ParseBody(body)) {
 		replyer(onmsg(body));
 	}
-}
-
-Handler Combine(const Handler &h1, const Handler &h2)
-{
-	return [h1, h2](ShmSocket &socket, bhome_msg::MsgI &msg, bhome_msg::BHMsgHead &head) {
-		return h1(socket, msg, head) || h2(socket, msg, head);
-	};
-}
-template <class... H>
-Handler Combine(const Handler &h0, const Handler &h1, const Handler &h2, const H &...rest)
-{
-	return Combine(Combine(h0, h1), h2, rest...);
 }
 
 #define CASE_ON_MSG_TYPE(MsgTag)                                                         \
@@ -65,7 +54,7 @@
 	return [&](auto &&rep_body) {
 		auto reply_head(InitMsgHead(GetType(rep_body), center->id(), head.ssn_id(), head.msg_id()));
 		MQInfo remote = {head.route(0).mq_id(), head.route(0).abs_addr()};
-		MsgI msg;
+		MsgI msg(socket.shm());
 		if (msg.Make(reply_head, rep_body)) {
 			DEFER1(msg.Release(););
 			center->SendAllocMsg(socket, remote, msg);
@@ -73,7 +62,7 @@
 	};
 }
 
-bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr)
+bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr, SharedMemory &shm, TcpProxy &tcp_proxy)
 {
 	// command
 	auto OnCommand = [center_ptr](ShmSocket &socket, ShmMsgQueue::RawData &cmd) -> bool {
@@ -87,13 +76,49 @@
 		auto onInit = [&](const int64_t request) {
 			return center->OnNodeInit(socket, request);
 		};
-		BHCenterHandleInit(onInit);
+		BHCenterHandleInit(socket.shm(), onInit);
 		center->OnTimer();
 	};
 
-	auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
+	auto OnCenter = [=, &tcp_proxy](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
 		auto &center = *center_ptr;
 		auto replyer = MakeReplyer(socket, head, center);
+
+		if (!head.dest().ip().empty()) { // other host, proxy
+			auto valid = [&]() { return head.route_size() == 1; };
+			if (!valid()) { return false; }
+
+			if (head.type() == kMsgTypeRequestTopic) {
+				typedef MsgRequestTopicReply Reply;
+				Reply reply;
+				if (!center->CheckMsg(head, reply)) {
+					replyer(reply);
+				} else {
+					auto onResult = [&center](BHMsgHead &head, std::string body_content) {
+						if (head.route_size() > 0) {
+							auto &back = head.route(head.route_size() - 1);
+							MQInfo dest = {back.mq_id(), back.abs_addr()};
+							head.mutable_route()->RemoveLast();
+							center->PassRemoteReplyToLocal(dest, head, std::move(body_content));
+						}
+					};
+					uint16_t port = head.dest().port();
+					if (port == 0) {
+						port = kBHCenterPort;
+					}
+					if (!tcp_proxy.Request(head.dest().ip(), port, msg.content(), onResult)) {
+						replyer(MakeReply<Reply>(eError, "send request failed."));
+					} else {
+						// success
+					}
+				}
+				return true;
+			} else {
+				// ignore other msgs for now.
+			}
+			return false;
+		}
+
 		switch (head.type()) {
 			CASE_ON_MSG_TYPE(ProcInit);
 			CASE_ON_MSG_TYPE(Register);
@@ -106,31 +131,22 @@
 		default: return false;
 		}
 	};
-	BHCenter::Install("#center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(), 1000);
+	BHCenter::Install("@center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(shm), 1000);
 
 	auto OnBusIdle = [=](ShmSocket &socket) {};
 	auto OnBusCmd = [=](ShmSocket &socket, ShmMsgQueue::RawData &val) { return false; };
-	auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
+	auto OnPubSub = [=, &tcp_proxy](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
 		auto &center = *center_ptr;
 		auto replyer = MakeReplyer(socket, head, center);
 		auto OnPublish = [&]() {
 			MsgPublish pub;
-			NodeCenter::Clients clients;
-			MsgCommonReply reply;
-			if (head.route_size() != 1 || !msg.ParseBody(pub)) {
-				return;
-			} else if (!center->FindClients(head, pub, clients, reply)) {
+			if (head.route_size() == 1 && msg.ParseBody(pub)) {
+				// replyer(center->Publish(head, pub.topic(), msg)); // dead lock?
+				auto reply(center->Publish(head, pub.topic(), msg));
 				replyer(reply);
-			} else {
-				replyer(MakeReply(eSuccess));
-				if (clients.empty()) { return; }
-				for (auto &cli : clients) {
-					auto node = cli.weak_node_.lock();
-					if (node) {
-						// should also make sure that mq is not killed before msg expires.
-						// it would be ok if (kill_time - offline_time) is longer than expire time.
-						socket.Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
-					}
+				auto hosts = center->FindRemoteSubClients(pub.topic());
+				for (auto &host : hosts) {
+					tcp_proxy.Publish(host, kBHCenterPort, msg.content());
 				}
 			}
 		};
@@ -142,7 +158,7 @@
 		}
 	};
 
-	BHCenter::Install("#center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(), 1000);
+	BHCenter::Install("@center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(shm), 1000);
 
 	return true;
 }
@@ -165,16 +181,12 @@
 
 BHCenter::BHCenter(Socket::Shm &shm)
 {
-	auto gc = [&](const MQId id) {
-		auto r = ShmSocket::Remove(shm, id);
-		if (r) {
-			LOG_DEBUG() << "remove mq " << id << " ok\n";
-		}
-	};
-
 	auto nsec = NodeTimeoutSec();
-	auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", gc, nsec, nsec * 3); // *3 to allow other clients to finish sending msgs.
-	AddCenter(center_ptr);
+	auto center_ptr = std::make_shared<Synced<NodeCenter>>("@bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs.
+	io_service_.reset(new IoService);
+	tcp_proxy_.reset(new TcpProxy(io_service_->io()));
+
+	AddCenter(center_ptr, shm, *tcp_proxy_);
 
 	for (auto &kv : Centers()) {
 		auto &info = kv.second;
@@ -182,7 +194,9 @@
 	}
 
 	topic_node_.reset(new CenterTopicNode(center_ptr, shm));
+	tcp_server_.reset(new TcpServer(io_service_->io(), kBHCenterPort, center_ptr));
 }
+
 BHCenter::~BHCenter() { Stop(); }
 
 bool BHCenter::Start()
@@ -197,6 +211,9 @@
 
 bool BHCenter::Stop()
 {
+	tcp_proxy_.reset();
+	tcp_server_.reset();
+	io_service_.reset();
 	topic_node_->Stop();
 	for (auto &kv : sockets_) {
 		kv.second->Stop();

--
Gitblit v1.8.0