From 026bbfaf2b5d73a26b8e2fa49158883ef64c211b Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期四, 27 五月 2021 13:51:26 +0800
Subject: [PATCH] tcp server call center to send proxy requests.

---
 box/tcp_connection.cpp |  228 +++++++++++++++----
 box/center.cpp         |    4 
 src/msg.h              |   47 +++
 box/io_service.h       |   42 +++
 box/tcp_proxy.cpp      |   20 -
 box/center.h           |    2 
 box/node_center.cpp    |   23 ++
 box/io_service.cpp     |   48 ++++
 src/shm_socket.cpp     |   58 +++-
 src/shm_socket.h       |   20 -
 box/tcp_proxy.h        |    2 
 box/tcp_server.h       |   16 
 utest/tcp_test.cpp     |   29 ++
 box/node_center.h      |    1 
 box/tcp_server.cpp     |   32 --
 box/tcp_connection.h   |   37 ++-
 16 files changed, 449 insertions(+), 160 deletions(-)

diff --git a/box/center.cpp b/box/center.cpp
index c3a03e3..8d24315 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -18,6 +18,7 @@
 #include "center.h"
 #include "center_topic_node.h"
 #include "node_center.h"
+#include "tcp_server.h"
 #include <chrono>
 
 using namespace std::chrono;
@@ -175,6 +176,7 @@
 	}
 
 	topic_node_.reset(new CenterTopicNode(center_ptr, shm));
+	tcp_server_.reset(new TcpServer(kBHCenterPort, center_ptr));
 }
 
 BHCenter::~BHCenter() { Stop(); }
@@ -186,11 +188,13 @@
 		sockets_[info.name_]->Start(1, info.handler_, info.raw_handler_, info.idle_);
 	}
 	topic_node_->Start();
+	tcp_server_->Start();
 	return true;
 }
 
 bool BHCenter::Stop()
 {
+	tcp_server_->Stop();
 	topic_node_->Stop();
 	for (auto &kv : sockets_) {
 		kv.second->Stop();
diff --git a/box/center.h b/box/center.h
index 6610277..c4aa1ac 100644
--- a/box/center.h
+++ b/box/center.h
@@ -23,6 +23,7 @@
 #include <map>
 #include <memory>
 class CenterTopicNode;
+class TcpServer;
 
 class BHCenter
 {
@@ -53,6 +54,7 @@
 
 	std::map<std::string, std::shared_ptr<ShmSocket>> sockets_;
 	std::unique_ptr<CenterTopicNode> topic_node_;
+	std::unique_ptr<TcpServer> tcp_server_;
 };
 
 #endif // end of include guard: CENTER_TM9OUQTG
diff --git a/box/io_service.cpp b/box/io_service.cpp
new file mode 100644
index 0000000..1d531e0
--- /dev/null
+++ b/box/io_service.cpp
@@ -0,0 +1,48 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  io_service.cpp
+ *
+ *    Description:  
+ *
+ *        Version:  1.0
+ *        Created:  2021骞�05鏈�27鏃� 13鏃�25鍒�18绉�
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Li Chao (), lichao@aiotlink.com
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+#include "io_service.h"
+#include <chrono>
+using namespace std::chrono_literals;
+
+bool IoService::Start()
+{
+	Stop();
+	bool cur = false;
+	if (!run_.compare_exchange_strong(cur, true)) {
+		return false;
+	}
+
+	auto proc = [this]() {
+		while (run_) {
+			io_.run_one_for(100ms);
+		}
+		OnStop();
+	};
+	std::thread(proc).swap(worker_);
+	return true;
+}
+
+void IoService::Stop()
+{
+	bool cur = true;
+	if (run_.compare_exchange_strong(cur, false)) {
+		if (worker_.joinable()) {
+			worker_.join();
+		}
+	}
+}
diff --git a/box/io_service.h b/box/io_service.h
new file mode 100644
index 0000000..000facc
--- /dev/null
+++ b/box/io_service.h
@@ -0,0 +1,42 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  io_service.h
+ *
+ *    Description:  
+ *
+ *        Version:  1.0
+ *        Created:  2021骞�05鏈�27鏃� 13鏃�25鍒�37绉�
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Li Chao (), lichao@aiotlink.com
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+#ifndef IO_SERVICE_ODKKJG3D
+#define IO_SERVICE_ODKKJG3D
+
+#include <boost/asio.hpp>
+#include <thread>
+
+class IoService
+{
+public:
+	IoService() :
+	    run_(false) {}
+	bool Start();
+	void Stop();
+
+	typedef boost::asio::io_context io_service_t;
+	io_service_t &io() { return io_; }
+
+private:
+	virtual void OnStop() {}
+	io_service_t io_;
+	std::thread worker_;
+	std::atomic<bool> run_;
+};
+
+#endif // end of include guard: IO_SERVICE_ODKKJG3D
diff --git a/box/node_center.cpp b/box/node_center.cpp
index cbaef0e..ff199b2 100644
--- a/box/node_center.cpp
+++ b/box/node_center.cpp
@@ -209,6 +209,29 @@
 	RecordMsg(msg);
 	return socket.Send(dest, msg);
 }
+bool NodeCenter::ProxyMsg(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb)
+{
+	auto ssn = dest.id_ - (dest.id_ % 10);
+	LOG_DEBUG() << "prox ssn " << ssn;
+	auto pos = nodes_.find(ssn);
+	if (pos == nodes_.end()) {
+		LOG_ERROR() << "proxy msg, ssn not found.";
+		return false;
+	}
+	auto &node = pos->second;
+	if (!Valid(*node)) { return false; }
+
+	ShmSocket &sender(DefaultSender(node->shm_));
+	auto route = head.add_route();
+	route->set_mq_id(sender.id());
+	route->set_abs_addr(sender.AbsAddr());
+
+	ShmMsg msg(node->shm_);
+	if (!msg.Make(head, body_content)) { return false; }
+	DEFER1(msg.Release(););
+	RecordMsg(msg);
+	return sender.Send(dest, msg, head.msg_id(), std::move(cb));
+}
 
 void NodeCenter::OnAlloc(ShmSocket &socket, const int64_t val)
 {
diff --git a/box/node_center.h b/box/node_center.h
index caaf054..1c79809 100644
--- a/box/node_center.h
+++ b/box/node_center.h
@@ -122,6 +122,7 @@
 	void RecordMsg(const MsgI &msg);
 	bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg);
 	bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg);
