From 9243710ca372de26823c2225c7b46b072458c671 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 28 五月 2021 17:18:33 +0800
Subject: [PATCH] tcp proxy requests, need more test.

---
 box/tcp_connection.cpp |   91 ++++++++++++++++++---------------------------
 1 files changed, 36 insertions(+), 55 deletions(-)

diff --git a/box/tcp_connection.cpp b/box/tcp_connection.cpp
index 22162e5..02001bb 100644
--- a/box/tcp_connection.cpp
+++ b/box/tcp_connection.cpp
@@ -19,6 +19,7 @@
 #include "log.h"
 #include "msg.h"
 #include "node_center.h"
+#include "proto.h"
 #include "shm_socket.h"
 
 namespace
@@ -59,10 +60,8 @@
 bool CheckData(std::vector<char> &buffer, const uint32_t len, BHMsgHead &head, std::string &body_content)
 {
 	const char *p = buffer.data();
-	LOG_DEBUG() << "msg len " << len;
 	if (4 > len) { return false; }
 	uint32_t head_len = Get32(p);
-	LOG_DEBUG() << "head_len " << head_len;
 	if (head_len > 1024 * 4) {
 		throw std::runtime_error("unexpected tcp reply data.");
 	}
@@ -87,15 +86,34 @@
 
 /// request -----------------------------------------------------------
 
+void TcpRequest1::SendReply(BHMsgHead &head, std::string body_content)
+{
+	if (reply_cb_) {
+		reply_cb_(head, std::move(body_content));
+	}
+}
+
 void TcpRequest1::OnError(bserror_t ec)
 {
-	LOG_ERROR() << "tcp client error: " << ec;
+	// LOG_ERROR() << "tcp client error: " << ec << ", " << ec.message();
+	BHMsgHead head;
+	std::string body_content;
+	try {
+		std::vector<char> req(request_.begin(), request_.end());
+		if (CheckData(req, req.size(), head, body_content)) {
+			if (head.type() == kMsgTypeRequestTopic) {
+				SendReply(head, MakeReply<MsgRequestTopicReply>(eError, std::to_string(ec.value()) + ',' + ec.message()).SerializeAsString());
+			}
+		}
+	} catch (std::exception &e) {
+	}
 	Close();
 }
 
 void TcpRequest1::Start()
 {
 	auto readReply = [this]() {
+		// if (!reply_cb_) { return; } // no reply needed, maybe safe to close?
 		recv_buffer_.resize(1000);
 		recv_len_ = 0;
 		socket_.async_read_some(Buffer(recv_buffer_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
@@ -104,47 +122,22 @@
 
 	socket_.async_connect(remote_, TcpCB(*this, request));
 }
-void TcpRequest1::Close()
-{
-	LOG_DEBUG() << "client close";
-	socket_.close();
-}
+void TcpRequest1::Close() { socket_.close(); }
 void TcpRequest1::OnRead(size_t size)
 {
-	LOG_DEBUG() << "reply data: " << recv_buffer_.data() + recv_len_;
-
 	recv_len_ += size;
 	BHMsgHead head;
 	std::string body_content;
-	bool recv_done = false;
 	try {
-		recv_done = CheckData(recv_buffer_, recv_len_, head, body_content);
+		if (CheckData(recv_buffer_, recv_len_, head, body_content)) { // got reply.
+			Close();
+			SendReply(head, std::move(body_content));
+		} else { // not complete, read again
+			socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
+		}
 	} catch (std::exception &e) {
 		LOG_ERROR() << e.what();
 		Close();
-		return;
-	}
-
-	if (recv_done) {
-		// just pass to client, no check, client will check it anyway.
-		LOG_DEBUG() << "route size: " << head.route_size();
-		if (head.route_size() < 1) { return; }
-		auto &back = head.route(head.route_size() - 1);
-		MQInfo dest = {back.mq_id(), back.abs_addr()};
-		head.mutable_route()->RemoveLast();
-
-		LOG_DEBUG() << "tcp got reply, pass to shm: " << dest.id_ << ", " << dest.offset_;
-		MsgRequestTopicReply reply;
-		if (reply.ParseFromString(body_content)) {
-			LOG_DEBUG() << "err msg: " << reply.errmsg().errstring();
-			LOG_DEBUG() << "content : " << reply.data();
-		}
-		Close();
-		return;
-		shm_socket_.Send(dest, std::string(recv_buffer_.data(), recv_buffer_.size()));
-	} else { // read again
-		LOG_DEBUG() << "not complete, read again " << recv_buffer_.size();
-		socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
 	}
 }
 
@@ -153,7 +146,7 @@
 void TcpReply1::OnError(bserror_t ec) { Close(); }
 void TcpReply1::Close()
 {
-	LOG_DEBUG() << "server close.";
+	LOG_TRACE() << "server close.";
 	socket_.close();
 }
 
@@ -177,38 +170,26 @@
 		return;
 	}
 
-	auto ParseBody = [&](auto &req) {
-		const char *p = recv_buffer_.data();
-		uint32_t size = Get32(p);
-		p += 4;
-		p += size;
-		size = Get32(p);
-		p += 4;
-		return req.ParseFromArray(p, size);
-	};
-
 	if (recv_done) {
-		LOG_DEBUG() << "request data: " << size;
-		auto self(shared_from_this());
+		LOG_TRACE() << "tcp server recv request data, size: " << size;
 		MQInfo remote = {head.dest().mq_id(), head.dest().abs_addr()};
 		if (remote.id_ && remote.offset_) {
+			auto self(shared_from_this());
 			auto onRecv = [this, self](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
 				send_buffer_ = imsg.content();
 				async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
 			};
 			auto &scenter = *pscenter_;
-			if (!scenter->ProxyMsg(remote, head, body_content, onRecv)) {
-				send_buffer_ = "fake reply";
-				async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
+			if (scenter->PassRemoteRequestToLocal(remote, head, body_content, onRecv)) {
+				return;
 			}
 		} else {
 			LOG_DEBUG() << "no address";
-			send_buffer_ = "no address";
-			async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
 		}
+		Close();
 
-	} else { // read again
-		LOG_DEBUG() << "not complete, read again " << recv_buffer_.size();
+	} else { // not complete, read again
+		LOG_TRACE() << "not complete, read again " << recv_buffer_.size();
 		socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
 	}
 };
\ No newline at end of file

--
Gitblit v1.8.0