From 9243710ca372de26823c2225c7b46b072458c671 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 28 五月 2021 17:18:33 +0800
Subject: [PATCH] tcp proxy requests, need more test.

---
 box/tcp_connection.cpp    |   91 ++++------
 box/center_topic_node.cpp |    2 
 box/center.cpp            |   50 ++++
 box/io_service.h          |   10 
 box/tcp_proxy.cpp         |   56 +-----
 box/center.h              |    5 
 box/node_center.cpp       |   57 ++++-
 src/topic_node.cpp        |   54 +++--
 src/msg.cpp               |    3 
 box/io_service.cpp        |   33 --
 src/shm_socket.cpp        |    3 
 utest/api_test.cpp        |   17 +
 box/tcp_common.h          |    8 
 box/tcp_proxy.h           |   24 +-
 box/tcp_server.h          |   10 
 utest/tcp_test.cpp        |   61 ++++--
 box/node_center.h         |   13 +
 box/tcp_server.cpp        |   23 -
 box/tcp_connection.h      |   16 +
 19 files changed, 294 insertions(+), 242 deletions(-)

diff --git a/box/center.cpp b/box/center.cpp
index 8d24315..0fdfa33 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -17,7 +17,9 @@
  */
 #include "center.h"
 #include "center_topic_node.h"
+#include "io_service.h"
 #include "node_center.h"
+#include "tcp_proxy.h"
 #include "tcp_server.h"
 #include <chrono>
 
@@ -74,7 +76,7 @@
 	};
 }
 
-bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr, SharedMemory &shm)
+bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr, SharedMemory &shm, TcpProxy &tcp_proxy)
 {
 	// command
 	auto OnCommand = [center_ptr](ShmSocket &socket, ShmMsgQueue::RawData &cmd) -> bool {
@@ -92,9 +94,41 @@
 		center->OnTimer();
 	};
 
-	auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
+	auto OnCenter = [=, &tcp_proxy](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
 		auto &center = *center_ptr;
 		auto replyer = MakeReplyer(socket, head, center);
+
+		if (!head.dest().ip().empty()) { // other host, proxy
+			auto valid = [&]() { return head.route_size() == 1; };
+			if (!valid()) { return false; }
+
+			if (head.type() == kMsgTypeRequestTopic) {
+				typedef MsgRequestTopicReply Reply;
+				Reply reply;
+				if (!center->CheckMsg(head, reply)) {
+					replyer(reply);
+				} else {
+					auto onResult = [&center](BHMsgHead &head, std::string body_content) {
+						if (head.route_size() > 0) {
+							auto &back = head.route(head.route_size() - 1);
+							MQInfo dest = {back.mq_id(), back.abs_addr()};
+							head.mutable_route()->RemoveLast();
+							center->PassRemoteReplyToLocal(dest, head, std::move(body_content));
+						}
+					};
+					if (!tcp_proxy.Request(head.dest().ip(), head.dest().port(), msg.content(), onResult)) {
+						replyer(MakeReply<Reply>(eError, "send request failed."));
+					} else {
+						// success
+					}
+				}
+				return true;
+			} else {
+				// ignore other msgs for now.
+			}
+			return false;
+		}
+
 		switch (head.type()) {
 			CASE_ON_MSG_TYPE(ProcInit);
 			CASE_ON_MSG_TYPE(Register);
@@ -168,7 +202,10 @@
 {
 	auto nsec = NodeTimeoutSec();
 	auto center_ptr = std::make_shared<Synced<NodeCenter>>("@bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs.
-	AddCenter(center_ptr, shm);
+	io_service_.reset(new IoService);
+	tcp_proxy_.reset(new TcpProxy(io_service_->io()));
+
+	AddCenter(center_ptr, shm, *tcp_proxy_);
 
 	for (auto &kv : Centers()) {
 		auto &info = kv.second;
@@ -176,7 +213,7 @@
 	}
 
 	topic_node_.reset(new CenterTopicNode(center_ptr, shm));
-	tcp_server_.reset(new TcpServer(kBHCenterPort, center_ptr));
+	tcp_server_.reset(new TcpServer(io_service_->io(), kBHCenterPort, center_ptr));
 }
 
 BHCenter::~BHCenter() { Stop(); }
@@ -188,13 +225,14 @@
 		sockets_[info.name_]->Start(1, info.handler_, info.raw_handler_, info.idle_);
 	}
 	topic_node_->Start();
-	tcp_server_->Start();
 	return true;
 }
 
 bool BHCenter::Stop()
 {
-	tcp_server_->Stop();
+	tcp_proxy_.reset();
+	tcp_server_.reset();
+	io_service_.reset();
 	topic_node_->Stop();
 	for (auto &kv : sockets_) {
 		kv.second->Stop();
diff --git a/box/center.h b/box/center.h
index c4aa1ac..8850db4 100644
--- a/box/center.h
+++ b/box/center.h
@@ -24,6 +24,8 @@
 #include <memory>
 class CenterTopicNode;
 class TcpServer;
+class TcpProxy;
+class IoService;
 
 class BHCenter
 {
@@ -54,7 +56,10 @@
 
 	std::map<std::string, std::shared_ptr<ShmSocket>> sockets_;
 	std::unique_ptr<CenterTopicNode> topic_node_;
+
+	std::unique_ptr<IoService> io_service_;
 	std::unique_ptr<TcpServer> tcp_server_;
+	std::unique_ptr<TcpProxy> tcp_proxy_;
 };
 
 #endif // end of include guard: CENTER_TM9OUQTG
diff --git a/box/center_topic_node.cpp b/box/center_topic_node.cpp
index 5c8df7a..749f4e6 100644
--- a/box/center_topic_node.cpp
+++ b/box/center_topic_node.cpp
@@ -106,7 +106,7 @@
 			*reply.mutable_errmsg() = data.errmsg();
 			reply.set_data(ToJson(data));
 		} else {
-			SetError(*reply.mutable_errmsg(), eInvalidInput, "not supported topic" + request.topic());
+			SetError(*reply.mutable_errmsg(), eInvalidInput, "invalid topic: " + request.topic());
 		}
 		pnode_->ServerSendReply(src_info, reply);
 	};
diff --git a/box/io_service.cpp b/box/io_service.cpp
index 1d531e0..5640d50 100644
--- a/box/io_service.cpp
+++ b/box/io_service.cpp
@@ -16,33 +16,16 @@
  * =====================================================================================
  */
 #include "io_service.h"
-#include <chrono>
-using namespace std::chrono_literals;
 
-bool IoService::Start()
+IoService::IoService() :
+    guard_(io_.get_executor())
 {
-	Stop();
-	bool cur = false;
-	if (!run_.compare_exchange_strong(cur, true)) {
-		return false;
-	}
-
-	auto proc = [this]() {
-		while (run_) {
-			io_.run_one_for(100ms);
-		}
-		OnStop();
-	};
-	std::thread(proc).swap(worker_);
-	return true;
+	std::thread([this]() { io_.run(); }).swap(worker_);
 }
-
-void IoService::Stop()
+IoService::~IoService()
 {
-	bool cur = true;
-	if (run_.compare_exchange_strong(cur, false)) {
-		if (worker_.joinable()) {
-			worker_.join();
-		}
-	}
+	guard_.reset();
+	io_.stop(); // normally not needed, but make sure run() exits.
+	if (worker_.joinable())
+		worker_.join();
 }
diff --git a/box/io_service.h b/box/io_service.h
index 000facc..f492e71 100644
--- a/box/io_service.h
+++ b/box/io_service.h
@@ -24,19 +24,17 @@
 class IoService
 {
 public:
-	IoService() :
-	    run_(false) {}
-	bool Start();
-	void Stop();
+	IoService();
+	~IoService();
 
 	typedef boost::asio::io_context io_service_t;
 	io_service_t &io() { return io_; }
 
 private:
-	virtual void OnStop() {}
 	io_service_t io_;
+	typedef boost::asio::executor_work_guard<io_service_t::executor_type> guard_t;
+	guard_t guard_;
 	std::thread worker_;
-	std::atomic<bool> run_;
 };
 
 #endif // end of include guard: IO_SERVICE_ODKKJG3D
diff --git a/box/node_center.cpp b/box/node_center.cpp
index ff199b2..662b2c0 100644
--- a/box/node_center.cpp
+++ b/box/node_center.cpp
@@ -70,8 +70,9 @@
 		return;
 	}
 	// LOG_FUNCTION;
+	const size_t total = msgs_.size();
 	time_to_clean_ = now + 1;
-	int64_t limit = std::max(10000ul, msgs_.size() / 10);
+	int64_t limit = std::max(10000ul, total / 10);
 	int64_t n = 0;
 	auto it = msgs_.begin();
 	while (it != msgs_.end() && --limit > 0) {
@@ -82,16 +83,16 @@
 			++n;
 		};
 		int n = now - msg.timestamp();
-		if (n < 10) {
+		if (msg.Count() == 0) {
+			Free();
+		} else if (n > NodeTimeoutSec()) {
+			Free();
+		} else {
 			++it;
-		} else if (msg.Count() == 0) {
-			Free();
-		} else if (n > 60) {
-			Free();
 		}
 	}
 	if (n > 0) {
-		LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n;
+		LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n << '/' << total;
 	}
 }
 
@@ -209,17 +210,25 @@
 	RecordMsg(msg);
 	return socket.Send(dest, msg);
 }
