From 9243710ca372de26823c2225c7b46b072458c671 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 28 五月 2021 17:18:33 +0800
Subject: [PATCH] tcp proxy requests, need more test.

---
 box/center.cpp |   50 ++++++++++++++++++++++++++++++++++++++++++++------
 1 files changed, 44 insertions(+), 6 deletions(-)

diff --git a/box/center.cpp b/box/center.cpp
index 8d24315..0fdfa33 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -17,7 +17,9 @@
  */
 #include "center.h"
 #include "center_topic_node.h"
+#include "io_service.h"
 #include "node_center.h"
+#include "tcp_proxy.h"
 #include "tcp_server.h"
 #include <chrono>
 
@@ -74,7 +76,7 @@
 	};
 }
 
-bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr, SharedMemory &shm)
+bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr, SharedMemory &shm, TcpProxy &tcp_proxy)
 {
 	// command
 	auto OnCommand = [center_ptr](ShmSocket &socket, ShmMsgQueue::RawData &cmd) -> bool {
@@ -92,9 +94,41 @@
 		center->OnTimer();
 	};
 
-	auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
+	auto OnCenter = [=, &tcp_proxy](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
 		auto &center = *center_ptr;
 		auto replyer = MakeReplyer(socket, head, center);
+
+		if (!head.dest().ip().empty()) { // other host, proxy
+			auto valid = [&]() { return head.route_size() == 1; };
+			if (!valid()) { return false; }
+
+			if (head.type() == kMsgTypeRequestTopic) {
+				typedef MsgRequestTopicReply Reply;
+				Reply reply;
+				if (!center->CheckMsg(head, reply)) {
+					replyer(reply);
+				} else {
+					auto onResult = [&center](BHMsgHead &head, std::string body_content) {
+						if (head.route_size() > 0) {
+							auto &back = head.route(head.route_size() - 1);
+							MQInfo dest = {back.mq_id(), back.abs_addr()};
+							head.mutable_route()->RemoveLast();
+							center->PassRemoteReplyToLocal(dest, head, std::move(body_content));
+						}
+					};
+					if (!tcp_proxy.Request(head.dest().ip(), head.dest().port(), msg.content(), onResult)) {
+						replyer(MakeReply<Reply>(eError, "send request failed."));
+					} else {
+						// success
+					}
+				}
+				return true;
+			} else {
+				// ignore other msgs for now.
+			}
+			return false;
+		}
+
 		switch (head.type()) {
 			CASE_ON_MSG_TYPE(ProcInit);
 			CASE_ON_MSG_TYPE(Register);
@@ -168,7 +202,10 @@
 {
 	auto nsec = NodeTimeoutSec();
 	auto center_ptr = std::make_shared<Synced<NodeCenter>>("@bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs.
-	AddCenter(center_ptr, shm);
+	io_service_.reset(new IoService);
+	tcp_proxy_.reset(new TcpProxy(io_service_->io()));
+
+	AddCenter(center_ptr, shm, *tcp_proxy_);
 
 	for (auto &kv : Centers()) {
 		auto &info = kv.second;
@@ -176,7 +213,7 @@
 	}
 
 	topic_node_.reset(new CenterTopicNode(center_ptr, shm));
-	tcp_server_.reset(new TcpServer(kBHCenterPort, center_ptr));
+	tcp_server_.reset(new TcpServer(io_service_->io(), kBHCenterPort, center_ptr));
 }
 
 BHCenter::~BHCenter() { Stop(); }
@@ -188,13 +225,14 @@
 		sockets_[info.name_]->Start(1, info.handler_, info.raw_handler_, info.idle_);
 	}
 	topic_node_->Start();
-	tcp_server_->Start();
 	return true;
 }
 
 bool BHCenter::Stop()
 {
-	tcp_server_->Stop();
+	tcp_proxy_.reset();
+	tcp_server_.reset();
+	io_service_.reset();
 	topic_node_->Stop();
 	for (auto &kv : sockets_) {
 		kv.second->Stop();

--
Gitblit v1.8.0