From 026bbfaf2b5d73a26b8e2fa49158883ef64c211b Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期四, 27 五月 2021 13:51:26 +0800
Subject: [PATCH] tcp server call center to send proxy requests.

---
 box/tcp_connection.cpp |  228 +++++++++++++++++++++++++++++++++++++++++++-------------
 1 files changed, 174 insertions(+), 54 deletions(-)

diff --git a/box/tcp_connection.cpp b/box/tcp_connection.cpp
index 8968741..22162e5 100644
--- a/box/tcp_connection.cpp
+++ b/box/tcp_connection.cpp
@@ -17,78 +17,198 @@
  */
 #include "tcp_connection.h"
 #include "log.h"
+#include "msg.h"
+#include "node_center.h"
+#include "shm_socket.h"
 
 namespace
 {
 template <class C>
-auto Buffer(C &c) { return boost::asio::buffer(c.data(), c.size()); }
+auto Buffer(C &c, size_t offset = 0) { return boost::asio::buffer(c.data() + offset, c.size() - offset); }
 using boost::asio::async_read;
 using boost::asio::async_write;
 
+typedef std::function<void()> VoidHandler;
+typedef std::function<void(size_t)> SizeHandler;
+
+template <class T, class... Param>
+auto TcpCallback(T &conn, std::function<void(Param...)> const &func)
+{
+	auto self(conn.shared_from_this());
+	return [self, func](bserror_t ec, Param... size) {
+		if (!ec) {
+			func(size...);
+		} else {
+			self->OnError(ec);
+		}
+	};
+}
+
+template <class T>
+auto TcpCB(T &conn, VoidHandler const &func) { return TcpCallback(conn, func); }
+
+template <class T>
+auto TcpCBSize(T &conn, SizeHandler const &func) { return TcpCallback(conn, func); }
+
+template <class T>
+auto TcpCBSize(T &conn, VoidHandler const &func)
+{
+	return TcpCBSize(conn, [func](size_t) { func(); });
+}
+
+bool CheckData(std::vector<char> &buffer, const uint32_t len, BHMsgHead &head, std::string &body_content)
+{
+	const char *p = buffer.data();
+	LOG_DEBUG() << "msg len " << len;
+	if (4 > len) { return false; }
+	uint32_t head_len = Get32(p);
+	LOG_DEBUG() << "head_len " << head_len;
+	if (head_len > 1024 * 4) {
+		throw std::runtime_error("unexpected tcp reply data.");
+	}
+	auto before_body = 4 + head_len + 4;
+	if (before_body > len) {
+		if (before_body > buffer.size()) {
+			buffer.resize(before_body);
+		}
+		return false;
+	}
+	if (!head.ParseFromArray(p + 4, head_len)) {
+		throw std::runtime_error("tcp recv invalid reply head.");
+	}
+	uint32_t body_len = Get32(p + 4 + head_len);
+	buffer.resize(before_body + body_len);
+	if (buffer.size() > len) { return false; }
+	body_content.assign(p + before_body, body_len);
+	return true;
+}
+
 } // namespace
 