-bool NodeCenter::ProxyMsg(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb)
+
+NodeCenter::Node NodeCenter::GetNode(const MQId mq_id)
 {
-	auto ssn = dest.id_ - (dest.id_ % 10);
-	LOG_DEBUG() << "prox ssn " << ssn;
+	Node node;
+	auto ssn = mq_id - (mq_id % 10);
 	auto pos = nodes_.find(ssn);
-	if (pos == nodes_.end()) {
-		LOG_ERROR() << "proxy msg, ssn not found.";
+	if (pos != nodes_.end()) {
+		node = pos->second;
+	}
+	return node;
+}
+
+bool NodeCenter::PassRemoteRequestToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb)
+{
+	Node node(GetNode(dest.id_));
+	if (!node || !Valid(*node)) {
+		LOG_ERROR() << id() << " pass remote request, dest not found.";
 		return false;
 	}
-	auto &node = pos->second;
-	if (!Valid(*node)) { return false; }
 
 	ShmSocket &sender(DefaultSender(node->shm_));
 	auto route = head.add_route();
@@ -233,6 +242,26 @@
 	return sender.Send(dest, msg, head.msg_id(), std::move(cb));
 }
 
+bool NodeCenter::PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content)
+{
+	Node node(GetNode(dest.id_));
+	if (!node) {
+		LOG_ERROR() << id() << " pass remote reply , ssn not found.";
+		return false;
+	}
+	auto offset = node->addrs_[dest.id_];
+	if (offset != dest.offset_) {
+		LOG_ERROR() << id() << " pass remote reply, dest address not match";
+		return false;
+	}
+
+	ShmMsg msg(node->shm_);
+	if (!msg.Make(head, body_content)) { return false; }
+	DEFER1(msg.Release(););
+	RecordMsg(msg);
+	return DefaultSender(node->shm_).Send(dest, msg);
+}
+
 void NodeCenter::OnAlloc(ShmSocket &socket, const int64_t val)
 {
 	// LOG_FUNCTION;
diff --git a/box/node_center.h b/box/node_center.h
index 1c79809..8bc2cf8 100644
--- a/box/node_center.h
+++ b/box/node_center.h
@@ -122,7 +122,8 @@
 	void RecordMsg(const MsgI &msg);
 	bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg);
 	bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg);
-	bool ProxyMsg(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb);
+	bool PassRemoteRequestToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb);
+	bool PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content);
 	void OnAlloc(ShmSocket &socket, const int64_t val);
 	void OnFree(ShmSocket &socket, const int64_t val);
 	bool OnCommand(ShmSocket &socket, const int64_t val);
@@ -159,6 +160,14 @@
 	{
 		return HandleMsg<MsgCommonReply, Func>(head, op);
 	}
+	template <class Reply>
+	bool CheckMsg(const BHMsgHead &head, Reply &reply)
+	{
+		bool r = false;
+		auto onOk = [&](Node) { r = true; return MakeReply<Reply>(eSuccess); };
+		reply = HandleMsg<Reply>(head, onOk);
+		return r;
+	}
 
 	MsgCommonReply Unregister(const BHMsgHead &head, MsgUnregister &msg);
 	MsgCommonReply RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg);
@@ -184,6 +193,8 @@
 		return node && Valid(*node);
 	}
 	void RemoveNode(Node &node);
+	Node GetNode(const MQId mq);
+
 	std::string id_; // center proc id;
 
 	std::unordered_map<Topic, Clients> service_map_;
diff --git a/box/tcp_common.h b/box/tcp_common.h
index 8c8b7ec..3d2b133 100644
--- a/box/tcp_common.h
+++ b/box/tcp_common.h
@@ -21,10 +21,18 @@
 #include <boost/asio.hpp>
 #include <boost/uuid/string_generator.hpp>
 #include <boost/uuid/uuid.hpp>
+#include <functional>
+#include <string>
+
 namespace ip = boost::asio::ip;
 using boost::asio::ip::tcp;
 typedef boost::system::error_code bserror_t;
 
 const boost::uuids::uuid kBHTcpServerTag = boost::uuids::string_generator()("e5bff527-6bf8-ee0d-cd28-b36594acee39");
+namespace bhome_msg
+{
+class BHMsgHead;
+}
+typedef std::function<void(bhome_msg::BHMsgHead &head, std::string body_content)> ReplyCB;
 
 #endif // end of include guard: TCP_COMMON_8S8O7OV
diff --git a/box/tcp_connection.cpp b/box/tcp_connection.cpp
index 22162e5..02001bb 100644
--- a/box/tcp_connection.cpp
+++ b/box/tcp_connection.cpp
@@ -19,6 +19,7 @@
 #include "log.h"
 #include "msg.h"
 #include "node_center.h"
+#include "proto.h"
 #include "shm_socket.h"
 
 namespace