+	bool ProxyMsg(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb);
 	void OnAlloc(ShmSocket &socket, const int64_t val);
 	void OnFree(ShmSocket &socket, const int64_t val);
 	bool OnCommand(ShmSocket &socket, const int64_t val);
diff --git a/box/tcp_connection.cpp b/box/tcp_connection.cpp
index 8968741..22162e5 100644
--- a/box/tcp_connection.cpp
+++ b/box/tcp_connection.cpp
@@ -17,78 +17,198 @@
  */
 #include "tcp_connection.h"
 #include "log.h"
+#include "msg.h"
+#include "node_center.h"
+#include "shm_socket.h"
 
 namespace
 {
 template <class C>
-auto Buffer(C &c) { return boost::asio::buffer(c.data(), c.size()); }
+auto Buffer(C &c, size_t offset = 0) { return boost::asio::buffer(c.data() + offset, c.size() - offset); }
 using boost::asio::async_read;
 using boost::asio::async_write;
 
+typedef std::function<void()> VoidHandler;
+typedef std::function<void(size_t)> SizeHandler;
+
+template <class T, class... Param>
+auto TcpCallback(T &conn, std::function<void(Param...)> const &func)
+{
+	auto self(conn.shared_from_this());
+	return [self, func](bserror_t ec, Param... size) {
+		if (!ec) {
+			func(size...);
+		} else {
+			self->OnError(ec);
+		}
+	};
+}
+
+template <class T>
+auto TcpCB(T &conn, VoidHandler const &func) { return TcpCallback(conn, func); }
+
+template <class T>
+auto TcpCBSize(T &conn, SizeHandler const &func) { return TcpCallback(conn, func); }
+
+template <class T>
+auto TcpCBSize(T &conn, VoidHandler const &func)
+{
+	return TcpCBSize(conn, [func](size_t) { func(); });
+}
+
+bool CheckData(std::vector<char> &buffer, const uint32_t len, BHMsgHead &head, std::string &body_content)
+{
+	const char *p = buffer.data();
+	LOG_DEBUG() << "msg len " << len;
+	if (4 > len) { return false; }
+	uint32_t head_len = Get32(p);
+	LOG_DEBUG() << "head_len " << head_len;
+	if (head_len > 1024 * 4) {
+		throw std::runtime_error("unexpected tcp reply data.");
+	}
+	auto before_body = 4 + head_len + 4;
+	if (before_body > len) {
+		if (before_body > buffer.size()) {
+			buffer.resize(before_body);
+		}
+		return false;
+	}
+	if (!head.ParseFromArray(p + 4, head_len)) {
+		throw std::runtime_error("tcp recv invalid reply head.");
+	}
+	uint32_t body_len = Get32(p + 4 + head_len);
+	buffer.resize(before_body + body_len);
+	if (buffer.size() > len) { return false; }
+	body_content.assign(p + before_body, body_len);
+	return true;
+}
+
 } // namespace
 