-TcpRequest1::TcpRequest1(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request) :
-    socket_(io), remote_(addr), request_(std::move(request)) {}
-void TcpRequest1::Connect()
+/// request -----------------------------------------------------------
+
+void TcpRequest1::OnError(bserror_t ec)
 {
-	auto self = shared_from_this();
-	socket_.async_connect(remote_, [this, self](bserror_t ec) {
-		if (!ec) {
-			SendRequest();
-		} else {
-			LOG_ERROR() << "connect error " << ec;
-			Close();
-		}
-	});
+	LOG_ERROR() << "tcp client error: " << ec;
+	Close();
+}
+
+void TcpRequest1::Start()
+{
+	auto readReply = [this]() {
+		recv_buffer_.resize(1000);
+		recv_len_ = 0;
+		socket_.async_read_some(Buffer(recv_buffer_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
+	};
+	auto request = [this, readReply]() { async_write(socket_, Buffer(request_), TcpCBSize(*this, readReply)); };
+
+	socket_.async_connect(remote_, TcpCB(*this, request));
 }
 void TcpRequest1::Close()
 {
+	LOG_DEBUG() << "client close";
+	socket_.close();
+}
+void TcpRequest1::OnRead(size_t size)
+{
+	LOG_DEBUG() << "reply data: " << recv_buffer_.data() + recv_len_;
+
+	recv_len_ += size;
+	BHMsgHead head;
+	std::string body_content;
+	bool recv_done = false;
+	try {
+		recv_done = CheckData(recv_buffer_, recv_len_, head, body_content);
+	} catch (std::exception &e) {
+		LOG_ERROR() << e.what();
+		Close();
+		return;
+	}
+
+	if (recv_done) {
+		// just pass to client, no check, client will check it anyway.
+		LOG_DEBUG() << "route size: " << head.route_size();
+		if (head.route_size() < 1) { return; }
+		auto &back = head.route(head.route_size() - 1);
+		MQInfo dest = {back.mq_id(), back.abs_addr()};
+		head.mutable_route()->RemoveLast();
+
+		LOG_DEBUG() << "tcp got reply, pass to shm: " << dest.id_ << ", " << dest.offset_;
+		MsgRequestTopicReply reply;
+		if (reply.ParseFromString(body_content)) {
+			LOG_DEBUG() << "err msg: " << reply.errmsg().errstring();
+			LOG_DEBUG() << "content : " << reply.data();
+		}
+		Close();
+		return;
+		shm_socket_.Send(dest, std::string(recv_buffer_.data(), recv_buffer_.size()));
+	} else { // read again
+		LOG_DEBUG() << "not complete, read again " << recv_buffer_.size();
+		socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
+	}
+}
+
+/// reply --------------------------------------------------------------
+
+void TcpReply1::OnError(bserror_t ec) { Close(); }
+void TcpReply1::Close()
+{
+	LOG_DEBUG() << "server close.";
 	socket_.close();
 }
 
-void TcpRequest1::SendRequest()
-{
-	LOG_INFO() << "client sending request " << request_;
-	auto self = shared_from_this();
-	async_write(socket_, Buffer(request_), [this, self](bserror_t ec, size_t) {
-		if (!ec) {
-			ReadReply();
-		} else {
-			Close();
-		}
-	});
-}
-void TcpRequest1::ReadReply()
-{
-	buffer_.resize(1000);
-	auto self = shared_from_this();
-	socket_.async_read_some(Buffer(buffer_), [this, self](bserror_t ec, size_t size) {
-		if (!ec) {
-			printf("reply data: %s\n", buffer_.data());
-		} else {
-			Close();
-		}
-	});
-}
-
-TcpReply1::TcpReply1(tcp::socket sock) :
-    socket_(std::move(sock)) {}
-
 void TcpReply1::Start()
 {
-	LOG_INFO() << "server session reading...";
 	recv_buffer_.resize(1000);
-	auto self(shared_from_this());
-	socket_.async_read_some(Buffer(recv_buffer_), [this, self](bserror_t ec, size_t size) {
-		LOG_INFO() << "server read : " << recv_buffer_.data();
-		// fake reply
-		if (!ec) {
-			send_buffer_ = std::string(recv_buffer_.data(), size) + " reply";
-			async_write(socket_, Buffer(send_buffer_), [this, self](bserror_t ec, size_t size) {
-				socket_.close();
-			});
+	socket_.async_read_some(Buffer(recv_buffer_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
+}
+
+void TcpReply1::OnRead(size_t size)
+{
+	recv_len_ += size;
+	BHMsgHead head;
+	std::string body_content;
+	bool recv_done = false;
+	try {
+		recv_done = CheckData(recv_buffer_, recv_len_, head, body_content);
+	} catch (std::exception &e) {
+		LOG_ERROR() << e.what();
+		Close();
+		return;
+	}
+
+	auto ParseBody = [&](auto &req) {
+		const char *p = recv_buffer_.data();
+		uint32_t size = Get32(p);
+		p += 4;
+		p += size;
+		size = Get32(p);
+		p += 4;
+		return req.ParseFromArray(p, size);
+	};
+
+	if (recv_done) {
+		LOG_DEBUG() << "request data: " << size;
+		auto self(shared_from_this());
+		MQInfo remote = {head.dest().mq_id(), head.dest().abs_addr()};
+		if (remote.id_ && remote.offset_) {
+			auto onRecv = [this, self](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
+				send_buffer_ = imsg.content();
+				async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
+			};
+			auto &scenter = *pscenter_;
+			if (!scenter->ProxyMsg(remote, head, body_content, onRecv)) {
+				send_buffer_ = "fake reply";
+				async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
+			}
 		} else {
-			socket_.close();
+			LOG_DEBUG() << "no address";
+			send_buffer_ = "no address";
+			async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
 		}
-	});
-}
\ No newline at end of file
+
+	} else { // read again
+		LOG_DEBUG() << "not complete, read again " << recv_buffer_.size();
+		socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
+	}
+};
\ No newline at end of file

--
Gitblit v1.8.0