From c64c54d8e75b9354dc49a7b6b2d326e7dd59eb37 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期四, 15 四月 2021 19:32:16 +0800
Subject: [PATCH] add api; fix send, socknode mem leak.

---
 src/proto.cpp         |    2 
 src/socket.h          |   11 
 box/center.cpp        |   53 ++-
 src/proto.h           |    2 
 .vscode/settings.json |    3 
 utest/utest.cpp       |   28 +-
 src/topic_node.cpp    |  129 +++++++---
 box/center_main.cc    |   25 ++
 utest/api_test.cpp    |  196 +++++++++++++++-
 utest/util.h          |    7 
 .vscode/launch.json   |    2 
 src/topic_node.h      |   14 
 src/bh_api.h          |   26 ++
 src/sendq.h           |    2 
 src/bh_api.cpp        |  142 ++++++++---
 box/app_arg.h         |   59 ++++
 16 files changed, 558 insertions(+), 143 deletions(-)

diff --git a/.vscode/launch.json b/.vscode/launch.json
index 12aa21d..939b9a9 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -11,7 +11,7 @@
             "program": "${workspaceFolder}/debug/bin/utest",
             "args": [
                 "-t",
-                "SRTest"
+                "ApiTest"
             ],
             "stopAtEntry": false,
             "cwd": "${workspaceFolder}",
diff --git a/.vscode/settings.json b/.vscode/settings.json
index 97450e9..88753a7 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -60,7 +60,8 @@
         "*.inc": "cpp",
         "strstream": "cpp",
         "unordered_set": "cpp",
-        "cfenv": "cpp"
+        "cfenv": "cpp",
+        "*.ipp": "cpp"
     },
     "files.exclude": {
         "**/*.un~": true,
diff --git a/box/app_arg.h b/box/app_arg.h
new file mode 100644
index 0000000..e9e2c1c
--- /dev/null
+++ b/box/app_arg.h
@@ -0,0 +1,59 @@
+#ifndef APP_ARG_OQMELZBX
+#define APP_ARG_OQMELZBX
+
+#include <map>
+#include <string>
+
+class AppArg
+{
+	typedef std::map<std::string, std::string> ArgMap;
+public:
+	AppArg(int argc, const char *argv[]) {
+		Parse(argc, argv);
+	}
+	bool Has(const std::string &key) const {
+		return Pos(key) != args.end();
+	}
+	std::string Get(const std::string &key, const std::string &def = "") const {
+		ArgMap::const_iterator pos = Pos(key);
+		if (pos != args.end()) {
+			return pos->second;
+		} else {
+			return def;
+		}
+	}
+private:
+	void Parse(int argc, const char *argv[]) {
+		for (int i = 1; i < argc; ++i) {
+			std::string text(argv[i]);
+			if (text.substr(0, 2) == "--") {
+				text = text.substr(2);
+				std::string::size_type sep = text.find('=');
+				if (sep == std::string::npos) {
+					args[text].clear();
+				} else {
+					args[text.substr(0, sep)] = text.substr(sep+1);
+				}
+			} else if (text.substr(0,1) == "-") {
+				text = text.substr(1);
+				args[text].clear();
+				if (i+1 < argc) {
+					std::string next(argv[i+1]);
+					if (next.substr(0,1) != "-") {
+						args[text] = next;
+						++i;
+					}
+				}
+			}
+		}
+		
+	}
+	ArgMap::const_iterator Pos(const std::string &key) const {
+		return args.find(key);
+	}
+
+	ArgMap args;
+};
+
+#endif // end of include guard: APP_ARG_OQMELZBX 
+
diff --git a/box/center.cpp b/box/center.cpp
index 0dd4ed4..8625f7f 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -121,20 +121,18 @@
 			};
 
 			auto pos = nodes_.find(head.proc_id());
-			if (pos == nodes_.end()) { // new client
-				Node node(new NodeInfo);
-				UpdateRegInfo(node);
-				nodes_[node->proc_.proc_id()] = node;
-			} else {
+			if (pos != nodes_.end()) { // new client
 				Node &node = pos->second;
 				if (node->addrs_.find(SrcAddr(head)) == node->addrs_.end()) {
 					// node restarted, release old mq.
-					for (auto &addr : node->addrs_) {
-						cleaner_(addr);
-					}
-					node->addrs_.clear();
+					RemoveNode(node);
+					node.reset(new NodeInfo);
 				}
 				UpdateRegInfo(node);
+			} else {
+				Node node(new NodeInfo);
+				UpdateRegInfo(node);
+				nodes_[node->proc_.proc_id()] = node;
 			}
 			return MakeReply(eSuccess);
 		} catch (...) {
@@ -334,11 +332,7 @@
 			auto &cli = *it->second;
 			cli.state_.UpdateState(now, offline_time_, kill_time_);
 			if (cli.state_.flag_ == kStateKillme) {
-				if (cleaner_) {
-					for (auto &addr : cli.addrs_) {
-						cleaner_(addr);
-					}
-				}
+				RemoveNode(it->second);
 				it = nodes_.erase(it);
 			} else {
 				++it;
@@ -357,6 +351,30 @@
 	{
 		auto node = weak.lock();
 		return node && Valid(*node);
+	}
+	void RemoveNode(Node &node)
+	{
+		auto EraseMapRec = [&node](auto &rec_map, auto &node_rec) {
+			for (auto &addr_topics : node_rec) {
+				TopicDest dest{addr_topics.first, node};
+				for (auto &topic : addr_topics.second) {
+					auto pos = rec_map.find(topic);
+					if (pos != rec_map.end()) {
+						pos->second.erase(dest);
+						if (pos->second.empty()) {
+							rec_map.erase(pos);
+						}
+					}
+				}
+			}
+		};
+		EraseMapRec(service_map_, node->services_);
+		EraseMapRec(subscribe_map_, node->subscriptions_);
+
+		for (auto &addr : node->addrs_) {
+			cleaner_(addr);
+		}
+		node->addrs_.clear();
 	}
 	std::string id_; // center proc id;
 
@@ -403,11 +421,8 @@
 	auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) {
 		return [&](auto &&rep_body) {
 			auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id()));
-			MsgI msg;
-			if (msg.Make(socket.shm(), reply_head, rep_body)) {
-				auto &remote = head.route(0).mq_id();
-				bool r = socket.Send(remote.data(), msg);
-			}
+			auto &remote = head.route(0).mq_id();
+			socket.Send(remote.data(), reply_head, rep_body);
 		};
 	};
 