-TcpRequest1::TcpRequest1(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request) :
-    socket_(io), remote_(addr), request_(std::move(request)) {}
-void TcpRequest1::Connect()
+/// request -----------------------------------------------------------
+
+void TcpRequest1::OnError(bserror_t ec)
 {
-	auto self = shared_from_this();
-	socket_.async_connect(remote_, [this, self](bserror_t ec) {
-		if (!ec) {
-			SendRequest();
-		} else {
-			LOG_ERROR() << "connect error " << ec;
-			Close();
-		}
-	});
+	LOG_ERROR() << "tcp client error: " << ec;
+	Close();
+}
+
+void TcpRequest1::Start()
+{
+	auto readReply = [this]() {
+		recv_buffer_.resize(1000);
+		recv_len_ = 0;
+		socket_.async_read_some(Buffer(recv_buffer_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
+	};
+	auto request = [this, readReply]() { async_write(socket_, Buffer(request_), TcpCBSize(*this, readReply)); };
+
+	socket_.async_connect(remote_, TcpCB(*this, request));
 }
 void TcpRequest1::Close()
 {
+	LOG_DEBUG() << "client close";
+	socket_.close();
+}
+void TcpRequest1::OnRead(size_t size)
+{
+	LOG_DEBUG() << "reply data: " << recv_buffer_.data() + recv_len_;
+
+	recv_len_ += size;
+	BHMsgHead head;
+	std::string body_content;
+	bool recv_done = false;
+	try {
+		recv_done = CheckData(recv_buffer_, recv_len_, head, body_content);
+	} catch (std::exception &e) {
+		LOG_ERROR() << e.what();
+		Close();
+		return;
+	}
+
+	if (recv_done) {
+		// just pass to client, no check, client will check it anyway.
+		LOG_DEBUG() << "route size: " << head.route_size();
+		if (head.route_size() < 1) { return; }
+		auto &back = head.route(head.route_size() - 1);
+		MQInfo dest = {back.mq_id(), back.abs_addr()};
+		head.mutable_route()->RemoveLast();
+
+		LOG_DEBUG() << "tcp got reply, pass to shm: " << dest.id_ << ", " << dest.offset_;
+		MsgRequestTopicReply reply;
+		if (reply.ParseFromString(body_content)) {
+			LOG_DEBUG() << "err msg: " << reply.errmsg().errstring();
+			LOG_DEBUG() << "content : " << reply.data();
+		}
+		Close();
+		return;
+		shm_socket_.Send(dest, std::string(recv_buffer_.data(), recv_buffer_.size()));
+	} else { // read again
+		LOG_DEBUG() << "not complete, read again " << recv_buffer_.size();
+		socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
+	}
+}
+
+/// reply --------------------------------------------------------------
+
+void TcpReply1::OnError(bserror_t ec) { Close(); }
+void TcpReply1::Close()
+{
+	LOG_DEBUG() << "server close.";
 	socket_.close();
 }
 
-void TcpRequest1::SendRequest()
-{
-	LOG_INFO() << "client sending request " << request_;
-	auto self = shared_from_this();
-	async_write(socket_, Buffer(request_), [this, self](bserror_t ec, size_t) {
-		if (!ec) {
-			ReadReply();
-		} else {
-			Close();
-		}
-	});
-}
-void TcpRequest1::ReadReply()
-{
-	buffer_.resize(1000);
-	auto self = shared_from_this();
-	socket_.async_read_some(Buffer(buffer_), [this, self](bserror_t ec, size_t size) {
-		if (!ec) {
-			printf("reply data: %s\n", buffer_.data());
-		} else {
-			Close();
-		}
-	});
-}
-
-TcpReply1::TcpReply1(tcp::socket sock) :
-    socket_(std::move(sock)) {}
-
 void TcpReply1::Start()
 {
-	LOG_INFO() << "server session reading...";
 	recv_buffer_.resize(1000);
-	auto self(shared_from_this());
-	socket_.async_read_some(Buffer(recv_buffer_), [this, self](bserror_t ec, size_t size) {
-		LOG_INFO() << "server read : " << recv_buffer_.data();
-		// fake reply
-		if (!ec) {
-			send_buffer_ = std::string(recv_buffer_.data(), size) + " reply";
-			async_write(socket_, Buffer(send_buffer_), [this, self](bserror_t ec, size_t size) {
-				socket_.close();
-			});
+	socket_.async_read_some(Buffer(recv_buffer_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
+}
+
+void TcpReply1::OnRead(size_t size)
+{
+	recv_len_ += size;
+	BHMsgHead head;
+	std::string body_content;
+	bool recv_done = false;
+	try {
+		recv_done = CheckData(recv_buffer_, recv_len_, head, body_content);
+	} catch (std::exception &e) {
+		LOG_ERROR() << e.what();
+		Close();
+		return;
+	}
+
+	auto ParseBody = [&](auto &req) {
+		const char *p = recv_buffer_.data();
+		uint32_t size = Get32(p);
+		p += 4;
+		p += size;
+		size = Get32(p);
+		p += 4;
+		return req.ParseFromArray(p, size);
+	};
+
+	if (recv_done) {
+		LOG_DEBUG() << "request data: " << size;
+		auto self(shared_from_this());
+		MQInfo remote = {head.dest().mq_id(), head.dest().abs_addr()};
+		if (remote.id_ && remote.offset_) {
+			auto onRecv = [this, self](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
+				send_buffer_ = imsg.content();
+				async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
+			};
+			auto &scenter = *pscenter_;
+			if (!scenter->ProxyMsg(remote, head, body_content, onRecv)) {
+				send_buffer_ = "fake reply";
+				async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
+			}
 		} else {
-			socket_.close();
+			LOG_DEBUG() << "no address";
+			send_buffer_ = "no address";
+			async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
 		}
-	});
-}
\ No newline at end of file
+
+	} else { // read again
+		LOG_DEBUG() << "not complete, read again " << recv_buffer_.size();
+		socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
+	}
+};
\ No newline at end of file
diff --git a/box/tcp_connection.h b/box/tcp_connection.h
index b3b8344..5aa93a4 100644
--- a/box/tcp_connection.h
+++ b/box/tcp_connection.h
@@ -18,46 +18,61 @@
 #ifndef TCP_CONNECTION_H373GIL5
 #define TCP_CONNECTION_H373GIL5
 
+#include "bh_util.h"
+#include "node_center.h"
 #include "tcp_common.h"
 #include <functional>
 #include <memory>
 
+class ShmSocket;
 class TcpRequest1 : public std::enable_shared_from_this<TcpRequest1>
 {
 public:
-	static void Create(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request)
+	static void Create(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request, ShmSocket &shm_socket)
 	{
-		std::make_shared<TcpRequest1>(io, addr, std::move(request))->Connect();
+		std::make_shared<TcpRequest1>(io, addr, std::move(request), shm_socket)->Start();
 	}
 
-	TcpRequest1(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request);
+	TcpRequest1(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request, ShmSocket &shm_socket) :
+	    socket_(io), shm_socket_(shm_socket), remote_(addr), request_(std::move(request)) {}
+	void OnError(bserror_t ec);
 
 private:
-	void Connect();
+	void Start();
 	void Close();
-	void SendRequest();
-	void ReadReply();
+	void OnRead(size_t size);
 
 	tcp::socket socket_;
+	ShmSocket &shm_socket_; // send reply
 	tcp::endpoint remote_;
 	std::string request_;
-	std::vector<char> buffer_;
+	std::vector<char> recv_buffer_;
+	size_t recv_len_ = 0;
 };
 
+class NodeCenter;
 class TcpReply1 : public std::enable_shared_from_this<TcpReply1>
 {
 public:
-	static void Create(tcp::socket sock)
+	typedef std::shared_ptr<Synced<NodeCenter>> CenterPtr;
+	static void Create(tcp::socket sock, CenterPtr pscenter)
 	{
-		std::make_shared<TcpReply1>(std::move(sock))->Start();
+		std::make_shared<TcpReply1>(std::move(sock), pscenter)->Start();
 	}
 
-	TcpReply1(tcp::socket sock);
-	void Start();
+	TcpReply1(tcp::socket sock, CenterPtr pscenter) :
+	    socket_(std::move(sock)), pscenter_(pscenter) {}
+	void OnError(bserror_t ec);
 
 private:
+	void Start();
+	void Close();
+	void OnRead(size_t size);
+
 	tcp::socket socket_;
+	CenterPtr pscenter_;
 	std::vector<char> recv_buffer_;
+	uint32_t recv_len_ = 0;
 	std::string send_buffer_;
 };
 
diff --git a/box/tcp_proxy.cpp b/box/tcp_proxy.cpp
index 2e95a3b..298c6b6 100644
--- a/box/tcp_proxy.cpp
+++ b/box/tcp_proxy.cpp
@@ -36,8 +36,7 @@
 	auto localProc = [this](ShmSocket &sock, MsgI &msg, BHMsgHead &head) {
 		auto &dest = head.dest();
 		if (dest.ip().empty() || dest.port() == 0) { return; }
-		bool r = Send(dest.ip(), dest.port(), msg.content());
-		// TODO check send fail.
+		Request(dest.ip(), dest.port(), msg.content());
 	};
 	local_->Start(1, localProc);
 
@@ -46,34 +45,25 @@
 			io_context_.run_one_for(std::chrono::milliseconds(100));
 		}
 	};
+	std::thread(proxyProc).swap(worker_);
 	return true;
 }
 
 void TcpProxy::Stop()
 {
-	local_.reset();
-
 	bool cur = true;
 	if (run_.compare_exchange_strong(cur, false)) {
 		if (worker_.joinable()) {
 			worker_.join();
 		}
+		local_.reset();
 	}
 }
 
-bool TcpProxy::Send(const std::string &ip, int port, std::string &&content)
+bool TcpProxy::Request(const std::string &ip, int port, std::string &&content)
 {
 	if (content.empty()) { return false; }
 
 	tcp::endpoint dest(ip::address::from_string(ip), port);
-	TcpRequest1::Create(io_context_, dest, std::move(content));
-
-	// char tag[sizeof(kBHTcpServerTag)] = {0};
-
-	// int n = read(sock, tag, sizeof(tag));
-	// if (n == sizeof(tag) && memcmp(tag, &kBHTcpServerTag, sizeof(tag)) == 0) {
-	// 	send(sock, content.data(), content.size(), 0);
-	// 	connections_[addr].io_info_.h_ = [this, sock](int events) { OnReply(sock); };
-	// 	// success
-	// }
+	TcpRequest1::Create(io_context_, dest, std::move(content), *local_);
 }
diff --git a/box/tcp_proxy.h b/box/tcp_proxy.h
index 8f2af91..9c74532 100644
--- a/box/tcp_proxy.h
+++ b/box/tcp_proxy.h
@@ -34,7 +34,7 @@
 	void Stop();
 
 private:
-	bool Send(const std::string &ip, int port, std::string &&content);
+	bool Request(const std::string &ip, int port, std::string &&content);
 	std::unique_ptr<ShmSocket> local_;
 
 	boost::asio::io_context io_context_;
diff --git a/box/tcp_server.cpp b/box/tcp_server.cpp
index e4e229c..ea23106 100644
--- a/box/tcp_server.cpp
+++ b/box/tcp_server.cpp
@@ -23,39 +23,17 @@
 
 using namespace std::chrono_literals;
 
-TcpServer::TcpServer(int port) :
-    run_(false), listener_(io_, tcp::endpoint(tcp::v6(), port))
+TcpServer::TcpServer(int port, CenterPtr pscenter) :
+    listener_(io(), tcp::endpoint(tcp::v6(), port)), pscenter_(pscenter)
 {
 	Accept();
 }
 
 TcpServer::~TcpServer() { Stop(); }
 
-bool TcpServer::Start()
+void TcpServer::OnStop()
 {
-	Stop();
-	bool cur = false;
-	if (run_.compare_exchange_strong(cur, true)) {
-		auto proc = [this]() {
-			while (run_) {
-				io_.run_one_for(100ms);
-			}
-		};
-		std::thread(proc).swap(worker_);
-	}
-}
-void TcpServer::Stop()
-{
-	bool cur = true;
-	if (run_.compare_exchange_strong(cur, false)) {
-		io_.post([this]() {
-			listener_.close();
-		});
-		std::this_thread::sleep_for(1s);
-		if (worker_.joinable()) {
-			worker_.join();
-		}
-	}
+	listener_.close();
 }
 
 void TcpServer::Accept()
@@ -63,7 +41,7 @@
 	listener_.async_accept([this](bserror_t ec, tcp::socket sock) {
 		if (!ec) {
 			LOG_INFO() << "server accept client";
-			TcpReply1::Create(std::move(sock));
+			TcpReply1::Create(std::move(sock), pscenter_);
 		}
 		Accept();
 	});
diff --git a/box/tcp_server.h b/box/tcp_server.h
index c06cddc..2c9337c 100644
--- a/box/tcp_server.h
+++ b/box/tcp_server.h
@@ -18,23 +18,23 @@
 #ifndef TCP_SERVER_795VXR94
 #define TCP_SERVER_795VXR94
 
+#include "bh_util.h"
+#include "io_service.h"
 #include "tcp_common.h"
-#include <thread>
 
-class TcpServer
+class NodeCenter;
+class TcpServer : public IoService
 {
 public:
-	explicit TcpServer(int port);
+	typedef std::shared_ptr<Synced<NodeCenter>> CenterPtr;
+	TcpServer(int port, CenterPtr pscenter);
 	~TcpServer();
-	bool Start();
-	void Stop();
 
 private:
+	virtual void OnStop();
 	void Accept();
-	std::thread worker_;
-	std::atomic<bool> run_;
-	boost::asio::io_context io_;
 	tcp::acceptor listener_;
+	CenterPtr pscenter_;
 };
 
 #endif // end of include guard: TCP_SERVER_795VXR94
diff --git a/src/msg.h b/src/msg.h
index 12922b5..f0ed840 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -75,12 +75,16 @@
 	};
 	OffsetType offset_;
 	SharedMemory *pshm_;