@@ -59,10 +60,8 @@
 bool CheckData(std::vector<char> &buffer, const uint32_t len, BHMsgHead &head, std::string &body_content)
 {
 	const char *p = buffer.data();
-	LOG_DEBUG() << "msg len " << len;
 	if (4 > len) { return false; }
 	uint32_t head_len = Get32(p);
-	LOG_DEBUG() << "head_len " << head_len;
 	if (head_len > 1024 * 4) {
 		throw std::runtime_error("unexpected tcp reply data.");
 	}
@@ -87,15 +86,34 @@
 
 /// request -----------------------------------------------------------
 
+void TcpRequest1::SendReply(BHMsgHead &head, std::string body_content)
+{
+	if (reply_cb_) {
+		reply_cb_(head, std::move(body_content));
+	}
+}
+
 void TcpRequest1::OnError(bserror_t ec)
 {
-	LOG_ERROR() << "tcp client error: " << ec;
+	// LOG_ERROR() << "tcp client error: " << ec << ", " << ec.message();
+	BHMsgHead head;
+	std::string body_content;
+	try {
+		std::vector<char> req(request_.begin(), request_.end());
+		if (CheckData(req, req.size(), head, body_content)) {
+			if (head.type() == kMsgTypeRequestTopic) {
+				SendReply(head, MakeReply<MsgRequestTopicReply>(eError, std::to_string(ec.value()) + ',' + ec.message()).SerializeAsString());
+			}
+		}
+	} catch (std::exception &e) {
+	}
 	Close();
 }
 
 void TcpRequest1::Start()
 {
 	auto readReply = [this]() {
+		// if (!reply_cb_) { return; } // no reply needed, maybe safe to close?
 		recv_buffer_.resize(1000);
 		recv_len_ = 0;
 		socket_.async_read_some(Buffer(recv_buffer_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
@@ -104,47 +122,22 @@
 
 	socket_.async_connect(remote_, TcpCB(*this, request));
 }
-void TcpRequest1::Close()
-{
-	LOG_DEBUG() << "client close";
-	socket_.close();
-}
+void TcpRequest1::Close() { socket_.close(); }
 void TcpRequest1::OnRead(size_t size)
 {
-	LOG_DEBUG() << "reply data: " << recv_buffer_.data() + recv_len_;
-
 	recv_len_ += size;
 	BHMsgHead head;
 	std::string body_content;
-	bool recv_done = false;
 	try {
-		recv_done = CheckData(recv_buffer_, recv_len_, head, body_content);
+		if (CheckData(recv_buffer_, recv_len_, head, body_content)) { // got reply.
+			Close();
+			SendReply(head, std::move(body_content));
+		} else { // not complete, read again
+			socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
+		}
 	} catch (std::exception &e) {
 		LOG_ERROR() << e.what();
 		Close();
-		return;
-	}
-
-	if (recv_done) {
-		// just pass to client, no check, client will check it anyway.
-		LOG_DEBUG() << "route size: " << head.route_size();
-		if (head.route_size() < 1) { return; }
-		auto &back = head.route(head.route_size() - 1);
-		MQInfo dest = {back.mq_id(), back.abs_addr()};
-		head.mutable_route()->RemoveLast();
-
-		LOG_DEBUG() << "tcp got reply, pass to shm: " << dest.id_ << ", " << dest.offset_;
-		MsgRequestTopicReply reply;
-		if (reply.ParseFromString(body_content)) {
-			LOG_DEBUG() << "err msg: " << reply.errmsg().errstring();
-			LOG_DEBUG() << "content : " << reply.data();
-		}
-		Close();
-		return;
-		shm_socket_.Send(dest, std::string(recv_buffer_.data(), recv_buffer_.size()));
-	} else { // read again
-		LOG_DEBUG() << "not complete, read again " << recv_buffer_.size();
-		socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
 	}
 }
 
@@ -153,7 +146,7 @@
 void TcpReply1::OnError(bserror_t ec) { Close(); }
 void TcpReply1::Close()
 {
-	LOG_DEBUG() << "server close.";
+	LOG_TRACE() << "server close.";
 	socket_.close();
 }
 
@@ -177,38 +170,26 @@
 		return;
 	}
 
-	auto ParseBody = [&](auto &req) {
-		const char *p = recv_buffer_.data();
-		uint32_t size = Get32(p);
-		p += 4;
-		p += size;
-		size = Get32(p);
-		p += 4;
-		return req.ParseFromArray(p, size);
-	};
-
 	if (recv_done) {
-		LOG_DEBUG() << "request data: " << size;
-		auto self(shared_from_this());
+		LOG_TRACE() << "tcp server recv request data, size: " << size;
 		MQInfo remote = {head.dest().mq_id(), head.dest().abs_addr()};
 		if (remote.id_ && remote.offset_) {
+			auto self(shared_from_this());
 			auto onRecv = [this, self](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
 				send_buffer_ = imsg.content();
 				async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
 			};
 			auto &scenter = *pscenter_;
-			if (!scenter->ProxyMsg(remote, head, body_content, onRecv)) {
-				send_buffer_ = "fake reply";
-				async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
+			if (scenter->PassRemoteRequestToLocal(remote, head, body_content, onRecv)) {
+				return;
 			}
 		} else {
 			LOG_DEBUG() << "no address";
-			send_buffer_ = "no address";
-			async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
 		}
+		Close();
 
-	} else { // read again
-		LOG_DEBUG() << "not complete, read again " << recv_buffer_.size();
+	} else { // not complete, read again
+		LOG_TRACE() << "not complete, read again " << recv_buffer_.size();
 		socket_.async_read_some(Buffer(recv_buffer_, recv_len_), TcpCBSize(*this, [this](size_t size) { OnRead(size); }));
 	}
 };
\ No newline at end of file
diff --git a/box/tcp_connection.h b/box/tcp_connection.h
index 5aa93a4..6e1d0ad 100644
--- a/box/tcp_connection.h
+++ b/box/tcp_connection.h
@@ -25,25 +25,28 @@
 #include <memory>
 
 class ShmSocket;
+class NodeCenter;
+typedef std::shared_ptr<Synced<NodeCenter>> CenterPtr;
+
 class TcpRequest1 : public std::enable_shared_from_this<TcpRequest1>
 {
 public:
-	static void Create(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request, ShmSocket &shm_socket)
+	static void Create(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request, ReplyCB const &cb)
 	{
-		std::make_shared<TcpRequest1>(io, addr, std::move(request), shm_socket)->Start();
+		std::make_shared<TcpRequest1>(io, addr, std::move(request), cb)->Start();
 	}
-
-	TcpRequest1(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request, ShmSocket &shm_socket) :
-	    socket_(io), shm_socket_(shm_socket), remote_(addr), request_(std::move(request)) {}
+	TcpRequest1(boost::asio::io_context &io, tcp::endpoint const &addr, std::string request, ReplyCB const &cb) :
+	    socket_(io), reply_cb_(cb), remote_(addr), request_(std::move(request)) {}
 	void OnError(bserror_t ec);
 
 private:
 	void Start();
 	void Close();
 	void OnRead(size_t size);
+	void SendReply(BHMsgHead &head, std::string body_content);
 
 	tcp::socket socket_;
-	ShmSocket &shm_socket_; // send reply
+	ReplyCB reply_cb_;
 	tcp::endpoint remote_;
 	std::string request_;
 	std::vector<char> recv_buffer_;
@@ -54,7 +57,6 @@
 class TcpReply1 : public std::enable_shared_from_this<TcpReply1>
 {
 public:
-	typedef std::shared_ptr<Synced<NodeCenter>> CenterPtr;
 	static void Create(tcp::socket sock, CenterPtr pscenter)
 	{
 		std::make_shared<TcpReply1>(std::move(sock), pscenter)->Start();
diff --git a/box/tcp_proxy.cpp b/box/tcp_proxy.cpp
index 298c6b6..b4ec497 100644
--- a/box/tcp_proxy.cpp
+++ b/box/tcp_proxy.cpp
@@ -16,54 +16,18 @@
  * =====================================================================================
  */
 #include "tcp_proxy.h"
-#include "defs.h"
-#include "shm_socket.h"
 #include "tcp_connection.h"
 
-TcpProxy::TcpProxy() :
-    run_(false) {}
-
-TcpProxy::~TcpProxy() {}
-
-bool TcpProxy::Start(bhome_shm::SharedMemory &shm)
-{
-	Stop();
-	bool cur = false;
-	if (!run_.compare_exchange_strong(cur, true)) { return false; }
-
-	auto &mq = GetCenterInfo(shm)->mq_tcp_proxy_;
-	local_.reset(new ShmSocket(mq.offset_, shm, mq.id_));
-	auto localProc = [this](ShmSocket &sock, MsgI &msg, BHMsgHead &head) {
-		auto &dest = head.dest();
-		if (dest.ip().empty() || dest.port() == 0) { return; }
-		Request(dest.ip(), dest.port(), msg.content());
-	};
-	local_->Start(1, localProc);
-
-	auto proxyProc = [this]() {
-		while (run_) {
-			io_context_.run_one_for(std::chrono::milliseconds(100));
-		}
-	};
-	std::thread(proxyProc).swap(worker_);
-	return true;
-}
-
-void TcpProxy::Stop()
-{
-	bool cur = true;
-	if (run_.compare_exchange_strong(cur, false)) {
-		if (worker_.joinable()) {
-			worker_.join();
-		}
-		local_.reset();
-	}
-}
-
-bool TcpProxy::Request(const std::string &ip, int port, std::string &&content)
+bool TcpProxy::Request(const std::string &ip, int port, std::string &&content, ReplyCB const &cb)
 {
 	if (content.empty()) { return false; }
-
-	tcp::endpoint dest(ip::address::from_string(ip), port);
-	TcpRequest1::Create(io_context_, dest, std::move(content), *local_);
+	try {
+		tcp::endpoint dest(ip::address::from_string(ip), port);
+		TcpRequest1::Create(io_, dest, std::move(content), cb);
+		LOG_TRACE() << "tcp request start " << ip << ':' << port;
+		return true;
+	} catch (std::exception &e) {
+		LOG_ERROR() << "proxy request exception: " << e.what();
+		return false;
+	}
 }
diff --git a/box/tcp_proxy.h b/box/tcp_proxy.h
index 9c74532..69c3f03 100644
--- a/box/tcp_proxy.h
+++ b/box/tcp_proxy.h
@@ -18,28 +18,24 @@
 #ifndef TCP_PROXY_E1YJ92U5
 #define TCP_PROXY_E1YJ92U5
 
-#include "shm.h"
+#include "bh_util.h"
+#include "io_service.h"
 #include "tcp_common.h"
-#include <atomic>
-#include <thread>
+#include <memory>
 
-class ShmSocket;
+class NodeCenter;
+typedef std::shared_ptr<Synced<NodeCenter>> CenterPtr;
 
 class TcpProxy
 {
 public:
-	TcpProxy();
-	~TcpProxy();
-	bool Start(bhome_shm::SharedMemory &shm);
-	void Stop();
+	typedef IoService::io_service_t io_service_t;
+	TcpProxy(io_service_t &io) :
+	    io_(io) {}
+	bool Request(const std::string &ip, int port, std::string &&content, ReplyCB const &cb);
 
 private:
-	bool Request(const std::string &ip, int port, std::string &&content);
-	std::unique_ptr<ShmSocket> local_;
-
-	boost::asio::io_context io_context_;
-	std::thread worker_;
-	std::atomic<bool> run_;
+	io_service_t &io_;
 };
 
 #endif // end of include guard: TCP_PROXY_E1YJ92U5
diff --git a/box/tcp_server.cpp b/box/tcp_server.cpp
index ea23106..5cd8743 100644
--- a/box/tcp_server.cpp
+++ b/box/tcp_server.cpp
@@ -23,26 +23,19 @@
 
 using namespace std::chrono_literals;
 
-TcpServer::TcpServer(int port, CenterPtr pscenter) :
-    listener_(io(), tcp::endpoint(tcp::v6(), port)), pscenter_(pscenter)
-{
-	Accept();
-}
-
-TcpServer::~TcpServer() { Stop(); }
-
-void TcpServer::OnStop()
-{
-	listener_.close();
-}
-
 void TcpServer::Accept()
 {
 	listener_.async_accept([this](bserror_t ec, tcp::socket sock) {
 		if (!ec) {
-			LOG_INFO() << "server accept client";
+			LOG_TRACE() << "server accept client";
 			TcpReply1::Create(std::move(sock), pscenter_);
+			Accept();
+		} else {
+			// this is already destructed by now.
+			if (ec.value() != ECANCELED) {
+				LOG_WARNING() << "tcp server accept error: " << ec;
+				Accept();
+			}
 		}
-		Accept();
 	});
 }
\ No newline at end of file
diff --git a/box/tcp_server.h b/box/tcp_server.h
index 2c9337c..4698196 100644
--- a/box/tcp_server.h
+++ b/box/tcp_server.h
@@ -23,15 +23,17 @@
 #include "tcp_common.h"
 
 class NodeCenter;
-class TcpServer : public IoService
+class TcpServer
 {
 public:
+	typedef IoService::io_service_t io_service_t;
 	typedef std::shared_ptr<Synced<NodeCenter>> CenterPtr;
-	TcpServer(int port, CenterPtr pscenter);
-	~TcpServer();
+	TcpServer(io_service_t &io, int port, CenterPtr pscenter) :
+	    io_(io), listener_(io_, tcp::endpoint(tcp::v6(), port)), pscenter_(pscenter) { Accept(); }
+	~TcpServer() { listener_.close(); }
 
 private:
-	virtual void OnStop();
+	io_service_t &io_;
 	void Accept();
 	tcp::acceptor listener_;
 	CenterPtr pscenter_;
diff --git a/src/msg.cpp b/src/msg.cpp
index 40a7b0d..3546424 100644
--- a/src/msg.cpp
+++ b/src/msg.cpp
@@ -37,7 +37,8 @@
 			Free();
 		}
 	} else if (n < 0) {
-		LOG_FATAL() << "error double release data.";
+		// ns_log::GetTrace();
+		LOG_FATAL() << "double release msg.";
 		throw std::runtime_error("double release msg.");
 	}
 	return n;
diff --git a/src/shm_socket.cpp b/src/shm_socket.cpp
index d54168d..2bdccc2 100644
--- a/src/shm_socket.cpp
+++ b/src/shm_socket.cpp
@@ -171,8 +171,7 @@
 			RecvCB cb_no_use;
 			per_msg_cbs_->Pick(msg_id, cb_no_use);
 		};
-		SendImpl(remote, msg, onExpireRemoveCB);
-		return true;
+		return SendImpl(remote, msg, onExpireRemoveCB);
 	} catch (std::exception &e) {
 		SetLastError(eError, "Send internal error.");
 		return false;
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index 3c38121..b21f7ef 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -497,9 +497,10 @@
 
 	out_msg_id = msg_id;
 
-	auto SendTo = [this, msg_id](const MQInfo &remote, const MsgRequestTopic &req, const RequestResultCB &cb) {
+	auto SendTo = [this, remote_addr, msg_id](const MQInfo &remote, const MsgRequestTopic &req, const RequestResultCB &cb) {
 		auto &sock = SockClient();
 		BHMsgHead head(InitMsgHead(GetType(req), proc_id(), ssn(), msg_id));
+		*head.mutable_dest() = remote_addr;
 		AddRoute(head, sock);
 		head.set_topic(req.topic());
 
@@ -519,8 +520,12 @@
 	};
 
 	try {
-		BHAddress addr;
-		return (ClientQueryRPCTopic(req.topic(), addr, 3000)) && SendTo(MQInfo{addr.mq_id(), addr.abs_addr()}, req, cb);
+		if (remote_addr.ip().empty()) {
+			BHAddress addr;
+			return (ClientQueryRPCTopic(req.topic(), addr, 3000)) && SendTo(MQInfo{addr.mq_id(), addr.abs_addr()}, req, cb);
+		} else {
+			return SendTo(CenterAddr(), req, cb);
+		}
 	} catch (...) {
 		SetLastError(eError, "internal error.");
 		return false;
@@ -536,25 +541,34 @@
 
 	try {
 		auto &sock = SockClient();
-
-		BHAddress addr;
-		if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) {
-			LOG_TRACE() << "node: " << SockNode().id() << ", topic dest: " << addr.mq_id();
-			BHMsgHead head(InitMsgHead(GetType(request), proc_id(), ssn()));
-			AddRoute(head, sock);
-			head.set_topic(request.topic());
-
-			MsgI reply_msg(shm());
-			DEFER1(reply_msg.Release(););
-			BHMsgHead reply_head;
-
-			if (sock.SendAndRecv({addr.mq_id(), addr.abs_addr()}, head, request, reply_msg, reply_head, timeout_ms) &&
-			    reply_head.type() == kMsgTypeRequestTopicReply &&
-			    reply_msg.ParseBody(out_reply)) {
-				reply_head.mutable_proc_id()->swap(out_proc_id);
-				return true;
+		MQInfo dest;
+		if (!remote_addr.ip().empty()) {
+			dest = CenterAddr();
+		} else {
+			BHAddress addr;
+			if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) {
+				dest.offset_ = addr.abs_addr();
+				dest.id_ = addr.mq_id();
+			} else {
+				return false;
 			}
 		}
+
+		BHMsgHead head(InitMsgHead(GetType(request), proc_id(), ssn()));
+		*head.mutable_dest() = remote_addr;
+		AddRoute(head, sock);
+		head.set_topic(request.topic());
+
+		MsgI reply_msg(shm());
+		DEFER1(reply_msg.Release(););
+		BHMsgHead reply_head;
+
+		if (sock.SendAndRecv(dest, head, request, reply_msg, reply_head, timeout_ms) &&
+		    reply_head.type() == kMsgTypeRequestTopicReply &&
+		    reply_msg.ParseBody(out_reply)) {
+			reply_head.mutable_proc_id()->swap(out_proc_id);
+			return true;
+		}
 	} catch (...) {
 		SetLastError(eError, __func__ + std::string(" internal errer."));
 	}
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index 3d842bf..bddcbf7 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -206,7 +206,7 @@
 		}
 		printf("\n");
 	};
-	{
+	if (0) {
 		// query procs
 		std::string dest(BHAddress().SerializeAsString());
 		MsgQueryProc query;
@@ -224,14 +224,21 @@
 		// printf("register topic : %s\n", r ? "ok" : "failed");
 		// Sleep(1s);
 	}
-	{
+	for (int i = 0; i < 3; ++i) {
 		// query procs with normal topic request
 		MsgRequestTopic req;
 		req.set_topic("#center_query_procs");
 		// req.set_data("{\"proc_id\":\"#center.node\"}");
 		std::string s(req.SerializeAsString());
 		// Sleep(10ms, false);
-		std::string dest(BHAddress().SerializeAsString());
+		BHAddress host;
+		printf("query with ip set\n");
+		host.set_ip("127.0.0.1");
+		host.set_port(kBHCenterPort);
+		host.set_mq_id(1000011);
+		host.set_abs_addr(10296);
+
+		std::string dest(host.SerializeAsString());
 		void *proc_id = 0;
 		int proc_id_len = 0;
 		DEFER1(BHFree(proc_id, proc_id_len););
@@ -247,7 +254,7 @@
 		} else {
 			MsgRequestTopicReply ret;
 			ret.ParseFromArray(reply, reply_len);
-			printf("topic query proc : %s\n", ret.data().c_str());
+			printf("\ntopic query proc : %s\n", ret.data().c_str());
 			// MsgQueryProcReply result;
 			// if (result.ParseFromArray(ret.data().data(), ret.data().size()) && IsSuccess(result.errmsg().errcode())) {
 			// 	PrintProcs(result);
@@ -325,7 +332,7 @@
 		for (int i = 0; i < 1; ++i) {
 			MsgPublish pub;
 			pub.set_topic(topic_ + std::to_string(i));
-			pub.set_data("pub_data_" + std::string(1024 * 1, 'a'));
+			pub.set_data("pub_data_" + std::string(104 * 1, 'a'));
 			std::string s(pub.SerializeAsString());
 			BHPublish(s.data(), s.size(), 0);
 			// Sleep(1s);
diff --git a/utest/tcp_test.cpp b/utest/tcp_test.cpp
index a838252..86a0897 100644
--- a/utest/tcp_test.cpp
+++ b/utest/tcp_test.cpp
@@ -19,6 +19,7 @@
 #include "defs.h"
 #include "node_center.h"
 #include "tcp_connection.h"
+#include "tcp_proxy.h"
 #include "tcp_server.h"
 #include "util.h"
 #include <sys/ioctl.h>
@@ -33,34 +34,54 @@
 
 BOOST_AUTO_TEST_CASE(TcpTest)
 {
-	SharedMemory &shm = TestShm();
-
 	const std::string connect_addr = "127.0.0.1";
 	const uint16_t port = kBHCenterPort;
 
-	boost::asio::io_context io;
+	IoService io;
 
 	tcp::endpoint dest(ip::address::from_string(connect_addr), port);
-	MsgRequestTopic req;
-	req.set_topic("#center_query_procs");
-	req.set_data("");
-	auto head = InitMsgHead(GetType(req), "#test_proc", 1000000);
-	auto route = head.add_route();
-	route->set_mq_id(12345);
-	route->set_abs_addr(67890);
 
-	head.mutable_dest()->set_ip(connect_addr);
-	head.mutable_dest()->set_port(port);
-	head.mutable_dest()->set_mq_id(1000011);
-	head.mutable_dest()->set_abs_addr(10296);
+	auto NewRequest = [&]() {
+		MsgRequestTopic req;
+		req.set_topic("#center_query_procs");
+		req.set_data("");
+		auto head = InitMsgHead(GetType(req), "#test_proc", 1000000);
+		auto route = head.add_route();
+		route->set_mq_id(12345);
+		route->set_abs_addr(67890);
 
-	auto request(MsgI::Serialize(head, req));
-	for (int i = 0; i < 1; ++i) {
-		LOG_DEBUG() << "request size: " << request.size();
-		TcpRequest1::Create(io, dest, request, DefaultSender(BHomeShm()));
+		head.mutable_dest()->set_ip(connect_addr);
+		head.mutable_dest()->set_port(port);
+		head.mutable_dest()->set_mq_id(1000011);
+		head.mutable_dest()->set_abs_addr(10296);
+
+		return (MsgI::Serialize(head, req));
+	};
+	auto onReply = [](BHMsgHead &head, std::string body_content) {
+		static int n = 0;
+		printf("reply %d: ", ++n);
+		MsgRequestTopicReply reply;
+		if (reply.ParseFromString(body_content)) {
+			if (IsSuccess(reply.errmsg().errcode())) {
+				printf("\ncontent: %s\n", reply.data().c_str());
+			} else {
+				printf("error: %s\n", reply.errmsg().errstring().c_str());
+			}
+		} else {
+			printf("parse error\n");
+		}
+	};
+	for (int i = 0; i < 100; ++i) {
+		auto request = NewRequest();
+		TcpRequest1::Create(io.io(), dest, request, onReply);
 	}
-
-	io.run();
+	Sleep(2s);
+	printf("-------------------------------------------------------\n");
+	for (int i = 0; i < 3; ++i) {
+		auto request = NewRequest();
+		TcpRequest1::Create(io.io(), dest, request, onReply);
+	}
+	Sleep(2s);
 
 	printf("TcpTest\n");
 }

--
Gitblit v1.8.0