From 903b27f875e5f2a872c1b309f354b18c0450f35a Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 04 六月 2021 11:46:18 +0800
Subject: [PATCH] allow tcp request with no dest, auto query topic.

---
 box/tcp_connection.cpp |   22 ++++------
 utest/tcp_test.cpp     |    7 ++-
 box/node_center.h      |    2 
 box/node_center.cpp    |   33 ++++++++++++++--
 4 files changed, 43 insertions(+), 21 deletions(-)

diff --git a/box/node_center.cpp b/box/node_center.cpp
index b285c9f..5c24409 100644
--- a/box/node_center.cpp
+++ b/box/node_center.cpp
@@ -222,12 +222,37 @@
 	return node;
 }
 
-bool NodeCenter::PassRemoteRequestToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb)
+bool NodeCenter::PassRemoteRequestToLocal(MQInfo dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb)
 {
-	Node node(GetNode(dest.id_));
-	if (!node || !Valid(*node)) {
-		LOG_ERROR() << id() << " pass remote request, dest not found.";
+	Node node;
+
+	auto FindDest = [&]() {
+		auto pos = service_map_.find(head.topic());
+		if (pos != service_map_.end() && !pos->second.empty()) {
+			auto &clients = pos->second;
+			for (auto &cli : clients) {
+				node = cli.weak_node_.lock();
+				if (node && Valid(*node)) {
+					dest.id_ = cli.mq_id_;
+					dest.offset_ = cli.mq_abs_addr_;
+					return true;
+				}
+			}
+		}
 		return false;
+	};
+
+	if (dest.id_ == 0) {
+		if (!FindDest()) {
+			LOG_ERROR() << id() << " pass remote request, topic dest not found.";
+			return false;
+		}
+	} else {
+		node = GetNode(dest.id_);
+		if (!node || !Valid(*node)) {
+			LOG_ERROR() << id() << " pass remote request, dest not found.";
+			return false;
+		}
 	}
 
 	ShmSocket &sender(DefaultSender(node->shm_));
diff --git a/box/node_center.h b/box/node_center.h
index 461a354..74dd52f 100644
--- a/box/node_center.h
+++ b/box/node_center.h
@@ -121,7 +121,7 @@
 	void RecordMsg(const MsgI &msg);
 	bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg);
 	bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg);
-	bool PassRemoteRequestToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb);
+	bool PassRemoteRequestToLocal(MQInfo dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb);
 	bool PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content);
 	void OnAlloc(ShmSocket &socket, const int64_t val);
 	void OnFree(ShmSocket &socket, const int64_t val);
diff --git a/box/tcp_connection.cpp b/box/tcp_connection.cpp
index 02001bb..85ed4ed 100644
--- a/box/tcp_connection.cpp
+++ b/box/tcp_connection.cpp
@@ -173,21 +173,17 @@
 	if (recv_done) {
 		LOG_TRACE() << "tcp server recv request data, size: " << size;
 		MQInfo remote = {head.dest().mq_id(), head.dest().abs_addr()};
-		if (remote.id_ && remote.offset_) {
-			auto self(shared_from_this());
-			auto onRecv = [this, self](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
-				send_buffer_ = imsg.content();
-				async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
-			};
-			auto &scenter = *pscenter_;
-			if (scenter->PassRemoteRequestToLocal(remote, head, body_content, onRecv)) {
-				return;
-			}
+		auto self(shared_from_this());
+		auto onRecv = [this, self](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
+			send_buffer_ = imsg.content();
+			async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
+		};
+		auto &scenter = *pscenter_;
+		if (scenter->PassRemoteRequestToLocal(remote, head, body_content, onRecv)) {
+			return;
 		} else {
-			LOG_DEBUG() << "no address";
+			Close();
 		}
-		Close();
-
 	} else { // not complete, read again
 		LOG_TRACE() << "not complete, read again " << recv_buffer_.size();
 		socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
diff --git a/utest/tcp_test.cpp b/utest/tcp_test.cpp
index 2aead7e..0ead665 100644
--- a/utest/tcp_test.cpp
+++ b/utest/tcp_test.cpp
@@ -49,11 +49,12 @@
 		auto route = head.add_route();
 		route->set_mq_id(12345);
 		route->set_abs_addr(67890);
+		head.set_topic(req.topic());
 
 		head.mutable_dest()->set_ip(connect_addr);
-		head.mutable_dest()->set_port(port);
-		head.mutable_dest()->set_mq_id(201);
-		head.mutable_dest()->set_abs_addr(10072);
+		// head.mutable_dest()->set_port(port);
+		// head.mutable_dest()->set_mq_id(201);
+		// head.mutable_dest()->set_abs_addr(10072);
 
 		return (MsgI::Serialize(head, req));
 	};

--
Gitblit v1.8.0