-	void *Alloc(const size_t size)
+
+	void *Alloc(const size_t size, const void *src = nullptr)
 	{
 		void *p = shm().Alloc(sizeof(Meta) + size);
 		if (p) {
 			auto pmeta = new (p) Meta(size);
 			p = pmeta + 1;
+			if (src) {
+				memcpy(p, src, size);
+			}
 		}
 		return p;
 	}
@@ -108,16 +112,35 @@
 		}
 		return addr;
 	}
-
-	void *Pack(const std::string &content)
+	void *Pack(const BHMsgHead &head, const uint32_t head_len, const std::string &body_content)
 	{
 		void *addr = get();
 		if (addr) {
-			memcpy(addr, content.data(), content.size());
-			meta()->size_ = content.size();
+			auto p = static_cast<char *>(addr);
+			auto Pack1 = [&p](uint32_t len, auto &&writer) {
+				Put32(p, len);
+				p += sizeof(len);
+				writer(p, len);
+				p += len;
+			};
+			Pack1(head_len, [&](void *p, int len) { head.SerializeToArray(p, len); });
+			Pack1(body_content.size(), [&](void *p, int len) { memcpy(p, body_content.data(), len); });
+			meta()->size_ = 4 + head_len + 4 + body_content.size();
 		}
 		return addr;
 	}