diff --git a/box/center_main.cc b/box/center_main.cc
index 40aed56..5baa409 100644
--- a/box/center_main.cc
+++ b/box/center_main.cc
@@ -15,17 +15,40 @@
  *
  * =====================================================================================
  */
+#include "app_arg.h"
 #include "box.h"
 #include "center.h"
 #include "defs.h"
 #include "signalhandle.h"
+#include <chrono>
+#include <thread>
+using namespace std::chrono_literals;
 
 int center_main(int argc, const char *argv[])
 {
+	AppArg args(argc, argv);
+	if (args.Has("remove")) {
+		BHomeShm().Remove();
+		return 0;
+	}
+
+	bool run = true;
+	auto showStatus = [&]() {
+		auto init = BHomeShm().get_free_memory();
+		uint64_t idx = 0;
+		while (run) {
+			std::this_thread::sleep_for(1s);
+			printf("%8d shared memory: avail : %ld / %ld\n", ++idx, BHomeShm().get_free_memory(), init);
+		}
+	};
+	std::thread t(showStatus);
+
 	BHCenter center(BHomeShm());
 	center.Start();
+	printf("center started ...\n");
 	WaitForSignals({SIGINT, SIGTERM});
-	// BHomeShm().Remove(); // remove ?
+	run = false;
+	t.join();
 	return 0;
 }
 
diff --git a/src/bh_api.cpp b/src/bh_api.cpp
index 78b8a59..2abe66d 100644
--- a/src/bh_api.cpp
+++ b/src/bh_api.cpp
@@ -39,11 +39,25 @@
 	}
 	size_t size() const { return size_; }
 	operator bool() const { return ptr_; }
+	bool ReleaseTo(void **pdata, int *psize)
+	{
+		if (!ptr_) {
+			return false;
+		}
+		if (pdata && psize) {
+			*psize = size();
+			*pdata = release();
+		}
+		return true;
+	}
 };
 
 template <class Msg>
 bool PackOutput(const Msg &msg, void **out, int *out_len)
 {
+	if (!out || !out_len) {
+		return true; // not wanted.
+	}
 	auto size = msg.ByteSizeLong();
 	TmpPtr p(size);
 	if (!p) {
@@ -51,30 +65,37 @@
 		return false;
 	}
 	msg.SerializePartialToArray(p.get(), size);
-	*out = p.release();
-	*out_len = size;
+	p.ReleaseTo(out, out_len);
 	return true;
+}
+
+template <class MsgIn, class MsgOut = MsgCommonReply>
+bool BHApiIn1Out1(bool (TopicNode::*mfunc)(MsgIn &, MsgOut &, const int),
+                  const void *request,
+                  const int request_len,
+                  void **reply,
+                  int *reply_len,
+                  const int timeout_ms)
+{
+	MsgIn input;
+	if (!input.ParseFromArray(request, request_len)) {
+		SetLastError(eInvalidInput, "invalid input.");
+		return false;
+	}
+	MsgOut msg_reply;
+	if ((ProcNode().*mfunc)(input, msg_reply, timeout_ms)) {
+		return PackOutput(msg_reply, reply, reply_len);
+
+	} else {
+		return false;
+	}
 }
 
 } // namespace
 
-bool BHRegister(const void *proc_info,
-                const int proc_info_len,
-                void **reply,
-                int *reply_len,
-                const int timeout_ms)
+bool BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
 {
-	ProcInfo pi;
-	if (!pi.ParseFromArray(proc_info, proc_info_len)) {
-		SetLastError(eInvalidInput, "invalid input.");
-		return false;
-	}
-	MsgCommonReply msg_reply;
-	if (ProcNode().Register(pi, msg_reply, timeout_ms)) {
-		return PackOutput(msg_reply, reply, reply_len);
-	} else {
-		return false;
-	}
+	return BHApiIn1Out1<ProcInfo>(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms);
 }
 
 bool BHHeartBeatEasy(const int timeout_ms)
@@ -82,23 +103,19 @@
 	return ProcNode().Heartbeat(timeout_ms);
 }
 
