From 9243710ca372de26823c2225c7b46b072458c671 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 28 五月 2021 17:18:33 +0800
Subject: [PATCH] tcp proxy requests, need more test.

---
 src/topic_node.cpp |   54 ++++++++++++++++++++++++++++++++++--------------------
 1 files changed, 34 insertions(+), 20 deletions(-)

diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index 3c38121..b21f7ef 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -497,9 +497,10 @@
 
 	out_msg_id = msg_id;
 
-	auto SendTo = [this, msg_id](const MQInfo &remote, const MsgRequestTopic &req, const RequestResultCB &cb) {
+	auto SendTo = [this, remote_addr, msg_id](const MQInfo &remote, const MsgRequestTopic &req, const RequestResultCB &cb) {
 		auto &sock = SockClient();
 		BHMsgHead head(InitMsgHead(GetType(req), proc_id(), ssn(), msg_id));
+		*head.mutable_dest() = remote_addr;
 		AddRoute(head, sock);
 		head.set_topic(req.topic());
 
@@ -519,8 +520,12 @@
 	};
 
 	try {
-		BHAddress addr;
-		return (ClientQueryRPCTopic(req.topic(), addr, 3000)) && SendTo(MQInfo{addr.mq_id(), addr.abs_addr()}, req, cb);
+		if (remote_addr.ip().empty()) {
+			BHAddress addr;
+			return (ClientQueryRPCTopic(req.topic(), addr, 3000)) && SendTo(MQInfo{addr.mq_id(), addr.abs_addr()}, req, cb);
+		} else {
+			return SendTo(CenterAddr(), req, cb);
+		}
 	} catch (...) {
 		SetLastError(eError, "internal error.");
 		return false;
@@ -536,25 +541,34 @@
 
 	try {
 		auto &sock = SockClient();
-
-		BHAddress addr;
-		if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) {
-			LOG_TRACE() << "node: " << SockNode().id() << ", topic dest: " << addr.mq_id();
-			BHMsgHead head(InitMsgHead(GetType(request), proc_id(), ssn()));
-			AddRoute(head, sock);
-			head.set_topic(request.topic());
-
-			MsgI reply_msg(shm());
-			DEFER1(reply_msg.Release(););
-			BHMsgHead reply_head;
-
-			if (sock.SendAndRecv({addr.mq_id(), addr.abs_addr()}, head, request, reply_msg, reply_head, timeout_ms) &&
-			    reply_head.type() == kMsgTypeRequestTopicReply &&
-			    reply_msg.ParseBody(out_reply)) {
-				reply_head.mutable_proc_id()->swap(out_proc_id);
-				return true;
+		MQInfo dest;
+		if (!remote_addr.ip().empty()) {
+			dest = CenterAddr();
+		} else {
+			BHAddress addr;
+			if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) {
+				dest.offset_ = addr.abs_addr();
+				dest.id_ = addr.mq_id();
+			} else {
+				return false;
 			}
 		}
+
+		BHMsgHead head(InitMsgHead(GetType(request), proc_id(), ssn()));
+		*head.mutable_dest() = remote_addr;
+		AddRoute(head, sock);
+		head.set_topic(request.topic());
+
+		MsgI reply_msg(shm());
+		DEFER1(reply_msg.Release(););
+		BHMsgHead reply_head;
+
+		if (sock.SendAndRecv(dest, head, request, reply_msg, reply_head, timeout_ms) &&
+		    reply_head.type() == kMsgTypeRequestTopicReply &&
+		    reply_msg.ParseBody(out_reply)) {
+			reply_head.mutable_proc_id()->swap(out_proc_id);
+			return true;
+		}
 	} catch (...) {
 		SetLastError(eError, __func__ + std::string(" internal errer."));
 	}

--
Gitblit v1.8.0