+
+	void *Pack(const void *src, const size_t size)
+	{
+		void *addr = get();
+		if (addr && src) {
+			memcpy(addr, src, size);
+			meta()->size_ = size;
+		}
+		return addr;
+	}
+
+	void *Pack(const std::string &content) { return Pack(content.data(), content.size()); }
 
 	bool Make(void *addr)
 	{
@@ -166,6 +189,13 @@
 		uint32_t size = sizeof(head_len) + head_len + sizeof(body_len) + body_len;
 		return Make(size) && Pack(head, head_len, body, body_len);
 	}
+	inline bool Make(const BHMsgHead &head, const std::string &body_content)
+	{
+		uint32_t head_len = head.ByteSizeLong();
+		uint32_t size = sizeof(head_len) + head_len + sizeof(uint32_t) + body_content.size();
+		return Make(size) && Pack(head, head_len, body_content);
+	}
+
 	template <class Body>
 	inline bool Fill(const BHMsgHead &head, const Body &body)
 	{
@@ -175,8 +205,11 @@
 		return valid() && (meta()->capacity_ >= size) && Pack(head, head_len, body, body_len);
 	}
 
-	inline bool Make(const std::string &content) { return Make(content.size()) && Pack(content); }
-	inline bool Fill(const std::string &content) { return valid() && (meta()->capacity_ >= content.size()) && Pack(content); }
+	inline bool Make(const void *src, const size_t size) { return Make(Alloc(size, src)); }
+	inline bool Fill(const void *src, const size_t size) { return valid() && (meta()->capacity_ >= size) && Pack(src, size); }
+
+	inline bool Make(const std::string &content) { return Make(content.data(), content.size()); }
+	inline bool Fill(const std::string &content) { return Fill(content.data(), content.size()); }
 
 	inline bool Make(const size_t size) { return Make(Alloc(size)); }
 