-bool BHHeartBeat(const void *proc_info,
-                 const int proc_info_len,
-                 void **reply,
-                 int *reply_len,
-                 const int timeout_ms)
+bool BHHeartBeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
 {
-	ProcInfo pi;
-	if (!pi.ParseFromArray(proc_info, proc_info_len)) {
-		SetLastError(eInvalidInput, "invalid input.");
-		return false;
-	}
-	MsgCommonReply msg_reply;
-	if (ProcNode().Heartbeat(pi, msg_reply, timeout_ms)) {
-		return PackOutput(msg_reply, reply, reply_len);
-	} else {
-		return false;
-	}
+	return BHApiIn1Out1<ProcInfo>(&TopicNode::Heartbeat, proc_info, proc_info_len, reply, reply_len, timeout_ms);
+}
+
+bool BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
+{
+	return BHApiIn1Out1<MsgTopicList>(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms);
+}
+
+bool BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
+{
+	return BHApiIn1Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms);
 }
 
 bool BHPublish(const void *msgpub,
@@ -125,8 +142,35 @@
 	if (ProcNode().RecvSub(proc, pub, timeout_ms)) {
 		TmpPtr pproc(proc);
 		if (pproc && PackOutput(pub, msgpub, msgpub_len)) {
-			*proc_id = pproc.release();
-			*proc_id_len = pproc.size();
+			pproc.ReleaseTo(proc_id, proc_id_len);
+			return true;
+		} else {
+			SetLastError(ENOMEM, "out of mem");
+		}
+	}
+	return false;
+}
+
+bool BHAsyncRequest(const void *request,
+                    const int request_len,
+                    void **msg_id,
+                    int *msg_id_len)
+{
+	MsgRequestTopic req;
+	if (!req.ParseFromArray(request, request_len)) {
+		SetLastError(eInvalidInput, "invalid input.");
+		return false;
+	}
+	std::string str_msg_id;
+	MsgRequestTopicReply out_msg;
+	if (ProcNode().ClientAsyncRequest(req, str_msg_id)) {
+		if (!msg_id || !msg_id_len) {
+			return true;
+		}
+		TmpPtr ptr(str_msg_id);
+		if (ptr) {
+			ptr.ReleaseTo(msg_id, msg_id_len);
+			return true;
 		} else {
 			SetLastError(ENOMEM, "out of mem");
 		}
@@ -152,8 +196,8 @@
 	if (ProcNode().ClientSyncRequest(req, proc, out_msg, timeout_ms)) {
 		TmpPtr pproc(proc);
 		if (pproc && PackOutput(out_msg, reply, reply_len)) {
-			*proc_id = pproc.release();
-			*proc_id_len = pproc.size();
+			pproc.ReleaseTo(proc_id, proc_id_len);
+			return true;
 		} else {
 			SetLastError(ENOMEM, "out of mem");
 		}
@@ -174,9 +218,9 @@
 	if (ProcNode().ServerRecvRequest(src_info, proc, out_msg, timeout_ms)) {
 		TmpPtr pproc(proc);
 		if (pproc && PackOutput(out_msg, request, request_len)) {
-			*proc_id = pproc.release();
-			*proc_id_len = pproc.size();
+			pproc.ReleaseTo(proc_id, proc_id_len);
 			*src = src_info;
+			return true;
 		} else {
 			SetLastError(ENOMEM, "out of mem");
 		}
@@ -206,10 +250,11 @@
 typedef std::function<bool(const void *, const int)> ServerSender;
 } // namespace
 
-void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb)
+void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb)
 {
 	TopicNode::ServerCB on_req;
 	TopicNode::SubDataCB on_sub;
+	TopicNode::RequestResultCB on_reply;
 	if (server_cb) {
 		on_req = [server_cb](const std::string &proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply) {
 			std::string sreq(request.SerializeAsString());
@@ -228,8 +273,16 @@
 			sub_cb(proc_id.data(), proc_id.size(), s.data(), s.size());
 		};
 	}
+	if (client_cb) {
+		on_reply = [client_cb](const BHMsgHead &head, const MsgRequestTopicReply &rep) {
+			std::string s(rep.SerializeAsString());
+			client_cb(head.proc_id().data(), head.proc_id().size(),
+			          head.msg_id().data(), head.msg_id().size(),
+			          s.data(), s.size());
+		};
+	}
 
-	ProcNode().Start(on_req, on_sub);
+	ProcNode().Start(on_req, on_sub, on_reply);
 }
 bool BHServerCallbackReply(const BHServerCallbackTag *tag,
                            const void *data,
@@ -251,10 +304,7 @@
 		std::string err_msg;
 		GetLastError(ec, err_msg);
 		TmpPtr p(err_msg);
-		if (p) {
-			*msg = p.release();
-			*msg_len = p.size();
-		}
+		p.ReleaseTo(msg, msg_len);
 	}
 	return ec;
 }
diff --git a/src/bh_api.h b/src/bh_api.h
index 1023ba4..eeb47a5 100644
--- a/src/bh_api.h
+++ b/src/bh_api.h
@@ -14,6 +14,18 @@
                 int *reply_len,
                 const int timeout_ms);
 
+bool BHRegisterTopics(const void *topics,
+                      const int topics_len,
+                      void **reply,
+                      int *reply_len,
+                      const int timeout_ms);
+
+bool BHSubscribeTopics(const void *topics,
+                       const int topics_len,
+                       void **reply,
+                       int *reply_len,
+                       const int timeout_ms);
+
 typedef void (*FSubDataCallback)(const void *proc_id,
                                  const int proc_id_len,
                                  const void *data,
@@ -25,7 +37,14 @@
                                 const int data_len,
                                 BHServerCallbackTag *tag);
 
-void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb);
+typedef void (*FClientCallback)(const void *proc_id,
+                                const int proc_id_len,
+                                const void *msg_id,
+                                const int msg_id_len,
+                                const void *data,
+                                const int data_len);
+
+void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb);
 bool BHServerCallbackReply(const BHServerCallbackTag *tag,
                            const void *data,
                            const int data_len);