diff --git a/src/shm_socket.cpp b/src/shm_socket.cpp
index 11824d7..d54168d 100644
--- a/src/shm_socket.cpp
+++ b/src/shm_socket.cpp
@@ -145,46 +145,66 @@
 	return false;
 }
 
-bool ShmSocket::Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb)
+bool ShmSocket::Send(const MQInfo &remote, std::string &&content)
 {
 	size_t size = content.size();
-	auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable {
+	auto OnResult = [content = std::move(content), remote, this](MsgI &msg) mutable {
 		if (!msg.Fill(content)) { return false; }
 
 		try {
-			if (!cb) {
-				Send(remote, msg);
-			} else {
-				per_msg_cbs_->Store(msg_id, std::move(cb));
-				auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) {
-					RecvCB cb_no_use;
-					per_msg_cbs_->Pick(msg_id, cb_no_use);
-				};
-				Send(remote, msg, onExpireRemoveCB);
-			}
+			SendImpl(remote, msg);
 			return true;
 		} catch (...) {
 			SetLastError(eError, "Send internal error.");
 			return false;
 		}
 	};
+
+	return RequestAlloc(size, OnResult);
+}
+
+bool ShmSocket::Send(const MQInfo &remote, const MsgI &msg, const std::string &msg_id, RecvCB &&cb)
+{
+	try {
+		per_msg_cbs_->Store(msg_id, std::move(cb));
+		auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) {
+			RecvCB cb_no_use;
+			per_msg_cbs_->Pick(msg_id, cb_no_use);
+		};
+		SendImpl(remote, msg, onExpireRemoveCB);
+		return true;
+	} catch (std::exception &e) {
+		SetLastError(eError, "Send internal error.");
+		return false;
+	}
+}
+
+bool ShmSocket::Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb)
+{
+	size_t size = content.size();
+	auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable {
+		return msg.Fill(content) && Send(remote, msg, msg_id, std::move(cb));
+	};
+
+	return RequestAlloc(size, OnResult);
+}
+
+bool ShmSocket::RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult)
+{ // 8bit size, 4bit socket index, 16bit proc index, 28bit id, ,4bit cmd+flag
 #if 0
 	// self alloc
 	MsgI msg(shm());
 	if (msg.Make(size)) {
 		DEFER1(msg.Release());
-		return OnResult(msg);
+		return onResult(msg);
+	} else {
+		return false;
 	}
-#else
-	// center alloc
-	return RequestAlloc(size, OnResult);
 #endif
-}
 
-bool ShmSocket::RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult)
-{ // 8bit size, 4bit socket index, 16bit proc index, 28bit id, ,4bit cmd+flag
 	// LOG_FUNCTION;
 	if (node_proc_index_ == -1 || socket_index_ == -1) {
+		LOG_ERROR() << "socket not inited.";
 		return false;
 	}
 	int id = (++alloc_id_) & MaskBits(28);
diff --git a/src/shm_socket.h b/src/shm_socket.h
index bf78e89..9dfdd6b 100644
--- a/src/shm_socket.h
+++ b/src/shm_socket.h
@@ -68,16 +68,16 @@
 
 	bool RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult);
 
+	bool Send(const MQInfo &remote, const MsgI &msg, const std::string &msg_id, RecvCB &&cb);
 	template <class Body>
-	bool Send(const MQInfo &remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB())
-	{
-		return Send(remote, MsgI::Serialize(head, body), head.msg_id(), std::move(cb));
-	}
-	template <class... T>
-	bool Send(const MQInfo &remote, const MsgI &imsg, T &&...t)
-	{
-		return SendImpl(remote, imsg, std::forward<decltype(t)>(t)...);
-	}
+	bool Send(const MQInfo &remote, BHMsgHead &head, Body &body, RecvCB &&cb) { return Send(remote, MsgI::Serialize(head, body), head.msg_id(), std::move(cb)); }
+	bool Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb);
+
+	template <class Body>
+	bool Send(const MQInfo &remote, BHMsgHead &head, Body &body) { return Send(remote, MsgI::Serialize(head, body)); }
+	bool Send(const MQInfo &remote, std::string &&content);
+	bool Send(const MQInfo &remote, const MsgI &imsg) { return SendImpl(remote, imsg); }
+
 	template <class... T>
 	bool Send(const MQInfo &remote, const int64_t cmd, T &&...t)
 	{
@@ -135,8 +135,6 @@
 private:
 	bool StopNoLock();
 	bool RunningNoLock() { return !workers_.empty(); }
-
-	bool Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb = RecvCB());
 
 	template <class... Rest>
 	bool SendImpl(Rest &&...rest)