@@ -47,6 +66,11 @@
                int *msgpub_len,
                const int timeout_ms);
 
+bool BHAsyncRequest(const void *request,
+                    const int request_len,
+                    void **msg_id,
+                    int *msg_id_len);
+
 bool BHRequest(const void *request,
                const int request_len,
                void **proc_id,
diff --git a/src/proto.cpp b/src/proto.cpp
index 287924b..b1e8207 100644
--- a/src/proto.cpp
+++ b/src/proto.cpp
@@ -30,6 +30,8 @@
 
 } // namespace
 
+std::string NewMsgId() { return RandId(); }
+
 BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id)
 {
 	return InitMsgHead(type, proc_id, RandId());
diff --git a/src/proto.h b/src/proto.h
index 42fe343..b418342 100644
--- a/src/proto.h
+++ b/src/proto.h
@@ -72,7 +72,7 @@
 	SetError(*msg.mutable_errmsg(), err_code, err_str);
 	return msg;
 }
-
+std::string NewMsgId();
 BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const std::string &msgid);
 BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id);
 // inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); }
diff --git a/src/sendq.h b/src/sendq.h
index b4f3821..aa8923d 100644
--- a/src/sendq.h
+++ b/src/sendq.h
@@ -55,7 +55,7 @@
 	void Append(const Remote &addr, const MsgI &msg, OnMsgEvent onExpire = OnMsgEvent())
 	{
 		using namespace std::chrono_literals;
-		Append(addr, msg, Now() + 60s, onExpire);
+		Append(addr, msg, Now() + 3s, onExpire);
 	}
 	bool TrySend(bhome_shm::ShmMsgQueue &mq);
 	// bool empty() const { return store_.empty(); }
diff --git a/src/socket.h b/src/socket.h
index 0b0b880..96af6e7 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -36,7 +36,7 @@
 
 class ShmSocket : private boost::noncopyable
 {
-	bool SendImpl(const void *valid_remote, const MsgI &imsg, SendQ::OnMsgEvent onExpire = SendQ::OnMsgEvent())
+	bool SendImpl(const void *valid_remote, MsgI const &imsg, SendQ::OnMsgEvent onExpire = SendQ::OnMsgEvent())
 	{
 		// if (!mq().TrySend(*(MQId *) valid_remote, imsg)) {
 		send_buffer_.Append(*static_cast<const MQId *>(valid_remote), imsg, onExpire);
@@ -69,7 +69,11 @@
 	bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body)
 	{
 		MsgI msg;
-		return msg.Make(shm(), head, body) && SendImpl(valid_remote, msg);
+		if (msg.Make(shm(), head, body)) {
+			DEFER1(if (msg.IsCounted()) { msg.Release(shm()); });
+			return SendImpl(valid_remote, msg);
+		}
+		return false;
 	}
 
 	template <class Body>
@@ -78,6 +82,7 @@
 		//TODO send_buffer_ need flag, and remove callback on expire.
 		MsgI msg;
 		if (msg.Make(shm(), head, body)) {
+			DEFER1(if (msg.IsCounted()) { msg.Release(shm()); });
 			std::string msg_id(head.msg_id());
 			per_msg_cbs_->Add(msg_id, cb);
 			auto onExpireRemoveCB = [this, msg_id](MsgI const &msg) {
@@ -85,6 +90,8 @@
 				per_msg_cbs_->Find(msg_id, cb_no_use);
 			};
 			return SendImpl(valid_remote, msg, onExpireRemoveCB);
+		} else {
+			printf("out of mem?, avail: %ld\n", shm().get_free_memory());
 		}
 		return false;
 	}
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index 4ce2c97..e9e627f 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -35,35 +35,45 @@
 } // namespace
 
 TopicNode::TopicNode(SharedMemory &shm) :
-    shm_(shm), sock_node_(shm), sock_request_(shm), sock_reply_(shm), sock_sub_(shm)
+    shm_(shm), sock_node_(shm), sock_request_(shm), sock_reply_(shm), sock_sub_(shm), registered_(false)
 {
-	SockNode().Start();
+	// recv msgs to avoid memory leak.
+	auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; };
+	SockNode().Start(default_ignore_msg);
 }
 
 TopicNode::~TopicNode()
 {
 	Stop();
+	SockNode().Stop();
 }
 