diff --git a/utest/tcp_test.cpp b/utest/tcp_test.cpp
index ff31c3e..a838252 100644
--- a/utest/tcp_test.cpp
+++ b/utest/tcp_test.cpp
@@ -16,6 +16,8 @@
  * =====================================================================================
  */
 
+#include "defs.h"
+#include "node_center.h"
 #include "tcp_connection.h"
 #include "tcp_server.h"
 #include "util.h"
@@ -31,18 +33,31 @@
 
 BOOST_AUTO_TEST_CASE(TcpTest)
 {
-	const std::string bind_addr = "127.0.0.1";
-	const std::string connect_addr = "127.0.0.1";
-	const uint16_t port = 10000;
+	SharedMemory &shm = TestShm();
 
-	TcpServer server(port);
-	server.Start();
+	const std::string connect_addr = "127.0.0.1";
+	const uint16_t port = kBHCenterPort;
 
 	boost::asio::io_context io;
 
 	tcp::endpoint dest(ip::address::from_string(connect_addr), port);
-	for (int i = 0; i < 10; ++i) {
-		TcpRequest1::Create(io, dest, "client->server " + std::to_string(i));
+	MsgRequestTopic req;
+	req.set_topic("#center_query_procs");
+	req.set_data("");
+	auto head = InitMsgHead(GetType(req), "#test_proc", 1000000);
+	auto route = head.add_route();
+	route->set_mq_id(12345);
+	route->set_abs_addr(67890);
+
+	head.mutable_dest()->set_ip(connect_addr);
+	head.mutable_dest()->set_port(port);
+	head.mutable_dest()->set_mq_id(1000011);
+	head.mutable_dest()->set_abs_addr(10296);
+
+	auto request(MsgI::Serialize(head, req));
+	for (int i = 0; i < 1; ++i) {
+		LOG_DEBUG() << "request size: " << request.size();
+		TcpRequest1::Create(io, dest, request, DefaultSender(BHomeShm()));
 	}
 
 	io.run();

--
Gitblit v1.8.0