-void TopicNode::Start(ServerCB const &server_cb, SubDataCB const &sub_cb)
+void TopicNode::Start(ServerCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker)
 {
-	ServerStart(server_cb, 1);
-	SubscribeStartWorker(sub_cb, 1);
-	// SockClient().Start();
+	if (nworker < 1) {
+		nworker = 1;
+	} else if (nworker > 16) {
+		nworker = 16;
+	}
+
+	ServerStart(server_cb, nworker);
+	SubscribeStartWorker(sub_cb, nworker);
+	ClientStartWorker(client_cb, nworker);
 }
 void TopicNode::Stop()
 {
 	SockSub().Stop();
 	SockServer().Stop();
 	SockClient().Stop();
-	SockNode().Stop();
 }
 
 bool TopicNode::Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
 {
+	info_ = proc;
+
 	auto &sock = SockNode();
 	MsgRegister body;
-	*body.mutable_proc() = proc;
+	body.mutable_proc()->Swap(&proc);
 	auto AddId = [&](const MQId &id) { body.add_addrs()->set_mq_id(&id, sizeof(id)); };
 	AddId(SockNode().id());
 	AddId(SockServer().id());
@@ -74,27 +84,39 @@
 	auto head(InitMsgHead(GetType(body), body.proc().proc_id()));
 	AddRoute(head, sock.id());
 
+	auto CheckResult = [this](MsgI &msg, BHMsgHead &head, MsgCommonReply &rbody) {
+		bool ok = head.type() == kMsgTypeCommonReply &&
+		          msg.ParseBody(rbody) &&
+		          IsSuccess(rbody.errmsg().errcode());
+		printf("async regisered %s\n", ok ? "ok" : "failed");
+		registered_.store(ok);
+	};
+
 	if (timeout_ms == 0) {
-		return sock.Send(&BHTopicCenterAddress(), head, body);
+		auto onResult = [this, CheckResult](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
+			MsgCommonReply body;
+			CheckResult(imsg, head, body);
+		};
+		return sock.Send(&BHTopicCenterAddress(), head, body, onResult);
 	} else {
 		MsgI reply;
 		DEFER1(reply.Release(shm_););
 		BHMsgHead reply_head;
 		bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
-		r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body);
-		if (r && IsSuccess(reply_body.errmsg().errcode())) {
-			info_ = body;
-			return true;
+		if (r) {
+			CheckResult(reply, reply_head, reply_body);
 		}
-		return false;
+		return IsRegistered();
 	}
 }
 
 bool TopicNode::Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
 {
+	if (!IsRegistered()) { return false; }
+
 	auto &sock = SockNode();
 	MsgHeartbeat body;
-	*body.mutable_proc() = proc;
+	body.mutable_proc()->Swap(&proc);
 
 	auto head(InitMsgHead(GetType(body), body.proc().proc_id()));
 	AddRoute(head, sock.id());
@@ -120,7 +142,8 @@
 
 bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
 {
-	//TODO check registered
+	if (!IsRegistered()) { return false; }
+
 	auto &sock = SockServer();
 	MsgRegisterRPC body;
 	body.mutable_topics()->Swap(&topics);
@@ -155,11 +178,8 @@
 			for (int i = 0; i < head.route_size() - 1; ++i) {
 				reply_head.add_route()->Swap(head.mutable_route(i));
 			}
-			MsgI msg;
-			if (msg.Make(sock.shm(), reply_head, reply_body)) {
-				auto &remote = head.route().rbegin()->mq_id();
-				sock.Send(remote.data(), msg);
-			}
+			auto &remote = head.route().rbegin()->mq_id();
+			sock.Send(remote.data(), reply_head, reply_body);
 		}
 	};
 
@@ -169,6 +189,8 @@
 
 bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms)
 {
+	if (!IsRegistered()) { return false; }
+
 	auto &sock = SockServer();
 
 	MsgI imsg;
@@ -188,6 +210,8 @@
 
 bool TopicNode::ServerSendReply(void *src_info, const MsgRequestTopicReply &body)
 {
+	if (!IsRegistered()) { return false; }
+
 	auto &sock = SockServer();
 
 	SrcInfo *p = static_cast<SrcInfo *>(src_info);
@@ -211,7 +235,7 @@
 		if (head.type() == kMsgTypeRequestTopicReply) {
 			MsgRequestTopicReply reply;
 			if (imsg.ParseBody(reply)) {
-				cb(head.proc_id(), reply);
+				cb(head, reply);
 			}
 		}
 	};
@@ -219,37 +243,60 @@
 	return SockRequest().Start(onData, nworker);
 }
 
-bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, const RequestResultCB &cb)
+bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, std::string &out_msg_id, const RequestResultCB &cb)
 {
-	auto Call = [&](const void *remote) {
-		auto &sock = SockRequest();
+	if (!IsRegistered()) { return false; }
 
-		BHMsgHead head(InitMsgHead(GetType(req), proc_id()));
+	const std::string &msg_id(NewMsgId());
+
+	out_msg_id = msg_id;
+
+	auto SendTo = [this, msg_id](const BHAddress &addr, const MsgRequestTopic &req, const RequestResultCB &cb) {
+		auto &sock = SockClient();
+		BHMsgHead head(InitMsgHead(GetType(req), proc_id(), msg_id));
 		AddRoute(head, sock.id());
+		head.set_topic(req.topic());
 
 		if (cb) {
 			auto onRecv = [cb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
 				if (head.type() == kMsgTypeRequestTopicReply) {
 					MsgRequestTopicReply reply;
 					if (imsg.ParseBody(reply)) {
-						cb(head.proc_id(), reply);
+						cb(head, reply);
 					}
 				}
 			};
-			return sock.Send(remote, head, req, onRecv);
+			return sock.Send(addr.mq_id().data(), head, req, onRecv);
 		} else {
-			return sock.Send(remote, head, req);
+			return sock.Send(addr.mq_id().data(), head, req);
 		}
 	};
 
 	try {
+		auto &sock = SockClient();
 		BHAddress addr;
-		if (ClientQueryRPCTopic(req.topic(), addr, 1000)) {
-			return Call(addr.mq_id().data());
-		} else {
-			SetLastError(eNotFound, "remote not found.");
-			return false;
+
+		if (topic_query_cache_.Find(req.topic(), addr)) {
+			return SendTo(addr, req, cb);
 		}
+
+		MsgQueryTopic query;
+		query.set_topic(req.topic());
+		BHMsgHead head(InitMsgHead(GetType(query), proc_id()));
+		AddRoute(head, sock.id());
+
+		auto onQueryResult = [this, SendTo, req, cb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
+			MsgQueryTopicReply rep;
+			if (head.type() == kMsgTypeQueryTopicReply && imsg.ParseBody(rep)) {
+				auto &addr = rep.address();
+				if (!addr.mq_id().empty()) {
+					topic_query_cache_.Update(req.topic(), addr);
+					SendTo(addr, req, cb);
+				}
+			}
+		};
+		return sock.Send(&BHTopicCenterAddress(), head, query, onQueryResult);
+
 	} catch (...) {
 		return false;
 	}
@@ -257,6 +304,8 @@
 
 bool TopicNode::ClientSyncRequest(const MsgRequestTopic &request, std::string &out_proc_id, MsgRequestTopicReply &out_reply, const int timeout_ms)
 {
+	if (!IsRegistered()) { return false; }
+
 	try {
 		auto &sock = SockRequest();
 
@@ -264,6 +313,7 @@
 		if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) {
 			BHMsgHead head(InitMsgHead(GetType(request), proc_id()));
 			AddRoute(head, sock.id());
+			head.set_topic(request.topic());
 
 			MsgI reply_msg;
 			DEFER1(reply_msg.Release(shm_););
@@ -288,6 +338,8 @@
 
 bool TopicNode::ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms)
 {
+	if (!IsRegistered()) { return false; }
+
 	auto &sock = SockRequest();
 
 	if (topic_query_cache_.Find(topic, addr)) {
@@ -325,6 +377,8 @@
 
 bool TopicNode::Publish(const MsgPublish &pub, const int timeout_ms)
 {
+	if (!IsRegistered()) { return false; }
+
 	try {
 		auto &sock = SockPub();
 		BHMsgHead head(InitMsgHead(GetType(pub), proc_id()));
@@ -349,8 +403,10 @@
 
 // subscribe
 
-bool TopicNode::Subscribe(MsgTopicList &topics, const int timeout_ms)
+bool TopicNode::Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
 {
+	if (!IsRegistered()) { return false; }
+
 	try {
 		auto &sock = SockSub();
 		MsgSubscribe sub;
@@ -364,7 +420,6 @@
 			MsgI reply;
 			DEFER1(reply.Release(shm()););
 			BHMsgHead reply_head;
-			MsgCommonReply reply_body;
 			return sock.SendAndRecv(&BHTopicBusAddress(), head, sub, reply, reply_head, timeout_ms) &&
 			       reply_head.type() == kMsgTypeCommonReply &&
 			       reply.ParseBody(reply_body) &&
@@ -396,6 +451,8 @@
 
 bool TopicNode::RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms)
 {
+	if (!IsRegistered()) { return false; }
+
 	auto &sock = SockSub();
 	MsgI msg;
 	DEFER1(msg.Release(shm()););
diff --git a/src/topic_node.h b/src/topic_node.h
index 0627930..8c3c48e 100644
--- a/src/topic_node.h
+++ b/src/topic_node.h
@@ -29,7 +29,7 @@
 class TopicNode
 {
 	SharedMemory &shm_;
-	MsgRegister info_;
+	ProcInfo info_;
 
 	SharedMemory &shm() { return shm_; }
 
@@ -51,9 +51,9 @@
 	bool ServerSendReply(void *src_info, const MsgRequestTopicReply &reply);
 
 	// topic client
-	typedef std::function<void(const std::string &proc_id, const MsgRequestTopicReply &reply)> RequestResultCB;
+	typedef std::function<void(const BHMsgHead &head, const MsgRequestTopicReply &reply)> RequestResultCB;
 	bool ClientStartWorker(RequestResultCB const &cb, const int nworker = 2);
-	bool ClientAsyncRequest(const MsgRequestTopic &request, const RequestResultCB &rrcb = RequestResultCB());
+	bool ClientAsyncRequest(const MsgRequestTopic &request, std::string &msg_id, const RequestResultCB &rrcb = RequestResultCB());
 	bool ClientSyncRequest(const MsgRequestTopic &request, std::string &proc_id, MsgRequestTopicReply &reply, const int timeout_ms);
 
 	// publish
@@ -62,15 +62,15 @@
 	// subscribe
 	typedef std::function<void(const std::string &proc_id, const MsgPublish &data)> SubDataCB;
 	bool SubscribeStartWorker(const SubDataCB &tdcb, int nworker = 2);
-	bool Subscribe(MsgTopicList &topics, const int timeout_ms);
+	bool Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms);
 	bool RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms);
 
-	void Start(ServerCB const &server_cb, SubDataCB const &sub_cb);
+	void Start(ServerCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker = 2);
 	void Stop();
 
 private:
 	bool ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
-	const std::string &proc_id() { return info_.proc().proc_id(); }
+	const std::string &proc_id() { return info_.proc_id(); }
 
 	typedef bhome_msg::BHAddress Address;
 	class TopicQueryCache
@@ -118,7 +118,9 @@
 	auto &SockClient() { return SockRequest(); }
 	auto &SockReply() { return sock_reply_; }
 	auto &SockServer() { return SockReply(); }
+	bool IsRegistered() const { return registered_.load(); }
 
+	std::atomic<bool> registered_;
 	ShmSocket sock_node_;
 	ShmSocket sock_request_;
 	ShmSocket sock_reply_;
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index 113bb99..cff2cc5 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -17,11 +17,73 @@
  */
 #include "bh_api.h"
 #include "util.h"
+#include <atomic>
 
-class DemoClient
+using namespace bhome::msg;
+
+namespace
 {
-public:
+typedef std::atomic<uint64_t> Number;
+
+struct MsgStatus {
+	Number nrequest_;
+	Number nreply_;
+	Number nserved_;
+	MsgStatus() :
+	    nrequest_(0), nreply_(0), nserved_(0) {}
 };
+
+MsgStatus &Status()
+{
+	static MsgStatus st;
+	return st;
+}
+} // namespace
+
+void SubRecvProc(const void *proc_id,
+                 const int proc_id_len,
+                 const void *data,
+                 const int data_len)
+{
+	std::string proc((const char *) proc_id, proc_id_len);
+	MsgPublish pub;
+	pub.ParseFromArray(data, data_len);
+	// printf("Sub data, %s : %s\n", pub.topic().c_str(), pub.data().c_str());
+}
+
+void ServerProc(const void *proc_id,
+                const int proc_id_len,
+                const void *data,
+                const int data_len,
+                BHServerCallbackTag *tag)
+{
+	// printf("ServerProc: ");
+	// DEFER1(printf("\n"););
+	MsgRequestTopic request;
+	if (request.ParseFromArray(data, data_len)) {
+		MsgRequestTopicReply reply;
+		reply.set_data(" reply: " + request.data());
+		std::string s(reply.SerializeAsString());
+		// printf("%s", reply.data().c_str());
+		BHServerCallbackReply(tag, s.data(), s.size());
+		++Status().nserved_;
+	}
+}
+
+void ClientProc(const void *proc_id,
+                const int proc_id_len,
+                const void *msg_id,
+                const int msg_id_len,
+                const void *data,
+                const int data_len)
+{
+	std::string proc((const char *) proc_id, proc_id_len);
+	MsgRequestTopicReply reply;
+	if (reply.ParseFromArray(data, data_len)) {
+		++Status().nreply_;
+	}
+	// printf("client Recv reply : %s\n", reply.data().c_str());
+}
 
 BOOST_AUTO_TEST_CASE(ApiTest)
 {
@@ -36,19 +98,125 @@
 	       nsec, nhour, nday, years);
 	std::chrono::steady_clock::duration a(123456);
 	printf("nowsec: %ld\n", NowSec());
-	// for (int i = 0; i < 5; ++i) {
-	// 	std::this_thread::sleep_for(1s);
-	// 	printf("nowsec: %ld\n", NowSec());
-	// }
 
 	printf("maxsec: %ld\n", CountSeconds(max_time));
 
-	ProcInfo proc;
-	proc.set_proc_id("demo_client");
-	proc.set_public_info("public info of demo_client. etc...");
-	std::string proc_buf(proc.SerializeAsString());
-	void *reply = 0;
-	int reply_len = 0;
-	bool r = BHRegister(proc_buf.data(), proc_buf.size(), &reply, &reply_len, 1000);
-	printf("register %s\n", r ? "ok" : "failed");
+	bool reg = false;
+	for (int i = 0; i < 10 && !reg; ++i) {
+		ProcInfo proc;
+		proc.set_proc_id("demo_client");
+		proc.set_public_info("public info of demo_client. etc...");
+		std::string proc_buf(proc.SerializeAsString());
+		void *reply = 0;
+		int reply_len = 0;
+		reg = BHRegister(proc_buf.data(), proc_buf.size(), &reply, &reply_len, 2000);
+		printf("register %s\n", reg ? "ok" : "failed");
+
+		BHFree(reply, reply_len);
+		Sleep(1s);
+	}
+
+	const std::string topic_ = "topic_";
+
+	{
+		MsgTopicList topics;
+		for (int i = 0; i < 10; ++i) {
+			topics.add_topic_list(topic_ + std::to_string(i));
+		}
+		std::string s = topics.SerializeAsString();
+		void *reply = 0;
+		int reply_len = 0;
+		bool r = BHRegisterTopics(s.data(), s.size(), &reply, &reply_len, 1000);
+		BHFree(reply, reply_len);
+		// printf("register topic : %s\n", r ? "ok" : "failed");
+		Sleep(1s);
+	}
+
+	{
+		MsgTopicList topics;
+		for (int i = 0; i < 10; ++i) {
+			topics.add_topic_list(topic_ + std::to_string(i * 2));
+		}
+		std::string s = topics.SerializeAsString();
+		void *reply = 0;
+		int reply_len = 0;
+		bool r = BHSubscribeTopics(s.data(), s.size(), &reply, &reply_len, 1000);
+		BHFree(reply, reply_len);
+		printf("subscribe topic : %s\n", r ? "ok" : "failed");
+	}
+
+	BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc);
+
+	{
+		for (int i = 0; i < 1; ++i) {
+			MsgPublish pub;
+			pub.set_topic(topic_ + std::to_string(i));
+			pub.set_data("pub_data_" + std::string(1024 * 1024, 'a'));
+			std::string s(pub.SerializeAsString());
+			BHPublish(s.data(), s.size(), 0);
+			// Sleep(1s);
+		}
+	}
+
+	auto asyncRequest = [&](uint64_t nreq) {
+		for (uint64_t i = 0; i < nreq; ++i) {
+			MsgRequestTopic req;
+			req.set_topic(topic_ + std::to_string(0));
+			req.set_data("request_data_" + std::to_string(i));
+			std::string s(req.SerializeAsString());
+			void *msg_id = 0;
+			int len = 0;
+			bool r = BHAsyncRequest(s.data(), s.size(), 0, 0);
+			DEFER1(BHFree(msg_id, len););
+			if (r) {
+				++Status().nrequest_;
+			} else {
+				printf("request topic : %s\n", r ? "ok" : "failed");
+			}
+		}
+	};
+	auto showStatus = [](std::atomic<bool> *run) {
+		int64_t last = 0;
+		while (*run) {
+			auto &st = Status();
+			std::this_thread::sleep_for(1s);
+			int cur = st.nreply_.load();
+			printf("nreq: %8ld, nsrv: %8ld, nreply: %8ld, speed %8ld\n", st.nrequest_.load(), st.nserved_.load(), cur, cur - last);
+			last = cur;
+		}
+	};
+	auto hb = [](std::atomic<bool> *run) {
+		while (*run) {
+			BHHeartBeatEasy(0);
+			std::this_thread::sleep_for(1s);
+		}
+	};
+	std::atomic<bool> run(true);
+	ThreadManager threads;
+	boost::timer::auto_cpu_timer timer;
+	threads.Launch(hb, &run);
+	// threads.Launch(showStatus, &run);
+	int ncli = 10;
+	const uint64_t nreq = 1000 * 100;
+	for (int i = 0; i < ncli; ++i) {
+		threads.Launch(asyncRequest, nreq);
+	}
+
+	int same = 0;
+	int64_t last = 0;
+	while (last < nreq * ncli && same < 3) {
+		Sleep(1s);
+		auto cur = Status().nreply_.load();
+		if (last == cur) {
+			++same;
+		} else {
+			last = cur;
+			same = 0;
+		}
+	}
+
+	run = false;
+	threads.WaitAll();
+	auto &st = Status();
+	printf("nreq: %8ld, nsrv: %8ld, nreply: %8ld\n", st.nrequest_.load(), st.nserved_.load(), st.nreply_.load());
 }
\ No newline at end of file
diff --git a/utest/utest.cpp b/utest/utest.cpp
index 817cbaf..12d4396 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -99,7 +99,7 @@
 	BHCenter center(shm);
 	center.Start();
 
-	std::this_thread::sleep_for(100ms);
+	Sleep(100ms);
 
 	std::atomic<uint64_t> total_count(0);
 	std::atomic<ptime> last_time(Now() - seconds(1));
@@ -113,7 +113,8 @@
 		for (auto &t : topics) {
 			tlist.add_topic_list(t);
 		}
-		bool r = client.Subscribe(tlist, timeout);
+		MsgCommonReply reply_body;
+		bool r = client.Subscribe(tlist, reply_body, timeout);
 		if (!r) {
 			printf("client subscribe failed.\n");
 		}
@@ -149,7 +150,7 @@
 			MsgPublish pub;
 			pub.set_topic(topic);
 			pub.set_data(data);
-			bool r = provider.Publish(pub, timeout);
+			bool r = provider.Publish(pub, 0);
 			if (!r) {
 				static std::atomic<int> an(0);
 				int n = ++an;
@@ -169,7 +170,7 @@
 		part.push_back(topics[i]);
 		threads.Launch(Sub, i, topics);
 	}
-	std::this_thread::sleep_for(100ms);
+	Sleep(100ms);
 	for (auto &topic : topics) {
 		threads.Launch(Pub, topic);
 	}
@@ -217,7 +218,7 @@
 
 		std::atomic<int> count(0);
 		std::string reply;
-		auto onRecv = [&](const std::string &proc_id, const MsgRequestTopicReply &msg) {
+		auto onRecv = [&](const BHMsgHead &head, const MsgRequestTopicReply &msg) {
 			reply = msg.data();
 			if (++count >= nreq) {
 				printf("count: %d\n", count.load());
@@ -229,7 +230,8 @@
 			MsgRequestTopic req;
 			req.set_topic(topic);
 			req.set_data("data " + std::to_string(i));
-			if (!client.ClientAsyncRequest(req)) {
+			std::string msg_id;
+			if (!client.ClientAsyncRequest(req, msg_id)) {
 				printf("client request failed\n");
 				++count;
 			}
@@ -274,9 +276,9 @@
 	ThreadManager clients, servers;
 	std::vector<Topic> topics = {"topic1", "topic2"};
 	servers.Launch(Server, "server", topics);
-	std::this_thread::sleep_for(100ms);
+	Sleep(100ms);
 	for (auto &t : topics) {
-		clients.Launch(Client, t, 1000 * 1);
+		clients.Launch(Client, t, 1000 * 100);
 	}
 	clients.WaitAll();
 	printf("clients done, server replyed: %ld\n", server_msg_count.load());
@@ -302,18 +304,16 @@
 		};
 		Check();
 		for (int i = 0; i < 3; ++i) {
-			std::this_thread::sleep_for(1s);
+			Sleep(1s);
 			Check();
 		}
-		printf("sleep 4\n");
-		std::this_thread::sleep_for(4s);
+		Sleep(4s);
 		for (int i = 0; i < 2; ++i) {
-			std::this_thread::sleep_for(1s);
+			Sleep(1s);
 			Check();
 		}
 	}
-	printf("sleep 8\n");
-	std::this_thread::sleep_for(8s);
+	Sleep(8s);
 }
 inline int MyMin(int a, int b)
 {
diff --git a/utest/util.h b/utest/util.h
index aaa5189..7f41da9 100644
--- a/utest/util.h
+++ b/utest/util.h
@@ -38,6 +38,13 @@
 
 using namespace std::chrono_literals;
 
+template <class D>
+inline void Sleep(D d)
+{
+	printf("sleep for %ld ms\n", std::chrono::duration_cast<std::chrono::milliseconds>(d).count());
+	std::this_thread::sleep_for(d);
+}
+
 typedef std::function<void(void)> FuncVV;
 
 class ScopeCall : private boost::noncopyable

--
Gitblit v1.8.0