From c338820e4db43ad32c20ff429a038b06bcb980f8 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期四, 08 四月 2021 18:13:25 +0800
Subject: [PATCH] BIG change, join center,bus; now msg is head+body.

---
 .vscode/tasks.json               |    4 
 .gitignore                       |    4 
 src/socket.cpp                   |   73 +
 utest/utest.cpp                  |   75 +
 src/topic_node.cpp               |  322 ++++++++++
 src/center.cpp                   |  402 ++++++++++++
 src/msg.cpp                      |  145 ---
 utest/speed_test.cpp             |   68 +
 proto/source/bhome_msg_api.proto |   71 ++
 src/center.h                     |    3 
 src/proto.cpp                    |   41 +
 src/socket.h                     |   97 ++
 utest/simple_tests.cpp           |   10 
 src/proto.h                      |   78 ++
 .vscode/settings.json            |    7 
 src/msg.h                        |   69 +
 proto/source/bhome_msg.proto     |   84 --
 proto/source/error_msg.proto     |    5 
 src/shm_queue.cpp                |   19 
 /dev/null                        |  108 ---
 .vscode/launch.json              |    2 
 src/pubsub.h                     |   12 
 src/shm_queue.h                  |   10 
 src/pubsub.cpp                   |   56 +
 src/topic_node.h                 |  121 +++
 25 files changed, 1,406 insertions(+), 480 deletions(-)

diff --git a/.gitignore b/.gitignore
index 5c7daa3..8e81403 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,8 @@
 *.un~
 build/
+debug/
+release/
 Makefile
 utest/utest
+*.bak
+gmon.out
diff --git a/.vscode/launch.json b/.vscode/launch.json
index 9eeb23e..ef42f7b 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -8,7 +8,7 @@
             "name": "g++ - Build and debug active file",
             "type": "cppdbg",
             "request": "launch",
-            "program": "${workspaceFolder}/utest/utest",
+            "program": "${workspaceFolder}/debug/bin/utest",
             "args": [
                 "-t",
                 "ReqRepTest"
diff --git a/.vscode/settings.json b/.vscode/settings.json
index d5005e9..7928bc8 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -55,7 +55,12 @@
         "cinttypes": "cpp",
         "typeindex": "cpp",
         "typeinfo": "cpp",
-        "variant": "cpp"
+        "variant": "cpp",
+        "iomanip": "cpp",
+        "*.inc": "cpp",
+        "strstream": "cpp",
+        "unordered_set": "cpp",
+        "cfenv": "cpp"
     },
     "files.exclude": {
         "**/*.un~": true
diff --git a/.vscode/tasks.json b/.vscode/tasks.json
index 84142bc..db457e5 100644
--- a/.vscode/tasks.json
+++ b/.vscode/tasks.json
@@ -6,10 +6,10 @@
             "command": "ninja",
             "args": [
                 "-C",
-                "../build"
+                "debug"
             ],
             "options": {
-                "cwd": "${workspaceFolder}/utest"
+                "cwd": "${workspaceFolder}"
             },
             "problemMatcher": [
                 "$gcc"
diff --git a/proto/source/bhome_msg.proto b/proto/source/bhome_msg.proto
index 9827f17..b06b692 100644
--- a/proto/source/bhome_msg.proto
+++ b/proto/source/bhome_msg.proto
@@ -1,39 +1,21 @@
 syntax = "proto3";
-
 option optimize_for = LITE_RUNTIME;
 
-import "google/protobuf/descriptor.proto";
-import "error_msg.proto";
+// import "google/protobuf/descriptor.proto";
+import "bhome_msg_api.proto";
 
 package bhome.msg;
 
 
-// message format : header(BHMsgHead) + body(variable types)
-message BHAddress {
-	bytes mq_id = 1; // mqid, uuid
-	bytes ip = 2;   //
-	int32 port = 3;
-}
-
-message ProcInfo
-{
-	bytes id = 1; // serial number, maybe managed
-	bytes name = 2;
-	bytes public_info = 3;
-	bytes private_info = 4;
-}
+// message format : head_len(4) + head(BHMsgHead) + body_len(4) + body(variable types)
 
 message BHMsgHead {
 	bytes msg_id = 1;
 	repeated BHAddress route = 2; // for reply and proxy.
 	int64 timestamp = 3;
 	int32 type = 4;
-	ProcInfo proc = 5;
+	bytes proc_id = 5;
 	bytes topic = 6; // for request route
-}
-
-message BHMsgBody {
-	bytes data = 1;
 }
 
 message BHMsg { // deprecated
@@ -46,6 +28,7 @@
 
 enum MsgType {
 	kMsgTypeInvalid = 0;
+	kMsgTypeRawData = 1;
 
 	kMsgTypeCommonReply = 2;
 
@@ -57,57 +40,16 @@
 	kMsgTypeQueryTopicReply = 15;
 	kMsgTypeRequestTopic = 16;
 	kMsgTypeRequestTopicReply = 17;
+	kMsgTypeRegisterRPC = 18;
+	// reply
 
-	kMsgTypePublish = 100;
-	// kMsgTypePublishReply = 101;
-	kMsgTypeSubscribe = 102;
-	// kMsgTypeSubscribeReply = 103;
-	kMsgTypeUnsubscribe = 104;
-	// kMsgTypeUnsubscribeReply = 105;
+	kMsgTypePublish = 20;
+	// kMsgTypePublishReply = 21;
+	kMsgTypeSubscribe = 22;
+	// kMsgTypeSubscribeReply = 23;
+	kMsgTypeUnsubscribe = 24;
+	// kMsgTypeUnsubscribeReply = 25;
 
-}
-
-message MsgPub {
-	bytes topic = 1;
-	bytes data = 2; 
-}
-
-message MsgSub {
-	repeated bytes topics = 1;
-}
-
-message MsgCommonReply {
-	ErrorMsg errmsg = 1;
-}
-
-message MsgRequestTopic {
-	bytes topic = 1;
-	bytes data = 2; 
-}
-
-message MsgRequestTopicReply {
-	ErrorMsg errmsg = 1;
-	bytes data = 2; 
-}
-
-message MsgRegister
-{
-	ProcInfo proc = 1;
-	repeated bytes topics = 2;
-}
-
-message MsgHeartbeat
-{
-	ProcInfo proc = 1;
-}
-
-message MsgQueryTopic {
-	bytes topic = 1;
-}
-
-message MsgQueryTopicReply {
-	ErrorMsg errmsg = 1;
-	BHAddress address = 2;
 }
 
 service TopicRPC {
diff --git a/proto/source/bhome_msg_api.proto b/proto/source/bhome_msg_api.proto
new file mode 100644
index 0000000..82b8115
--- /dev/null
+++ b/proto/source/bhome_msg_api.proto
@@ -0,0 +1,71 @@
+syntax = "proto3";
+option optimize_for = LITE_RUNTIME;
+
+// public messages
+import "error_msg.proto";
+
+package bhome.msg;
+
+message BHAddress {
+	bytes mq_id = 1; // mqid, uuid
+	// bytes ip = 2;   //
+	// int32 port = 3;
+}
+
+message ProcInfo
+{
+	bytes proc_id = 1; // serial number, maybe managed
+	bytes name = 2;
+	bytes public_info = 3; // maybe json.
+	bytes private_info = 4; 
+}
+
+message MsgPublish {
+	bytes topic = 1;
+	bytes data = 2; 
+}
+
+message MsgSubscribe {
+	repeated bytes topics = 1;
+}
+message MsgUnsubscribe {
+	repeated bytes topics = 1;
+}
+
+message MsgCommonReply {
+	ErrorMsg errmsg = 1;
+}
+
+message MsgRequestTopic {
+	bytes topic = 1;
+	bytes data = 2; 
+}
+
+message MsgRequestTopicReply {
+	ErrorMsg errmsg = 1;
+	bytes data = 2; 
+}
+
+message MsgRegister
+{
+	ProcInfo proc = 1;
+}
+
+message MsgRegisterRPC
+{
+	repeated bytes topics = 1;
+}
+
+message MsgHeartbeat
+{
+	ProcInfo proc = 1;
+}
+
+message MsgQueryTopic {
+	bytes topic = 1;
+}
+
+message MsgQueryTopicReply {
+	ErrorMsg errmsg = 1;
+	BHAddress address = 2;
+}
diff --git a/proto/source/error_msg.proto b/proto/source/error_msg.proto
index f283108..b85ddb3 100644
--- a/proto/source/error_msg.proto
+++ b/proto/source/error_msg.proto
@@ -8,6 +8,11 @@
     eSuccess = 0;
     eError = 1;
     eInvalidInput = 2;
+    eNotRegistered = 3;
+    eNotFound = 4;
+    eOffline = 5;
+    eNoRespond = 6;
+    eAddressNotMatch = 7;
 }
 
 message ErrorMsg {
diff --git a/src/center.cpp b/src/center.cpp
index a3897fb..fe549b7 100644
--- a/src/center.cpp
+++ b/src/center.cpp
@@ -16,20 +16,387 @@
  * =====================================================================================
  */
 #include "center.h"
+#include "bh_util.h"
 #include "defs.h"
-#include "pubsub_center.h"
-#include "reqrep_center.h"
 #include "shm.h"
+#include <set>
 
 using namespace bhome_shm;
+using namespace bhome_msg;
+using namespace bhome::msg;
 typedef BHCenter::MsgHandler Handler;
 
-Handler Join(Handler h1, Handler h2)
+namespace
 {
-	return [h1, h2](ShmSocket &socket, bhome_msg::MsgI &imsg, bhome::msg::BHMsg &msg) {
-		return h1(socket, imsg, msg) || h2(socket, imsg, msg);
+auto Now = []() { time_t t; return time(&t); };
+
+//TODO check proc_id
+class NodeCenter
+{
+public:
+	typedef std::string ProcId;
+	typedef std::string Address;
+	typedef bhome::msg::ProcInfo ProcInfo;
+
+private:
+	enum {
+		kStateInvalid = 0,
+		kStateNormal = 1,
+		kStateNoRespond = 2,
+		kStateOffline = 3,
+	};
+
+	struct ProcState {
+		time_t timestamp_ = 0;
+		uint32_t flag_ = 0; // reserved
+	};
+	typedef std::unordered_map<Address, std::set<Topic>> AddressTopics;
+
+	struct NodeInfo {
+		ProcState state_;             // state
+		Address addr_;                // registered_mqid.
+		ProcInfo proc_;               //
+		AddressTopics services_;      // address: topics
+		AddressTopics subscriptions_; // address: topics
+	};
+	typedef std::shared_ptr<NodeInfo> Node;
+	typedef std::weak_ptr<NodeInfo> WeakNode;
+
+	struct TopicDest {
+		Address mq_;
+		WeakNode weak_node_;
+		bool operator<(const TopicDest &a) const { return mq_ < a.mq_; }
+	};
+	const std::string &SrcAddr(const BHMsgHead &head) { return head.route(0).mq_id(); }
+
+public:
+	typedef std::set<TopicDest> Clients;
+
+	NodeCenter(const std::string &id = "#Center") :
+	    id_(id) {}
+	const std::string &id() const { return id_; } // no need to lock.
+
+	//TODO maybe just return serialized string.
+	MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg)
+	{
+		if (msg.proc().proc_id() != head.proc_id()) {
+			return MakeReply(eInvalidInput, "invalid proc id.");
+		}
+
+		try {
+			Node node(new NodeInfo);
+			node->addr_ = SrcAddr(head);
+			node->proc_.Swap(msg.mutable_proc());
+			node->state_.timestamp_ = Now();
+			node->state_.flag_ = kStateNormal;
+			nodes_[node->proc_.proc_id()] = node;
+			return MakeReply(eSuccess);
+		} catch (...) {
+			return MakeReply(eError, "register node error.");
+		}
+	}
+	template <class OnSuccess, class OnError>
+	auto HandleMsg(const BHMsgHead &head, OnSuccess onOk, OnError onErr)
+	{
+		auto pos = nodes_.find(head.proc_id());
+		if (pos == nodes_.end()) {
+			return onErr(eNotRegistered, "Node is not registered.");
+		} else {
+			auto node = pos->second;
+			if (head.type() == kMsgTypeHeartbeat && node->addr_ != SrcAddr(head)) {
+				return onErr(eAddressNotMatch, "Node address error.");
+			} else if (!Valid(*node)) {
+				return onErr(eNoRespond, "Node is not alive.");
+			} else {
+				return onOk(node);
+			}
+		}
+	}
+
+	template <class Reply, class Func>
+	Reply HandleMsg(const BHMsgHead &head, Func const &op)
+	{
+		try {
+			auto onErr = [](const ErrorCode ec, const std::string &str) { return MakeReply<Reply>(ec, str); };
+			return HandleMsg(head, op, onErr);
+
+			auto pos = nodes_.find(head.proc_id());
+			if (pos == nodes_.end()) {
+				return MakeReply<Reply>(eNotRegistered, "Node is not registered.");
+			} else {
+				auto node = pos->second;
+				if (node->addr_ != SrcAddr(head)) {
+					return MakeReply<Reply>(eAddressNotMatch, "Node address error.");
+				} else if (!Valid(*node)) {
+					return MakeReply<Reply>(eNoRespond, "Node is not alive.");
+				} else {
+					return op(node);
+				}
+			}
+		} catch (...) {
+			//TODO error log
+			return MakeReply<Reply>(eError, "internal error.");
+		}
+	}
+	template <class Func>
+	inline MsgCommonReply HandleMsg(const BHMsgHead &head, Func const &op)
+	{
+		return HandleMsg<MsgCommonReply, Func>(head, op);
+	}
+
+	MsgCommonReply RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg)
+	{
+		return HandleMsg(
+		    head, [&](Node node) -> MsgCommonReply {
+			    auto &src = SrcAddr(head);
+			    node->services_[src].insert(msg.topics().begin(), msg.topics().end());
+			    TopicDest dest = {src, node};
+			    for (auto &topic : msg.topics()) {
+				    service_map_[topic].insert(dest);
+			    }
+			    return MakeReply(eSuccess);
+		    });
+	}
+
+	MsgCommonReply Heartbeat(const BHMsgHead &head, const MsgHeartbeat &msg)
+	{
+		return HandleMsg(head, [&](Node node) {
+			NodeInfo &ni = *node;
+			ni.state_.timestamp_ = Now();
+			auto &info = msg.proc();
+			if (!info.public_info().empty()) {
+				ni.proc_.set_public_info(info.public_info());
+			}
+			if (!info.private_info().empty()) {
+				ni.proc_.set_private_info(info.private_info());
+			}
+			return MakeReply(eSuccess);
+		});
+	}
+
+	MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req)
+	{
+		typedef MsgQueryTopicReply Reply;
+
+		auto query = [&](Node self) -> MsgQueryTopicReply {
+			auto pos = service_map_.find(req.topic());
+			if (pos != service_map_.end() && !pos->second.empty()) {
+				// now just find first one.
+				const TopicDest &dest = *(pos->second.begin());
+				Node dest_node(dest.weak_node_.lock());
+				if (!dest_node) {
+					service_map_.erase(pos);
+					return MakeReply<Reply>(eOffline, "topic server offline.");
+				} else if (!Valid(*dest_node)) {
+					return MakeReply<Reply>(eNoRespond, "topic server not responding.");
+				} else {
+					MsgQueryTopicReply reply = MakeReply<Reply>(eSuccess);
+					reply.mutable_address()->set_mq_id(dest.mq_);
+					return reply;
+				}
+
+			} else {
+				return MakeReply<Reply>(eNotFound, "topic server not found.");
+			}
+		};
+
+		return HandleMsg<Reply>(head, query);
+	}
+
+	MsgCommonReply Subscribe(const BHMsgHead &head, const MsgSubscribe &msg)
+	{
+		return HandleMsg(head, [&](Node node) {
+			auto &src = SrcAddr(head);
+			node->subscriptions_[src].insert(msg.topics().begin(), msg.topics().end());
+			TopicDest dest = {src, node};
+			for (auto &topic : msg.topics()) {
+				subscribe_map_[topic].insert(dest);
+			}
+			return MakeReply(eSuccess);
+		});
+	}
+	MsgCommonReply Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg)
+	{
+		return HandleMsg(head, [&](Node node) {
+			auto &src = SrcAddr(head);
+			auto pos = node->subscriptions_.find(src);
+
+			auto RemoveSubTopicDestRecord = [this](const Topic &topic, const TopicDest &dest) {
+				auto pos = subscribe_map_.find(topic);
+				if (pos != subscribe_map_.end() &&
+				    pos->second.erase(dest) != 0 &&
+				    pos->second.empty()) {
+					subscribe_map_.erase(pos);
+				}
+			};
+
+			if (pos != node->subscriptions_.end()) {
+				const TopicDest &dest = {src, node};
+				// clear node sub records;
+				for (auto &topic : msg.topics()) {
+					pos->second.erase(topic);
+					RemoveSubTopicDestRecord(topic, dest);
+				}
+				if (pos->second.empty()) {
+					node->subscriptions_.erase(pos);
+				}
+			}
+			return MakeReply(eSuccess);
+		});
+	}
+
+	Clients DoFindClients(const std::string &topic)
+	{
+		Clients dests;
+		auto Find1 = [&](const std::string &t) {
+			auto pos = subscribe_map_.find(topic);
+			if (pos != subscribe_map_.end()) {
+				auto &clients = pos->second;
+				for (auto &cli : clients) {
+					if (Valid(cli.weak_node_)) {
+						dests.insert(cli);
+					}
+				}
+			}
+		};
+		Find1(topic);
+
+		size_t pos = 0;
+		while (true) {
+			pos = topic.find(kTopicSep, pos);
+			if (pos == topic.npos || ++pos == topic.size()) {
+				// Find1(std::string()); // sub all.
+				break;
+			} else {
+				Find1(topic.substr(0, pos));
+			}
+		}
+		return dests;
+	}
+	bool FindClients(const BHMsgHead &head, const MsgPublish &msg, Clients &out, MsgCommonReply &reply)
+	{
+		bool ret = false;
+		HandleMsg(head, [&](Node node) {
+			DoFindClients(msg.topic()).swap(out);
+			ret = true;
+			return MakeReply(eSuccess);
+		}).Swap(&reply);
+		return ret;
+	}
+
+private:
+	bool Valid(const NodeInfo &node)
+	{
+		return node.state_.flag_ == kStateNormal;
+	}
+	bool Valid(const WeakNode &weak)
+	{
+		auto node = weak.lock();
+		return node && Valid(*node);
+	}
+	void CheckAllNodes(); //TODO, call it in timer.
+	std::string id_;      // center proc id;
+
+	std::unordered_map<Topic, Clients> service_map_;
+	std::unordered_map<Topic, Clients> subscribe_map_;
+	std::unordered_map<ProcId, Node> nodes_;
+};
+
+template <class Body, class OnMsg, class Replyer>
+inline void Dispatch(MsgI &msg, BHMsgHead &head, OnMsg const &onmsg, Replyer const &replyer)
+{
+	if (head.route_size() != 1) { return; }
+	Body body;
+	if (msg.ParseBody(body)) {
+		replyer(onmsg(body));
+	}
+}
+
+Handler Combine(const Handler &h1, const Handler &h2)
+{
+	return [h1, h2](ShmSocket &socket, bhome_msg::MsgI &msg, bhome::msg::BHMsgHead &head) {
+		return h1(socket, msg, head) || h2(socket, msg, head);
 	};
 }
+template <class... H>
+Handler Combine(const Handler &h0, const Handler &h1, const Handler &h2, const H &...rest)
+{
+	return Combine(Combine(h0, h1), h2, rest...);
+}
+
+#define CASE_ON_MSG_TYPE(MsgTag)                                                         \
+	case kMsgType##MsgTag:                                                               \
+		Dispatch<Msg##MsgTag>(                                                           \
+		    msg, head, [&](auto &body) { return center->MsgTag(head, body); }, replyer); \
+		return true;
+
+bool InstallCenter()
+{
+	auto center_ptr = std::make_shared<Synced<NodeCenter>>();
+	auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) {
+		return [&](auto &&rep_body) {
+			auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id()));
+			bool r = socket.Send(head.route(0).mq_id().data(), reply_head, rep_body, 10);
+			if (!r) {
+				printf("send reply failed.\n");
+			}
+			//TODO resend failed.
+		};
+	};
+
+	auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
+		auto &center = *center_ptr;
+		auto replyer = MakeReplyer(socket, head, center->id());
+		switch (head.type()) {
+			CASE_ON_MSG_TYPE(Register);
+			CASE_ON_MSG_TYPE(Heartbeat);
+
+			CASE_ON_MSG_TYPE(RegisterRPC);
+			CASE_ON_MSG_TYPE(QueryTopic);
+		default: return false;
+		}
+	};
+
+	auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
+		auto &center = *center_ptr;
+		auto replyer = MakeReplyer(socket, head, center->id());
+		auto OnPublish = [&]() {
+			MsgPublish pub;
+			NodeCenter::Clients clients;
+			MsgCommonReply reply;
+			MsgI pubmsg;
+			if (head.route_size() != 1 || !msg.ParseBody(pub)) {
+				return;
+			} else if (!center->FindClients(head, pub, clients, reply)) {
+				// send error reply.
+				MakeReplyer(socket, head, center->id())(reply);
+			} else if (pubmsg.MakeRC(socket.shm(), msg)) {
+				DEFER1(pubmsg.Release(socket.shm()));
+				for (auto &cli : clients) {
+					auto node = cli.weak_node_.lock();
+					if (node) {
+						socket.Send(cli.mq_.data(), pubmsg, 10);
+					}
+				}
+			}
+		};
+		switch (head.type()) {
+			CASE_ON_MSG_TYPE(Subscribe);
+			CASE_ON_MSG_TYPE(Unsubscribe);
+		case kMsgTypePublish: OnPublish(); return true;
+		default: return false;
+		}
+	};
+
+	BHCenter::Install("#center.reg", OnCenter, BHTopicCenterAddress(), 1000);
+	BHCenter::Install("#center.bus", OnPubSub, BHTopicBusAddress(), 1000);
+
+	return true;
+}
+
+#undef CASE_ON_MSG_TYPE
+
+} // namespace
 
 SharedMemory &BHomeShm()
 {
@@ -42,17 +409,24 @@
 	static CenterRecords rec;
 	return rec;
 }
+
 bool BHCenter::Install(const std::string &name, MsgHandler handler, const std::string &mqid, const int mq_len)
 {
-	CenterRecords()[name] = CenterInfo{name, handler, mqid, mq_len};
+	Centers()[name] = CenterInfo{name, handler, mqid, mq_len};
+	return true;
+}
+bool BHCenter::Install(const std::string &name, MsgHandler handler, const MQId &mqid, const int mq_len)
+{
+	return Install(name, handler, std::string((const char *) &mqid, sizeof(mqid)), mq_len);
 }
 
 BHCenter::BHCenter(Socket::Shm &shm)
 {
-	sockets_["center"] = std::make_shared<ShmSocket>(shm, &BHTopicCenterAddress(), 1000);
-	sockets_["bus"] = std::make_shared<ShmSocket>(shm, &BHTopicBusAddress(), 1000);
+	InstallCenter();
+
 	for (auto &kv : Centers()) {
-		sockets_[kv.first] = std::make_shared<ShmSocket>(shm, kv.second.mqid_.data(), kv.second.mq_len_);
+		auto &info = kv.second;
+		sockets_[info.name_] = std::make_shared<ShmSocket>(shm, *(MQId *) info.mqid_.data(), info.mq_len_);
 	}
 }
 
@@ -61,16 +435,12 @@
 
 bool BHCenter::Start()
 {
-	auto onCenter = MakeReqRepCenter();
-	auto onBus = MakeBusCenter();
-	sockets_["center"]->Start(onCenter);
-	sockets_["bus"]->Start(onBus);
-
 	for (auto &kv : Centers()) {
-		sockets_[kv.first]->Start(kv.second.handler_);
+		auto &info = kv.second;
+		sockets_[info.name_]->Start(info.handler_);
 	}
+
 	return true;
-	// socket_.Start(Join(onCenter, onBus));
 }
 
 bool BHCenter::Stop()
diff --git a/src/center.h b/src/center.h
index 02ec8f4..920addd 100644
--- a/src/center.h
+++ b/src/center.h
@@ -28,8 +28,9 @@
 	typedef ShmSocket Socket;
 
 public:
-	typedef std::function<bool(ShmSocket &socket, bhome_msg::MsgI &imsg, bhome::msg::BHMsg &msg)> MsgHandler;
+	typedef Socket::PartialRecvCB MsgHandler;
 	static bool Install(const std::string &name, MsgHandler handler, const std::string &mqid, const int mq_len);
+	static bool Install(const std::string &name, MsgHandler handler, const MQId &mqid, const int mq_len);
 
 	BHCenter(Socket::Shm &shm);
 	BHCenter();
diff --git a/src/msg.cpp b/src/msg.cpp
index 8752066..c353d84 100644
--- a/src/msg.cpp
+++ b/src/msg.cpp
@@ -25,140 +25,38 @@
     center accept request and route.;
 //*/
 const uint32_t kMsgTag = 0xf1e2d3c4;
-const uint32_t kMsgPrefixLen = 4;
 
-inline void AddRoute(BHMsg &msg, const MQId &id) { msg.add_route()->set_mq_id(&id, sizeof(id)); }
-
-std::string RandId()
+void *MsgI::Pack(SharedMemory &shm,
+                 const uint32_t head_len, const ToArray &headToArray,
+                 const uint32_t body_len, const ToArray &bodyToArray)
 {
-	boost::uuids::uuid id = boost::uuids::random_generator()();
-	return std::string((char *) &id, sizeof(id));
-}
-BHMsg InitMsg(MsgType type, const std::string &msgid = RandId())
-{
-	BHMsg msg;
-	msg.set_msg_id(msgid);
-	msg.set_type(type);
-	time_t tm = 0;
-	msg.set_timestamp(time(&tm));
-	return msg;
-}
-
-BHMsg MakeRequest(const MQId &src_id, const std::string &topic, const void *data, const size_t size)
-{
-	BHMsg msg(InitMsg(kMsgTypeRequestTopic));
-	AddRoute(msg, src_id);
-	MsgRequestTopic req;
-	req.set_topic(topic);
-	req.set_data(data, size);
-	msg.set_body(req.SerializeAsString());
-	return msg;
-}
-
-BHMsg MakeRegister(const MQId &src_id, ProcInfo info, const std::vector<std::string> &topics)
-{
-	BHMsg msg(InitMsg(kMsgTypeRegister));
-	AddRoute(msg, src_id);
-	MsgRegister reg;
-	reg.mutable_proc()->Swap(&info);
-	for (auto &t : topics) {
-		reg.add_topics(t);
+	void *addr = shm.Alloc(sizeof(head_len) + head_len + sizeof(body_len) + body_len);
+	if (addr) {
+		auto p = static_cast<char *>(addr);
+		auto Pack1 = [&p](auto len, auto &writer) {
+			Put32(p, len);
+			p += sizeof(len);
+			writer(p, len);
+			p += len;
+		};
+		Pack1(head_len, headToArray);
+		Pack1(body_len, bodyToArray);
 	}
-	msg.set_body(reg.SerializeAsString());
-	return msg;
+	return addr;
 }
 
-BHMsg MakeHeartbeat(const MQId &src_id, ProcInfo info)
+bool MsgI::ParseHead(BHMsgHead &head) const
 {
-	BHMsg msg(InitMsg(kMsgTypeHeartbeat));
-	AddRoute(msg, src_id);
-	MsgHeartbeat reg;
-	reg.mutable_proc()->Swap(&info);
-	msg.set_body(reg.SerializeAsString());
-	return msg;
-}
-
-BHMsg MakeReply(const std::string &src_msgid, const void *data, const size_t size)
-{
-	assert(data && size);
-	BHMsg msg(InitMsg(kMsgTypeRequestTopicReply, src_msgid));
-	MsgRequestTopicReply reply;
-	reply.set_data(data, size);
-	msg.set_body(reply.SerializeAsString());
-	return msg;
-}
-
-BHMsg MakeSubUnsub(const MQId &client, const std::vector<std::string> &topics, const MsgType sub_unsub)
-{
-	assert(sub_unsub == kMsgTypeSubscribe || sub_unsub == kMsgTypeUnsubscribe);
-	BHMsg msg(InitMsg(sub_unsub));
-	AddRoute(msg, client);
-	MsgSub subs;
-	for (auto &t : topics) {
-		subs.add_topics(t);
-	}
-	msg.set_body(subs.SerializeAsString());
-	return msg;
-}
-
-BHMsg MakeSub(const MQId &client, const std::vector<std::string> &topics) { return MakeSubUnsub(client, topics, kMsgTypeSubscribe); }
-BHMsg MakeUnsub(const MQId &client, const std::vector<std::string> &topics) { return MakeSubUnsub(client, topics, kMsgTypeUnsubscribe); }
-
-BHMsg MakePub(const std::string &topic, const void *data, const size_t size)
-{
-	assert(data && size);
-	BHMsg msg(InitMsg(kMsgTypePublish));
-	MsgPub pub;
-	pub.set_topic(topic);
-	pub.set_data(data, size);
-	msg.set_body(pub.SerializeAsString());
-	return msg;
-}
-
-BHMsg MakeQueryTopic(const MQId &client, const std::string &topic)
-{
-	BHMsg msg(InitMsg(kMsgTypeQueryTopic));
-	AddRoute(msg, client);
-	MsgQueryTopic query;
-	query.set_topic(topic);
-	msg.set_body(query.SerializeAsString());
-	return msg;
-}
-BHMsg MakeQueryTopicReply(const std::string &mqid, const std::string &msgid)
-{
-	BHMsg msg(InitMsg(kMsgTypeQueryTopicReply, msgid));
-	MsgQueryTopicReply reply;
-	reply.mutable_address()->set_mq_id(mqid);
-	msg.set_body(reply.SerializeAsString());
-	return msg;
-}
-
-void *Pack(SharedMemory &shm, const BHMsg &msg)
-{
-	uint32_t msg_size = msg.ByteSizeLong();
-	void *p = shm.Alloc(4 + msg_size);
-	if (p) {
-		Put32(p, msg_size);
-		if (!msg.SerializeToArray(static_cast<char *>(p) + kMsgPrefixLen, msg_size)) {
-			shm.Dealloc(p);
-			p = 0;
-		}
-	}
-	return p;
-}
-
-bool MsgI::Unpack(BHMsg &msg) const
-{
-	void *p = ptr_.get();
+	auto p = static_cast<char *>(ptr_.get());
 	assert(p);
 	uint32_t msg_size = Get32(p);
-	return msg.ParseFromArray(static_cast<char *>(p) + kMsgPrefixLen, msg_size);
+	p += 4;
+	return head.ParseFromArray(p, msg_size);
 }
 
 // with ref count;
-bool MsgI::MakeRC(SharedMemory &shm, const BHMsg &msg)
+bool MsgI::MakeRC(SharedMemory &shm, void *p)
 {
-	void *p = Pack(shm, msg);
 	if (!p) {
 		return false;
 	}
@@ -171,9 +69,8 @@
 	return true;
 }
 
-bool MsgI::Make(SharedMemory &shm, const BHMsg &msg)
+bool MsgI::Make(SharedMemory &shm, void *p)
 {
-	void *p = Pack(shm, msg);
 	if (!p) {
 		return false;
 	}
diff --git a/src/msg.h b/src/msg.h
index 30b3208..661d989 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -18,10 +18,12 @@
 #ifndef MSG_5BILLZET
 #define MSG_5BILLZET
 
-#include "bhome_msg.pb.h"
+#include "bh_util.h"
+#include "proto.h"
 #include "shm.h"
 #include <boost/interprocess/offset_ptr.hpp>
 #include <boost/uuid/uuid_generators.hpp>
+#include <functional>
 #include <stdint.h>
 
 namespace bhome_msg
@@ -59,16 +61,6 @@
 	int num_ = 1;
 };
 
-BHMsg MakeQueryTopic(const MQId &client, const std::string &topic);
-BHMsg MakeQueryTopicReply(const std::string &mqid, const std::string &msgid);
-BHMsg MakeRequest(const MQId &src_id, const std::string &topic, const void *data, const size_t size);
-BHMsg MakeReply(const std::string &src_msgid, const void *data, const size_t size);
-BHMsg MakeRegister(const MQId &src_id, ProcInfo info, const std::vector<std::string> &topics);
-BHMsg MakeHeartbeat(const MQId &src_id, ProcInfo info);
-BHMsg MakeSub(const MQId &client, const std::vector<std::string> &topics);
-BHMsg MakeUnsub(const MQId &client, const std::vector<std::string> &topics);
-BHMsg MakePub(const std::string &topic, const void *data, const size_t size);
-
 // message content layout: header_size + header + data_size + data
 class MsgI
 {
@@ -76,7 +68,22 @@
 	offset_ptr<void> ptr_;
 	offset_ptr<RefCount> count_;
 
-	bool BuildSubOrUnsub(SharedMemory &shm, const std::vector<std::string> &topics, const MsgType sub_unsub);
+	typedef std::function<void(void *p, int len)> ToArray;
+	void *Pack(SharedMemory &shm,
+	           const uint32_t head_len, const ToArray &headToArray,
+	           const uint32_t body_len, const ToArray &bodyToArray);
+
+	template <class Body>
+	void *Pack(SharedMemory &shm, const BHMsgHead &head, const Body &body)
+	{
+		return Pack(
+		    shm,
+		    uint32_t(head.ByteSizeLong()), [&](void *p, int len) { head.SerializeToArray(p, len); },
+		    uint32_t(body.ByteSizeLong()), [&](void *p, int len) { body.SerializeToArray(p, len); });
+	}
+
+	bool MakeRC(SharedMemory &shm, void *addr);
+	bool Make(SharedMemory &shm, void *addr);
 
 public:
 	MsgI(void *p = 0, RefCount *c = 0) :
@@ -97,9 +104,41 @@
 	int Count() const { return IsCounted() ? count_->Get() : 1; }
 	bool IsCounted() const { return static_cast<bool>(count_); }
 
-	bool Make(SharedMemory &shm, const BHMsg &msg);
-	bool MakeRC(SharedMemory &shm, const BHMsg &msg);
-	bool Unpack(BHMsg &msg) const;
+	template <class Body>
+	bool Make(SharedMemory &shm, const BHMsgHead &head, const Body &body)
+	{
+		return Make(shm, Pack(shm, head, body));
+	}
+	template <class Body>
+	bool MakeRC(SharedMemory &shm, const BHMsgHead &head, const Body &body)
+	{
+		return MakeRC(shm, Pack(shm, head, body));
+	}
+	bool MakeRC(SharedMemory &shm, MsgI &a)
+	{
+		if (a.IsCounted()) {
+			*this = a;
+			AddRef();
+			return true;
+		} else {
+			void *p = a.ptr_.get();
+			a.ptr_ = 0;
+			return MakeRC(shm, p);
+		}
+	}
+	bool ParseHead(BHMsgHead &head) const;
+	template <class Body>
+	bool ParseBody(Body &body) const
+	{
+		auto p = static_cast<char *>(ptr_.get());
+		assert(p);
+		uint32_t size = Get32(p);
+		p += 4;
+		p += size;
+		size = Get32(p);
+		p += 4;
+		return body.ParseFromArray(p, size);
+	}
 };
 
 inline void swap(MsgI &m1, MsgI &m2) { m1.swap(m2); }
diff --git a/src/proto.cpp b/src/proto.cpp
new file mode 100644
index 0000000..0ec894f
--- /dev/null
+++ b/src/proto.cpp
@@ -0,0 +1,41 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  proto.cpp
+ *
+ *    Description:  
+ *
+ *        Version:  1.0
+ *        Created:  2021骞�04鏈�07鏃� 17鏃�04鍒�36绉�
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Li Chao (), lichao@aiotlink.com
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+#include "proto.h"
+#include <boost/uuid/uuid_generators.hpp>
+
+std::string RandId()
+{
+	boost::uuids::uuid id = boost::uuids::random_generator()();
+	return std::string((char *) &id, sizeof(id));
+}
+
+BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id)
+{
+	return InitMsgHead(type, proc_id, RandId());
+}
+
+BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const std::string &msgid)
+{
+	BHMsgHead msg;
+	msg.set_msg_id(msgid);
+	msg.set_type(type);
+	msg.set_proc_id(proc_id);
+	time_t tm = 0;
+	msg.set_timestamp(time(&tm));
+	return msg;
+}
diff --git a/src/proto.h b/src/proto.h
new file mode 100644
index 0000000..2057711
--- /dev/null
+++ b/src/proto.h
@@ -0,0 +1,78 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  proto.h
+ *
+ *    Description:  
+ *
+ *        Version:  1.0
+ *        Created:  2021骞�04鏈�07鏃� 13鏃�48鍒�51绉�
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Li Chao (), lichao@aiotlink.com
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+#ifndef PROTO_UA9UWKL1
+#define PROTO_UA9UWKL1
+
+#include "bhome_msg.pb.h"
+
+using namespace bhome::msg;
+
+template <class Msg>
+struct MsgToType {
+};
+
+#define BHOME_MAP_MSG_AND_TYPE(mSG, tYPE)              \
+	template <>                                        \
+	struct MsgToType<mSG> {                            \
+		static const bhome::msg::MsgType value = tYPE; \
+	};
+
+#define BHOME_SIMPLE_MAP_MSG(name) BHOME_MAP_MSG_AND_TYPE(Msg##name, kMsgType##name)
+
+BHOME_SIMPLE_MAP_MSG(CommonReply);
+BHOME_SIMPLE_MAP_MSG(Register);
+BHOME_SIMPLE_MAP_MSG(RegisterRPC);
+BHOME_SIMPLE_MAP_MSG(Heartbeat);
+BHOME_SIMPLE_MAP_MSG(QueryTopic);
+BHOME_SIMPLE_MAP_MSG(QueryTopicReply);
+BHOME_SIMPLE_MAP_MSG(RequestTopic);
+BHOME_SIMPLE_MAP_MSG(RequestTopicReply);
+BHOME_SIMPLE_MAP_MSG(Publish);
+BHOME_SIMPLE_MAP_MSG(Subscribe);
+BHOME_SIMPLE_MAP_MSG(Unsubscribe);
+
+#undef BHOME_SIMPLE_MAP_MSG
+#undef BHOME_MAP_MSG_AND_TYPE
+
+template <class Msg>
+constexpr inline bhome::msg::MsgType GetType(const Msg &)
+{
+	return MsgToType<Msg>::value;
+}
+
+inline void SetError(ErrorMsg &em, const ErrorCode err_code, const std::string &err_str = "")
+{
+	em.set_errcode(err_code);
+	if (!err_str.empty()) {
+		em.set_errstring(err_str);
+	}
+}
+
+template <class Reply = MsgCommonReply>
+inline Reply MakeReply(const ErrorCode err_code, const std::string &err_str = "")
+{
+	Reply msg;
+	SetError(*msg.mutable_errmsg(), err_code, err_str);
+	return msg;
+}
+
+BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const std::string &msgid);
+BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id);
+// inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); }
+
+#endif // end of include guard: PROTO_UA9UWKL1
diff --git a/src/pubsub.cpp b/src/pubsub.cpp
index 0266c86..471c63c 100644
--- a/src/pubsub.cpp
+++ b/src/pubsub.cpp
@@ -22,24 +22,38 @@
 using namespace std::chrono_literals;
 using namespace bhome_msg;
 
-bool SocketPublish::Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms)
+bool SocketPublish::Publish(const std::string &proc_id, const Topic &topic, const void *data, const size_t size, const int timeout_ms)
 {
 	try {
+		MsgPublish pub;
+		pub.set_topic(topic);
+		pub.set_data(data, size);
+		BHMsgHead head(InitMsgHead(GetType(pub), proc_id));
 		MsgI imsg;
-		if (!imsg.MakeRC(shm(), MakePub(topic, data, size))) {
-			return false;
+		if (imsg.MakeRC(shm(), head, pub)) {
+			DEFER1(imsg.Release(shm()));
+			return ShmMsgQueue::Send(shm(), BHTopicBusAddress(), imsg, timeout_ms);
 		}
-		DEFER1(imsg.Release(shm()));
-		return ShmMsgQueue::Send(shm(), BHTopicBusAddress(), imsg, timeout_ms);
 	} catch (...) {
-		return false;
 	}
+	return false;
 }
+namespace
+{
+inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); }
 
-bool SocketSubscribe::Subscribe(const std::vector<Topic> &topics, const int timeout_ms)
+} // namespace
+bool SocketSubscribe::Subscribe(const std::string &proc_id, const std::vector<Topic> &topics, const int timeout_ms)
 {
 	try {
-		return mq().Send(BHTopicBusAddress(), MakeSub(mq().Id(), topics), timeout_ms);
+		MsgSubscribe sub;
+		for (auto &topic : topics) {
+			sub.add_topics(topic);
+		}
+		BHMsgHead head(InitMsgHead(GetType(sub), proc_id));
+		AddRoute(head, mq().Id());
+
+		return Send(&BHTopicBusAddress(), head, sub, timeout_ms);
 	} catch (...) {
 		return false;
 	}
@@ -47,11 +61,11 @@
 
 bool SocketSubscribe::StartRecv(const TopicDataCB &tdcb, int nworker)
 {
-	auto AsyncRecvProc = [this, tdcb](BHMsg &msg) {
-		if (msg.type() == kMsgTypePublish) {
-			MsgPub d;
-			if (d.ParseFromString(msg.body())) {
-				tdcb(d.topic(), d.data());
+	auto AsyncRecvProc = [this, tdcb](ShmSocket &, MsgI &imsg, BHMsgHead &head) {
+		if (head.type() == kMsgTypePublish) {
+			MsgPublish pub;
+			if (imsg.ParseBody(pub)) {
+				tdcb(head.proc_id(), pub.topic(), pub.data());
 			}
 		} else {
 			// ignored, or dropped
@@ -61,14 +75,16 @@
 	return tdcb && Start(AsyncRecvProc, nworker);
 }
 
-bool SocketSubscribe::RecvSub(Topic &topic, std::string &data, const int timeout_ms)
+bool SocketSubscribe::RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms)
 {
-	BHMsg msg;
-	if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypePublish) {
-		MsgPub d;
-		if (d.ParseFromString(msg.body())) {
-			d.mutable_topic()->swap(topic);
-			d.mutable_data()->swap(data);
+	MsgI msg;
+	BHMsgHead head;
+	if (SyncRecv(msg, head, timeout_ms) && head.type() == kMsgTypePublish) {
+		MsgPublish pub;
+		if (msg.ParseBody(pub)) {
+			head.mutable_proc_id()->swap(proc_id);
+			pub.mutable_topic()->swap(topic);
+			pub.mutable_data()->swap(data);
 			return true;
 		}
 	}
diff --git a/src/pubsub.h b/src/pubsub.h
index 3c3d4ad..bd60fcd 100644
--- a/src/pubsub.h
+++ b/src/pubsub.h
@@ -33,11 +33,7 @@
 	    shm_(shm) {}
 	SocketPublish() :
 	    SocketPublish(BHomeShm()) {}
-	bool Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms);
-	bool Publish(const Topic &topic, const std::string &data, const int timeout_ms)
-	{
-		return Publish(topic, data.data(), data.size(), timeout_ms);
-	}
+	bool Publish(const std::string &proc_id, const Topic &topic, const void *data, const size_t size, const int timeout_ms);
 };
 
 // socket subscribe
@@ -52,11 +48,11 @@
 	    SocketSubscribe(BHomeShm()) {}
 	~SocketSubscribe() { Stop(); }
 
-	typedef std::function<void(const Topic &topic, const std::string &data)> TopicDataCB;
+	typedef std::function<void(const std::string &proc_id, const Topic &topic, const std::string &data)> TopicDataCB;
 	bool StartRecv(const TopicDataCB &tdcb, int nworker = 2);
 	bool Stop() { return Socket::Stop(); }
-	bool Subscribe(const std::vector<Topic> &topics, const int timeout_ms);
-	bool RecvSub(Topic &topic, std::string &data, const int timeout_ms);
+	bool Subscribe(const std::string &proc_id, const std::vector<Topic> &topics, const int timeout_ms);
+	bool RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms);
 };
 
 #endif // end of include guard: PUBSUB_4KGRA997
diff --git a/src/pubsub_center.cpp b/src/pubsub_center.cpp
deleted file mode 100644
index 698327e..0000000
--- a/src/pubsub_center.cpp
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * =====================================================================================
- *
- *       Filename:  pubsub_center.cpp
- *
- *    Description:  pub/sub center/manager
- *
- *        Version:  1.0
- *        Created:  2021骞�04鏈�01鏃� 09鏃�29鍒�04绉�
- *       Revision:  none
- *       Compiler:  gcc
- *
- *         Author:  Li Chao (), 
- *   Organization:  
- *
- * =====================================================================================
- */
-#include "pubsub_center.h"
-#include "bh_util.h"
-using namespace bhome_shm;
-namespace
-{
-class BusCenter
-{
-	typedef std::set<MQId> Clients;
-	std::unordered_map<Topic, Clients> records_;
-	// todo cache data if send fail.
-
-public:
-	template <class Iter>
-	void SubScribe(const MQId &client, Iter topic_begin, Iter topic_end)
-	{
-		for (auto it = topic_begin; it != topic_end; ++it) {
-			records_[*it].insert(client);
-		}
-	}
-	template <class Iter>
-	void UnsubScribe(const MQId &client, Iter topic_begin, Iter topic_end)
-	{
-		for (auto it = topic_begin; it != topic_end; ++it) {
-			auto pos = records_.find(*it);
-			if (pos != records_.end()) {
-				if (pos->second.erase(client) && pos->second.empty()) {
-					records_.erase(pos);
-				}
-			}
-		}
-	};
-	Clients FindClients(const std::string &topic)
-	{
-		Clients dests;
-		auto Find1 = [&](const std::string &t) {
-			auto pos = records_.find(topic);
-			if (pos != records_.end() && !pos->second.empty()) {
-				auto &clients = pos->second;
-				for (auto &cli : clients) {
-					dests.insert(cli);
-				}
-			}
-		};
-		Find1(topic);
-
-		//TODO check and adjust topic on client side sub/pub.
-		size_t pos = 0;
-		while (true) {
-			pos = topic.find(kTopicSep, pos);
-			if (pos == topic.npos || ++pos == topic.size()) {
-				// Find1(std::string()); // sub all.
-				break;
-			} else {
-				Find1(topic.substr(0, pos));
-			}
-		}
-		return dests;
-	}
-};
-
-} // namespace
-
-BHCenter::MsgHandler MakeBusCenter()
-{
-	auto bus_ptr = std::make_shared<Synced<BusCenter>>();
-
-	return [bus_ptr](ShmSocket &socket, MsgI &imsg, BHMsg &msg) {
-#ifndef NDEBUG
-		static std::atomic<time_t> last(0);
-		time_t now = 0;
-		time(&now);
-		if (last.exchange(now) < now) {
-			printf("bus queue size: %ld\n", socket.Pending());
-		}
-#endif
-		auto &bus = *bus_ptr;
-		auto &shm = socket.shm();
-
-		auto OnSubChange = [&](auto &&update) {
-			MsgSub sub;
-			if (!msg.route().empty() && sub.ParseFromString(msg.body()) && !sub.topics().empty()) {
-				assert(sizeof(MQId) == msg.route(0).mq_id().size());
-				MQId client;
-				memcpy(&client, msg.route(0).mq_id().data(), sizeof(client));
-				update(client, sub.topics());
-			}
-		};
-		auto Sub = [&](const MQId &id, auto &topics) { bus->SubScribe(id, topics.begin(), topics.end()); };
-		auto Unsub = [&](const MQId &id, auto &topics) { bus->UnsubScribe(id, topics.begin(), topics.end()); };
-
-		auto OnPublish = [&]() {
-			MsgPub pub;
-			if (!pub.ParseFromString(msg.body())) {
-				return;
-			}
-			auto Dispatch = [&](auto &&send1) {
-				const auto &clients(bus->FindClients(pub.topic()));
-				for (auto &cli : clients) {
-					send1(cli);
-				}
-			};
-
-			if (imsg.IsCounted()) {
-				Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm, cli, imsg, 10); });
-			} else {
-				MsgI pubmsg;
-				if (!pubmsg.MakeRC(shm, msg)) { return; }
-				DEFER1(pubmsg.Release(shm));
-
-				Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm, cli, pubmsg, 10); });
-			}
-		};
-
-		switch (msg.type()) {
-		case kMsgTypeSubscribe: OnSubChange(Sub); return true;
-		case kMsgTypeUnsubscribe: OnSubChange(Unsub); return true;
-		case kMsgTypePublish: OnPublish(); return true;
-		default: return false;
-		}
-	};
-}
-
-bool PubSubCenter::Start(const int nworker)
-{
-	auto handler = MakeBusCenter();
-	printf("sizeof(pub/sub handler) = %ld\n", sizeof(handler));
-
-	const int kMaxWorker = 16;
-	return socket_.Start(handler, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
-}
\ No newline at end of file
diff --git a/src/pubsub_center.h b/src/pubsub_center.h
deleted file mode 100644
index f81fa0e..0000000
--- a/src/pubsub_center.h
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * =====================================================================================
- *
- *       Filename:  pubsub_center.h
- *
- *    Description:  
- *
- *        Version:  1.0
- *        Created:  2021骞�04鏈�01鏃� 09鏃�29鍒�39绉�
- *       Revision:  none
- *       Compiler:  gcc
- *
- *         Author:  Li Chao (), 
- *   Organization:  
- *
- * =====================================================================================
- */
-#ifndef PUBSUB_CENTER_MFSUZJU7
-#define PUBSUB_CENTER_MFSUZJU7
-
-#include "center.h"
-#include "defs.h"
-#include "socket.h"
-#include <mutex>
-#include <set>
-#include <unordered_map>
-
-BHCenter::MsgHandler MakeBusCenter();
-
-// publish/subcribe manager.
-class PubSubCenter
-{
-	ShmSocket socket_;
-
-public:
-	PubSubCenter(ShmSocket::Shm &shm) :
-	    socket_(shm, &BHTopicBusAddress(), 1000) {}
-	PubSubCenter() :
-	    PubSubCenter(BHomeShm()) {}
-	~PubSubCenter() { Stop(); }
-	bool Start(const int nworker = 2);
-	bool Stop() { return socket_.Stop(); }
-};
-
-#endif // end of include guard: PUBSUB_CENTER_MFSUZJU7
diff --git a/src/reqrep_center.cpp b/src/reqrep_center.cpp
deleted file mode 100644
index ce35d1c..0000000
--- a/src/reqrep_center.cpp
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * =====================================================================================
- *
- *       Filename:  reqrep_center.cpp
- *
- *    Description:  topic request/reply center
- *
- *        Version:  1.0
- *        Created:  2021骞�04鏈�01鏃� 14鏃�08鍒�50绉�
- *       Revision:  none
- *       Compiler:  gcc
- *
- *         Author:  Li Chao (), 
- *   Organization:  
- *
- * =====================================================================================
- */
-#include "reqrep_center.h"
-#include "bh_util.h"
-#include "msg.h"
-#include <chrono>
-#include <memory>
-#include <mutex>
-#include <unordered_map>
-
-using namespace bhome_shm;
-
-namespace
-{
-auto Now = []() { time_t t; return time(&t); };
-
-class NodeCenter
-{
-public:
-	typedef std::string ProcAddr;
-	typedef bhome::msg::ProcInfo ProcInfo;
-
-	template <class Iter>
-	bool Register(ProcInfo &info, const ProcAddr &src_mq, Iter topics_begin, Iter topics_end)
-	{
-		try {
-			Node node(new NodeInfo);
-			node->addr_ = src_mq;
-			node->proc_.Swap(&info);
-			node->state_.timestamp_ = Now();
-			nodes_[node->proc_.id()] = node;
-			for (auto it = topics_begin; it != topics_end; ++it) {
-				topic_map_[*it] = node;
-			}
-			return true;
-		} catch (...) {
-			return false;
-		}
-	}
-	void Heartbeat(ProcInfo &info, const ProcAddr &src_mq)
-	{
-		auto pos = nodes_.find(info.name());
-		if (pos != nodes_.end() && pos->second->addr_ == src_mq) { // both name and mq should be the same.
-			NodeInfo &ni = *pos->second;
-			ni.state_.timestamp_ = Now();
-			if (!info.public_info().empty()) {
-				ni.proc_.set_public_info(info.public_info());
-			}
-			if (!info.private_info().empty()) {
-				ni.proc_.set_private_info(info.private_info());
-			}
-		}
-	}
-	bool QueryTopic(const Topic &topic, ProcAddr &addr)
-	{
-		auto pos = topic_map_.find(topic);
-		if (pos != topic_map_.end()) {
-			Node node(pos->second.lock());
-			if (node) {
-				addr = node->addr_;
-				return true;
-			} else { // dead, remove record.
-				topic_map_.erase(pos);
-				return false;
-			}
-		} else {
-			return false;
-		}
-	}
-
-private:
-	struct ProcState {
-		time_t timestamp_ = 0;
-		uint32_t flag_ = 0; // reserved
-	};
-	typedef std::string ProcId;
-	struct NodeInfo {
-		ProcState state_; // state
-		ProcAddr addr_;   // registered_mqid.
-		ProcInfo proc_;   //
-	};
-	typedef std::shared_ptr<NodeInfo> Node;
-	typedef std::weak_ptr<NodeInfo> WeakNode;
-	std::unordered_map<Topic, WeakNode> topic_map_;
-	std::unordered_map<ProcId, Node> nodes_;
-};
-
-} // namespace
-
-BHCenter::MsgHandler MakeReqRepCenter()
-{
-	auto center_ptr = std::make_shared<Synced<NodeCenter>>();
-	return [center_ptr](ShmSocket &socket, MsgI &imsg, BHMsg &msg) {
-		auto &center = *center_ptr;
-		auto &shm = socket.shm();
-
-#ifndef NDEBUG
-		static std::atomic<time_t> last(0);
-		time_t now = 0;
-		time(&now);
-		if (last.exchange(now) < now) {
-			printf("center queue size: %ld\n", socket.Pending());
-		}
-#endif
-		auto SrcMQ = [&]() { return msg.route(0).mq_id(); };
-
-		auto OnRegister = [&]() {
-			if (msg.route_size() != 1) { return; }
-
-			MsgRegister reg;
-			if (reg.ParseFromString(msg.body()) && reg.has_proc()) {
-				center->Register(*reg.mutable_proc(), SrcMQ(), reg.topics().begin(), reg.topics().end());
-			}
-		};
-
-		auto OnHeartbeat = [&]() {
-			if (msg.route_size() != 1) { return; }
-			auto &src_mq = msg.route(0).mq_id();
-
-			MsgHeartbeat hb;
-			if (hb.ParseFromString(msg.body()) && hb.has_proc()) {
-				center->Heartbeat(*hb.mutable_proc(), SrcMQ());
-			}
-		};
-
-		auto OnQueryTopic = [&]() {
-			if (msg.route_size() != 1) { return; }
-
-			MsgQueryTopic query;
-			NodeCenter::ProcAddr dest;
-			if (query.ParseFromString(msg.body()) && center->QueryTopic(query.topic(), dest)) {
-				MQId remote;
-				memcpy(&remote, SrcMQ().data(), sizeof(MQId));
-				MsgI imsg;
-				if (!imsg.Make(shm, MakeQueryTopicReply(dest, msg.msg_id()))) { return; }
-				if (!ShmMsgQueue::Send(shm, remote, imsg, 100)) {
-					imsg.Release(shm);
-				}
-			}
-		};
-
-		switch (msg.type()) {
-		case kMsgTypeRegister: OnRegister(); return true;
-		case kMsgTypeHeartbeat: OnHeartbeat(); return true;
-		case kMsgTypeQueryTopic: OnQueryTopic(); return true;
-		default: return false;
-		}
-	};
-}
-
-bool ReqRepCenter::Start(const int nworker)
-{
-	auto handler = MakeReqRepCenter();
-	printf("sizeof(rep/req handler) = %ld\n", sizeof(handler));
-
-	const int kMaxWorker = 16;
-	return socket_.Start(handler, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
-}
diff --git a/src/reqrep_center.h b/src/reqrep_center.h
deleted file mode 100644
index bdcdcad..0000000
--- a/src/reqrep_center.h
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * =====================================================================================
- *
- *       Filename:  reqrep_center.h
- *
- *    Description:  
- *
- *        Version:  1.0
- *        Created:  2021骞�04鏈�01鏃� 14鏃�09鍒�13绉�
- *       Revision:  none
- *       Compiler:  gcc
- *
- *         Author:  Li Chao (), 
- *   Organization:  
- *
- * =====================================================================================
- */
-#ifndef REQREP_CENTER_US3RBM60
-#define REQREP_CENTER_US3RBM60
-
-#include "center.h"
-#include "defs.h"
-#include "socket.h"
-
-BHCenter::MsgHandler MakeReqRepCenter();
-class ReqRepCenter
-{
-	ShmSocket socket_;
-
-public:
-	ReqRepCenter(ShmSocket::Shm &shm) :
-	    socket_(shm, &BHTopicCenterAddress(), 1000) {}
-	ReqRepCenter() :
-	    ReqRepCenter(BHomeShm()) {}
-	~ReqRepCenter() { Stop(); }
-	bool Start(const int nworker = 2);
-	bool Stop() { return socket_.Stop(); }
-};
-
-#endif // end of include guard: REQREP_CENTER_US3RBM60
diff --git a/src/shm_queue.cpp b/src/shm_queue.cpp
index dcb5a9e..521f773 100644
--- a/src/shm_queue.cpp
+++ b/src/shm_queue.cpp
@@ -87,15 +87,14 @@
 // 2) find remote queue first, then build msg;
 // 1 is about 50% faster than 2, maybe cache related.
 
-bool ShmMsgQueue::Recv(BHMsg &msg, const int timeout_ms)
-{
-	MsgI imsg;
-	if (Read(imsg, timeout_ms)) {
-		DEFER1(imsg.Release(shm()););
-		return imsg.Unpack(msg);
-	} else {
-		return false;
-	}
-}
+// bool ShmMsgQueue::Recv(MsgI &imsg, BHMsgHead &head, const int timeout_ms)
+// {
+// 	if (Read(imsg, timeout_ms)) {
+// 		// DEFER1(imsg.Release(shm()););
+// 		return imsg.ParseHead(head);
+// 	} else {
+// 		return false;
+// 	}
+// }
 
 } // namespace bhome_shm
diff --git a/src/shm_queue.h b/src/shm_queue.h
index ab8a88c..32ccfae 100644
--- a/src/shm_queue.h
+++ b/src/shm_queue.h
@@ -131,7 +131,7 @@
 	~ShmMsgQueue();
 	const MQId &Id() const { return id_; }
 
-	bool Recv(BHMsg &msg, const int timeout_ms);
+	// bool Recv(MsgI &msg, BHMsgHead &head, const int timeout_ms);
 	bool Recv(MsgI &msg, const int timeout_ms) { return Read(msg, timeout_ms); }
 	static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend);
 	static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms);
@@ -141,12 +141,11 @@
 	{
 		return Send(shm(), remote_id, msg, timeout_ms, extra...);
 	}
-
-	template <class... Extra>
-	bool Send(const MQId &remote_id, const BHMsg &data, const int timeout_ms, Extra const &...extra)
+	template <class Body, class... Extra>
+	bool Send(const MQId &remote_id, const BHMsgHead &head, const Body &body, const int timeout_ms, Extra const &...extra)
 	{
 		MsgI msg;
-		if (msg.Make(shm(), data)) {
+		if (msg.Make(shm(), head, body)) {
 			if (Send(shm(), remote_id, msg, timeout_ms, extra...)) {
 				return true;
 			} else {
@@ -155,6 +154,7 @@
 		}
 		return false;
 	}
+
 	size_t Pending() const { return data()->size(); }
 };
 
diff --git a/src/socket.cpp b/src/socket.cpp
index b9def0c..f2b29f4 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -29,43 +29,53 @@
 
 } // namespace
 
-ShmSocket::ShmSocket(Shm &shm, const void *id, const int len) :
-    shm_(shm), run_(false)
+ShmSocket::ShmSocket(Shm &shm, const MQId &id, const int len) :
+    shm_(shm), run_(false), mq_(id, shm, len)
 {
-	if (id && len > 0) {
-		mq_.reset(new Queue(*static_cast<const MQId *>(id), shm, len));
-	}
 }
 ShmSocket::ShmSocket(bhome_shm::SharedMemory &shm, const int len) :
-    shm_(shm), run_(false)
-{
-	if (len > 0) {
-		mq_.reset(new Queue(shm_, len));
-	}
-}
+    shm_(shm), run_(false), mq_(shm, len) {}
 
 ShmSocket::~ShmSocket()
 {
 	Stop(); //TODO should stop in sub class, incase thread access sub class data.
 }
 
-bool ShmSocket::Start(const RecvCB &onData, const IdleCB &onIdle, int nworker)
+bool ShmSocket::Start(int nworker, const RecvCB &onData, const IdleCB &onIdle)
 {
-	if (!mq_ || !onData) {
-		return false; // TODO error code.
-	}
+	auto onRecv = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
+		auto Find = [&](RecvCB &cb) {
+			std::lock_guard<std::mutex> lock(mutex());
+			const std::string &msgid = head.msg_id();
+			auto pos = async_cbs_.find(msgid);
+			if (pos != async_cbs_.end()) {
+				cb.swap(pos->second);
+				async_cbs_.erase(pos);
+				return true;
+			} else {
+				return false;
+			}
+		};
+
+		RecvCB cb;
+		if (Find(cb)) {
+			cb(socket, imsg, head);
+		} else if (onData) {
+			onData(socket, imsg, head);
+		} // else ignored, or dropped
+	};
 
 	std::lock_guard<std::mutex> lock(mutex_);
 	StopNoLock();
-	auto RecvProc = [this, onData, onIdle]() {
+	auto RecvProc = [this, onRecv, onIdle]() {
 		while (run_) {
 			try {
 				MsgI imsg;
-				DEFER1(imsg.Release(shm_));
-				if (mq_->Recv(imsg, 100)) {
-					BHMsg msg;
-					if (imsg.Unpack(msg)) {
-						onData(*this, imsg, msg);
+				if (mq().Recv(imsg, 10)) {
+					DEFER1(imsg.Release(shm()));
+					BHMsgHead head;
+					if (imsg.ParseHead(head)) {
+						onRecv(*this, imsg, head);
 					}
 				} else if (onIdle) {
 					onIdle(*this);
@@ -102,17 +112,18 @@
 	return false;
 }
 
-bool ShmSocket::SyncSend(const void *id, const bhome_msg::BHMsg &msg, const int timeout_ms)
-{
-	return mq_->Send(*static_cast<const MQId *>(id), msg, timeout_ms);
-}
-
-bool ShmSocket::SyncRecv(bhome_msg::BHMsg &msg, const int timeout_ms)
+bool ShmSocket::SyncRecv(bhome_msg::MsgI &msg, bhome::msg::BHMsgHead &head, const int timeout_ms)
 {
 	std::lock_guard<std::mutex> lock(mutex_);
-	if (!mq_ || RunningNoLock()) {
+	auto Recv = [&]() {
+		if (mq().Recv(msg, timeout_ms)) {
+			if (msg.ParseHead(head)) {
+				return true;
+			} else {
+				msg.Release(shm());
+			}
+		}
 		return false;
-	} else {
-		return mq_->Recv(msg, timeout_ms);
-	}
+	};
+	return !RunningNoLock() && Recv();
 }
diff --git a/src/socket.h b/src/socket.h
index 57d0ae4..7c4f83f 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -19,14 +19,18 @@
 #ifndef SOCKET_GWTJHBPO
 #define SOCKET_GWTJHBPO
 
+#include "defs.h"
 #include "shm_queue.h"
 #include <atomic>
 #include <boost/noncopyable.hpp>
+#include <condition_variable>
 #include <functional>
 #include <memory>
 #include <mutex>
 #include <thread>
 #include <vector>
+
+using namespace bhome_msg;
 
 class ShmSocket : private boost::noncopyable
 {
@@ -35,36 +39,88 @@
 
 public:
 	typedef bhome_shm::SharedMemory Shm;
-	typedef std::function<void(ShmSocket &sock, bhome_msg::MsgI &imsg, bhome_msg::BHMsg &msg)> RecvCB;
-	typedef std::function<void(bhome_msg::BHMsg &msg)> RecvBHMsgCB;
+	typedef std::function<void(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> RecvCB;
+	typedef std::function<bool(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> PartialRecvCB;
 	typedef std::function<void(ShmSocket &sock)> IdleCB;
 
-	ShmSocket(Shm &shm, const void *id, const int len);
+	ShmSocket(Shm &shm, const MQId &id, const int len);
 	ShmSocket(Shm &shm, const int len = 12);
 	~ShmSocket();
-
+	const MQId &id() const { return mq().Id(); }
 	Shm &shm() { return shm_; }
 	// start recv.
-	bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1);
-	bool Start(const RecvCB &onData, int nworker = 1) { return Start(onData, IdleCB(), nworker); }
-	bool Start(const RecvBHMsgCB &onData, const IdleCB &onIdle, int nworker = 1)
-	{
-		return Start([onData](ShmSocket &sock, bhome_msg::MsgI &imsg, bhome_msg::BHMsg &msg) { onData(msg); }, onIdle, nworker);
-	}
-	bool Start(const RecvBHMsgCB &onData, int nworker = 1)
-	{
-		return Start(onData, IdleCB(), nworker);
-	}
+	bool Start(int nworker = 1, const RecvCB &onData = RecvCB(), const IdleCB &onIdle = IdleCB());
+	bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1) { return Start(nworker, onData, onIdle); }
+	bool Start(const RecvCB &onData, int nworker = 1) { return Start(nworker, onData); }
 	bool Stop();
-	size_t Pending() const { return mq_ ? mq_->Pending() : 0; }
+	size_t Pending() const { return mq().Pending(); }
 
-	bool SyncSend(const void *id, const bhome_msg::BHMsg &msg, const int timeout_ms);
-	bool SyncRecv(bhome_msg::BHMsg &msg, const int timeout_ms);
+	bool Send(const void *id, const MsgI &imsg, const int timeout_ms)
+	{
+		return mq().Send(*static_cast<const MQId *>(id), imsg, timeout_ms);
+	}
+	//TODO reimplment, using async.
+	bool SyncRecv(MsgI &msg, bhome::msg::BHMsgHead &head, const int timeout_ms);
+
+	template <class Body>
+	bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms, const RecvCB &cb = RecvCB())
+	{
+		assert(valid_remote);
+		try {
+			if (cb) {
+				auto RegisterCB = [&]() {
+					std::lock_guard<std::mutex> lock(mutex());
+					async_cbs_.emplace(head.msg_id(), cb);
+				};
+				return mq().Send(*static_cast<const MQId *>(valid_remote), head, body, timeout_ms, RegisterCB);
+			} else {
+				return mq().Send(*static_cast<const MQId *>(valid_remote), head, body, timeout_ms);
+			}
+		} catch (...) {
+			return false;
+		}
+	}
+
+	template <class Body>
+	bool SendAndRecv(const void *remote, const BHMsgHead &head, const Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms)
+	{
+		struct State {
+			std::mutex mutex;
+			std::condition_variable cv;
+			bool canceled = false;
+		};
+
+		try {
+			std::shared_ptr<State> st(new State);
+			auto endtime = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);
+
+			auto OnRecv = [st, &reply, &reply_head](ShmSocket &sock, MsgI &msg, BHMsgHead &head) {
+				std::unique_lock<std::mutex> lk(st->mutex);
+				if (!st->canceled) {
+					reply.swap(msg);
+					reply_head.Swap(&head);
+					st->cv.notify_one();
+				} else {
+				}
+			};
+
+			std::unique_lock<std::mutex> lk(st->mutex);
+			bool sendok = Send(remote, head, body, timeout_ms, OnRecv);
+			if (sendok && st->cv.wait_until(lk, endtime) == std::cv_status::no_timeout) {
+				return true;
+			} else {
+				st->canceled = true;
+				return false;
+			}
+		} catch (...) {
+			return false;
+		}
+	}
 
 protected:
 	const Shm &shm() const { return shm_; }
-	Queue &mq() { return *mq_; } // programmer should make sure that mq_ is valid.
-	const Queue &mq() const { return *mq_; }
+	Queue &mq() { return mq_; } // programmer should make sure that mq_ is valid.
+	const Queue &mq() const { return mq_; }
 	std::mutex &mutex() { return mutex_; }
 
 private:
@@ -76,7 +132,8 @@
 	std::mutex mutex_;
 	std::atomic<bool> run_;
 
-	std::unique_ptr<Queue> mq_;
+	Queue mq_;
+	std::unordered_map<std::string, RecvCB> async_cbs_;
 };
 
 #endif // end of include guard: SOCKET_GWTJHBPO
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
new file mode 100644
index 0000000..c6c9771
--- /dev/null
+++ b/src/topic_node.cpp
@@ -0,0 +1,322 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  topic_node.cpp
+ *
+ *    Description:  
+ *
+ *        Version:  1.0
+ *        Created:  2021骞�04鏈�07鏃� 09鏃�01鍒�48绉�
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Li Chao (), 
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+#include "topic_node.h"
+#include "bh_util.h"
+#include <chrono>
+#include <list>
+
+using namespace std::chrono;
+using namespace std::chrono_literals;
+
+namespace
+{
+inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); }
+
+struct SrcInfo {
+	std::vector<BHAddress> route;
+	std::string msg_id;
+};
+
+class ServerFailedQ
+{
+	struct FailedMsg {
+		steady_clock::time_point xpr;
+		std::string remote_;
+		BHMsgHead head_;
+		MsgRequestTopicReply body_;
+		FailedMsg(const std::string &addr, BHMsgHead &&head, MsgRequestTopicReply &&body) :
+		    xpr(steady_clock::now() + 10s), remote_(addr), head_(std::move(head)), body_(std::move(body)) {}
+		bool Expired() { return steady_clock::now() > xpr; }
+	};
+	typedef std::list<FailedMsg> Queue;
+	Synced<Queue> queue_;
+
+public:
+	void Push(const std::string &remote, BHMsgHead &&head, MsgRequestTopicReply &&body)
+	{
+		queue_->emplace_back(remote, std::move(head), std::move(body));
+	}
+	void TrySend(ShmSocket &socket, const int timeout_ms = 0)
+	{
+		queue_.Apply([&](Queue &q) {
+			if (!q.empty()) {
+				auto it = q.begin();
+				do {
+					if (it->Expired()) {
+						// it->msg_.Release(socket.shm());
+						it = q.erase(it);
+					} else if (socket.Send(it->remote_.data(), it->head_, it->body_, timeout_ms)) {
+						it = q.erase(it);
+					} else {
+						++it;
+					}
+				} while (it != q.end());
+			}
+		});
+	}
+};
+
+} // namespace
+TopicNode::TopicNode(SharedMemory &shm) :
+    shm_(shm), sock_node_(shm), sock_request_(shm), sock_reply_(shm), sock_sub_(shm)
+{
+	SockNode().Start();
+}
+TopicNode::~TopicNode()
+{
+	StopAll();
+	SockNode().Stop();
+}
+void TopicNode::StopAll()
+{
+	ServerStop();
+	ClientStopWorker();
+}
+
+bool TopicNode::Register(const MsgRegister &body, MsgCommonReply &reply_body, const int timeout_ms)
+{
+	auto head(InitMsgHead(GetType(body), body.proc().proc_id()));
+	AddRoute(head, SockNode().id());
+
+	MsgI reply;
+	DEFER1(reply.Release(shm_););
+	BHMsgHead reply_head;
+	bool r = SockNode().SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
+	r = r && reply_head.type() == kMsgTypeCommonReply;
+	r = r && reply.ParseBody(reply_body);
+	if (r) {
+		info_ = body;
+	}
+	return r;
+}
+
+bool TopicNode::RegisterRPC(const MsgRegisterRPC &body, MsgCommonReply &reply_body, const int timeout_ms)
+{
+	//TODO check registered
+
+	auto head(InitMsgHead(GetType(body), proc_id()));
+	AddRoute(head, SockReply().id());
+
+	MsgI reply;
+	DEFER1(reply.Release(shm_););
+	BHMsgHead reply_head;
+	bool r = SockReply().SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
+	r = r && reply_head.type() == kMsgTypeCommonReply;
+	r = r && reply.ParseBody(reply_body);
+	return r;
+}
+
+bool TopicNode::ServerStart(const OnRequest &rcb, int nworker)
+{
+	//TODO check registered
+
+	auto failed_q = std::make_shared<ServerFailedQ>();
+
+	auto onIdle = [failed_q](ShmSocket &socket) { failed_q->TrySend(socket); };
+
+	auto onRecv = [this, rcb, failed_q, onIdle](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
+		if (head.type() == kMsgTypeRequestTopic && head.route_size() > 0) {
+			MsgRequestTopic req;
+			if (imsg.ParseBody(req)) {
+				std::string out;
+				if (rcb(req.topic(), req.data(), out)) {
+					MsgRequestTopicReply reply_body;
+					reply_body.set_data(std::move(out));
+					BHMsgHead reply_head(InitMsgHead(GetType(reply_body), proc_id(), head.msg_id()));
+
+					for (int i = 0; i < head.route_size() - 1; ++i) {
+						reply_head.add_route()->Swap(head.mutable_route(i));
+					}
+					if (!sock.Send(head.route().rbegin()->mq_id().data(), reply_head, reply_body, 10)) {
+						failed_q->Push(head.route().rbegin()->mq_id(), std::move(reply_head), std::move(reply_body));
+					}
+				}
+			}
+		} else {
+			// ignored, or dropped
+		}
+
+		onIdle(sock);
+	};
+
+	return rcb && SockReply().Start(onRecv, onIdle, nworker);
+}
+bool TopicNode::ServerStop() { return SockReply().Stop(); }
+
+bool TopicNode::ServerRecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms)
+{
+	MsgI imsg;
+	BHMsgHead head;
+	if (SockReply().SyncRecv(imsg, head, timeout_ms) && head.type() == kMsgTypeRequestTopic) {
+		MsgRequestTopic request;
+		if (imsg.ParseBody(request)) {
+			request.mutable_topic()->swap(topic);
+			request.mutable_data()->swap(data);
+			SrcInfo *p = new SrcInfo;
+			p->route.assign(head.route().begin(), head.route().end());
+			p->msg_id = head.msg_id();
+			src_info = p;
+			return true;
+		}
+	}
+	return false;
+}
+
+bool TopicNode::ServerSendReply(void *src_info, const std::string &data, const int timeout_ms)
+{
+	SrcInfo *p = static_cast<SrcInfo *>(src_info);
+	DEFER1(delete p);
+	if (!p || p->route.empty()) {
+		return false;
+	}
+	MsgRequestTopicReply body;
+	body.set_data(data);
+	BHMsgHead head(InitMsgHead(GetType(body), proc_id(), p->msg_id));
+
+	for (unsigned i = 0; i < p->route.size() - 1; ++i) {
+		head.add_route()->Swap(&p->route[i]);
+	}
+
+	return SockReply().Send(p->route.back().mq_id().data(), head, body, timeout_ms);
+}
+
+bool TopicNode::ClientStartWorker(RequestResultCB const &cb, const int nworker)
+{
+	if (!cb) {
+		return false;
+	}
+	auto onData = [this, cb](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
+		if (head.type() == kMsgTypeRequestTopicReply) {
+			MsgRequestTopicReply reply;
+			if (imsg.ParseBody(reply)) {
+				cb(reply.data());
+			}
+		}
+	};
+
+	return SockRequest().Start(onData, nworker);
+}
+bool TopicNode::ClientStopWorker() { return SockRequest().Stop(); }
+
+bool TopicNode::ClientAsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb)
+{
+	auto Call = [&](const void *remote) {
+		auto &sock = SockRequest();
+		MsgRequestTopic req;
+		req.set_topic(topic);
+		req.set_data(data, size);
+		BHMsgHead head(InitMsgHead(GetType(req), proc_id()));
+		AddRoute(head, sock.id());
+
+		if (cb) {
+			auto onRecv = [cb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
+				if (head.type() == kMsgTypeRequestTopicReply) {
+					MsgRequestTopicReply reply;
+					if (imsg.ParseBody(reply)) {
+						cb(reply.data());
+					}
+				}
+			};
+			return sock.Send(remote, head, req, timeout_ms, onRecv);
+		} else {
+			return sock.Send(remote, head, req, timeout_ms);
+		}
+	};
+
+	try {
+		BHAddress addr;
+		if (ClientQueryRPCTopic(topic, addr, timeout_ms)) {
+			return Call(addr.mq_id().data());
+		} else {
+			return false;
+		}
+	} catch (...) {
+		return false;
+	}
+}
+
+bool TopicNode::ClientSyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms)
+{
+	try {
+		auto &sock = SockRequest();
+		BHAddress addr;
+		if (ClientQueryRPCTopic(topic, addr, timeout_ms)) {
+
+			MsgRequestTopic req;
+			req.set_topic(topic);
+			req.set_data(data, size);
+			BHMsgHead head(InitMsgHead(GetType(req), proc_id()));
+			AddRoute(head, sock.id());
+
+			MsgI reply;
+			DEFER1(reply.Release(shm_););
+			BHMsgHead reply_head;
+
+			if (sock.SendAndRecv(addr.mq_id().data(), head, req, reply, reply_head, timeout_ms) && reply_head.type() == kMsgTypeRequestTopicReply) {
+				MsgRequestTopicReply dr;
+				if (reply.ParseBody(dr)) {
+					dr.mutable_data()->swap(out);
+					return true;
+				} else {
+					printf("error parse reply.\n");
+				}
+			} else {
+				printf("error recv data. line: %d\n", __LINE__);
+			}
+		} else {
+			printf("error recv data. line: %d\n", __LINE__);
+		}
+	} catch (...) {
+		printf("error recv data. line: %d\n", __LINE__);
+	}
+	return false;
+}
+
+bool TopicNode::ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms)
+{
+	auto &sock = SockRequest();
+	if (topic_query_cache_.Find(topic, addr)) {
+		return true;
+	}
+
+	MsgQueryTopic query;
+	query.set_topic(topic);
+	BHMsgHead head(InitMsgHead(GetType(query), proc_id()));
+	AddRoute(head, sock.id());
+
+	MsgI reply;
+	DEFER1(reply.Release(shm_));
+	BHMsgHead reply_head;
+
+	if (sock.SendAndRecv(&BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms)) {
+		if (reply_head.type() == kMsgTypeQueryTopicReply) {
+			MsgQueryTopicReply rep;
+			if (reply.ParseBody(rep)) {
+				addr = rep.address();
+				if (addr.mq_id().empty()) {
+					return false;
+				} else {
+					topic_query_cache_.Update(topic, addr);
+					return true;
+				}
+			}
+		}
+	} else {
+	}
+	return false;
+}
\ No newline at end of file
diff --git a/src/topic_node.h b/src/topic_node.h
new file mode 100644
index 0000000..8852af1
--- /dev/null
+++ b/src/topic_node.h
@@ -0,0 +1,121 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  topic_node.h
+ *
+ *    Description:  
+ *
+ *        Version:  1.0
+ *        Created:  2021骞�04鏈�07鏃� 09鏃�05鍒�26绉�
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Li Chao (), lichao@aiotlink.com
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+#ifndef TOPIC_NODE_YVKWA6TF
+#define TOPIC_NODE_YVKWA6TF
+
+#include "msg.h"
+#include "pubsub.h"
+#include "socket.h"
+#include <memory>
+
+using namespace bhome_shm;
+using namespace bhome_msg;
+
+// a node is a client.
+class TopicNode
+{
+	SharedMemory &shm_;
+	MsgRegister info_;
+
+public:
+	TopicNode(SharedMemory &shm);
+	~TopicNode();
+	bool Register(const MsgRegister &body, MsgCommonReply &reply, const int timeout_ms);
+	bool RegisterRPC(const MsgRegisterRPC &body, MsgCommonReply &reply, const int timeout_ms);
+
+	// topic rpc server
+	typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest;
+	bool ServerStart(OnRequest const &cb, const int nworker = 2);
+	bool ServerStop();
+	bool ServerRecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms);
+	bool ServerSendReply(void *src_info, const std::string &data, const int timeout_ms);
+
+	// topic client
+	typedef std::function<void(const std::string &data)> RequestResultCB;
+	bool ClientStartWorker(RequestResultCB const &cb, const int nworker = 2);
+	bool ClientStopWorker();
+	bool ClientAsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb = RequestResultCB());
+	bool ClientAsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb = RequestResultCB())
+	{
+		return ClientAsyncRequest(topic, data.data(), data.size(), timeout_ms, rrcb);
+	}
+	bool ClientSyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms);
+	bool ClientSyncRequest(const Topic &topic, const std::string &data, std::string &out, const int timeout_ms)
+	{
+		return ClientSyncRequest(topic, data.data(), data.size(), out, timeout_ms);
+	}
+
+	void StopAll();
+
+private:
+	bool ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
+	const std::string &proc_id() { return info_.proc().proc_id(); }
+
+	typedef bhome_msg::BHAddress Address;
+	class TopicQueryCache
+	{
+		class Impl
+		{
+			typedef std::unordered_map<Topic, Address> Store;
+			Store store_;
+
+		public:
+			bool Find(const Topic &topic, Address &addr)
+			{
+				auto pos = store_.find(topic);
+				if (pos != store_.end()) {
+					addr = pos->second;
+					return true;
+				} else {
+					return false;
+				}
+			}
+			bool Update(const Topic &topic, const Address &addr)
+			{
+				store_[topic] = addr;
+				return true;
+			}
+		};
+		Synced<Impl> impl_;
+		// Impl &impl()
+		// {
+		// 	thread_local Impl impl;
+		// 	return impl;
+		// }
+
+	public:
+		bool Find(const Topic &topic, Address &addr) { return impl_->Find(topic, addr); }
+		bool Update(const Topic &topic, const Address &addr) { return impl_->Update(topic, addr); }
+	};
+
+	// some sockets may be the same one, using functions make it easy to change.
+
+	auto &SockNode() { return sock_node_; }
+	auto &SockSub() { return sock_sub_; }
+	auto &SockRequest() { return sock_request_; }
+	auto &SockReply() { return sock_reply_; }
+
+	ShmSocket sock_node_;
+	ShmSocket sock_request_;
+	ShmSocket sock_reply_;
+	SocketSubscribe sock_sub_;
+
+	TopicQueryCache topic_query_cache_;
+};
+
+#endif // end of include guard: TOPIC_NODE_YVKWA6TF
diff --git a/src/topic_reply.cpp b/src/topic_reply.cpp
deleted file mode 100644
index 2ab75e6..0000000
--- a/src/topic_reply.cpp
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * =====================================================================================
- *
- *       Filename:  topic_reply.cpp
- *
- *    Description:  
- *
- *        Version:  1.0
- *        Created:  2021骞�04鏈�06鏃� 14鏃�40鍒�52绉�
- *       Revision:  none
- *       Compiler:  gcc
- *
- *         Author:  Li Chao (), 
- *   Organization:  
- *
- * =====================================================================================
- */
-#include "topic_reply.h"
-#include <chrono>
-#include <list>
-
-using namespace bhome_msg;
-using namespace std::chrono;
-using namespace std::chrono_literals;
-
-namespace
-{
-struct SrcInfo {
-	std::vector<BHAddress> route;
-	std::string msg_id;
-};
-
-class FailedQ
-{
-	struct FailedMsg {
-		steady_clock::time_point xpr;
-		std::string remote_;
-		BHMsg msg_;
-		FailedMsg(const std::string &addr, BHMsg &&msg) :
-		    xpr(steady_clock::now() + 10s), remote_(addr), msg_(std::move(msg)) {}
-		bool Expired() { return steady_clock::now() > xpr; }
-	};
-	typedef std::list<FailedMsg> Queue;
-	Synced<Queue> queue_;
-
-public:
-	void Push(const std::string &remote, BHMsg &&msg)
-	{
-		queue_->emplace_back(remote, std::move(msg));
-	}
-	void TrySend(ShmSocket &socket, const int timeout_ms = 0)
-	{
-		queue_.Apply([&](Queue &q) {
-			if (!q.empty()) {
-				auto it = q.begin();
-				do {
-					if (it->Expired() || socket.SyncSend(it->remote_.data(), it->msg_, timeout_ms)) {
-						it = q.erase(it);
-					} else {
-						++it;
-					}
-				} while (it != q.end());
-			}
-		});
-	}
-};
-
-} // namespace
-
-bool SocketReply::Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms)
-{
-	//TODO check reply?
-	return SyncSend(&BHTopicCenterAddress(), MakeRegister(mq().Id(), proc_info, topics), timeout_ms);
-}
-bool SocketReply::Heartbeat(const ProcInfo &proc_info, const int timeout_ms)
-{
-	return SyncSend(&BHTopicCenterAddress(), MakeHeartbeat(mq().Id(), proc_info), timeout_ms);
-}
-bool SocketReply::StartWorker(const OnRequest &rcb, int nworker)
-{
-	auto failed_q = std::make_shared<FailedQ>();
-
-	auto onIdle = [failed_q](ShmSocket &socket) { failed_q->TrySend(socket); };
-
-	auto onRecv = [this, rcb, failed_q, onIdle](BHMsg &msg) {
-		if (msg.type() == kMsgTypeRequestTopic && msg.route_size() > 0) {
-			MsgRequestTopic req;
-			if (req.ParseFromString(msg.body())) {
-				std::string out;
-				if (rcb(req.topic(), req.data(), out)) {
-					BHMsg msg_reply(MakeReply(msg.msg_id(), out.data(), out.size()));
-					for (int i = 0; i < msg.route_size() - 1; ++i) {
-						msg.add_route()->Swap(msg.mutable_route(i));
-					}
-					if (!SyncSend(msg.route().rbegin()->mq_id().data(), msg_reply, 10)) {
-						failed_q->Push(msg.route().rbegin()->mq_id(), std::move(msg_reply));
-					}
-				}
-			}
-		} else {
-			// ignored, or dropped
-		}
-
-		onIdle(*this);
-	};
-
-	return rcb && Start(onRecv, onIdle, nworker);
-}
-
-bool SocketReply::RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms)
-{
-	BHMsg msg;
-	if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypeRequestTopic) {
-		MsgRequestTopic request;
-		if (request.ParseFromString(msg.body())) {
-			request.mutable_topic()->swap(topic);
-			request.mutable_data()->swap(data);
-			SrcInfo *p = new SrcInfo;
-			p->route.assign(msg.route().begin(), msg.route().end());
-			p->msg_id = msg.msg_id();
-			src_info = p;
-			return true;
-		}
-	}
-	return false;
-}
-
-bool SocketReply::SendReply(void *src_info, const std::string &data, const int timeout_ms)
-{
-	SrcInfo *p = static_cast<SrcInfo *>(src_info);
-	DEFER1(delete p);
-	if (!p || p->route.empty()) {
-		return false;
-	}
-
-	BHMsg msg(MakeReply(p->msg_id, data.data(), data.size()));
-	for (unsigned i = 0; i < p->route.size() - 1; ++i) {
-		msg.add_route()->Swap(&p->route[i]);
-	}
-
-	return SyncSend(p->route.back().mq_id().data(), msg, timeout_ms);
-}
\ No newline at end of file
diff --git a/src/topic_reply.h b/src/topic_reply.h
deleted file mode 100644
index 090ad88..0000000
--- a/src/topic_reply.h
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * =====================================================================================
- *
- *       Filename:  topic_reply.h
- *
- *    Description:  
- *
- *        Version:  1.0
- *        Created:  2021骞�04鏈�06鏃� 14鏃�41鍒�12绉�
- *       Revision:  none
- *       Compiler:  gcc
- *
- *         Author:  Li Chao (), 
- *   Organization:  
- *
- * =====================================================================================
- */
-#ifndef TOPIC_REPLY_3RVYPPWI
-#define TOPIC_REPLY_3RVYPPWI
-
-#include "bh_util.h"
-#include "defs.h"
-#include "msg.h"
-#include "socket.h"
-#include <deque>
-#include <functional>
-
-using bhome::msg::ProcInfo;
-
-class SocketReply : private ShmSocket
-{
-	typedef ShmSocket Socket;
-
-public:
-	SocketReply(Socket::Shm &shm) :
-	    Socket(shm, 64) {}
-	SocketReply() :
-	    SocketReply(BHomeShm()) {}
-	~SocketReply() { Stop(); }
-
-	typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest;
-	bool StartWorker(const OnRequest &rcb, int nworker = 2);
-	bool Stop() { return Socket::Stop(); }
-	bool RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms);
-	bool SendReply(void *src_info, const std::string &data, const int timeout_ms);
-	bool Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms);
-	bool Heartbeat(const ProcInfo &proc_info, const int timeout_ms);
-
-private:
-};
-
-#endif // end of include guard: TOPIC_REPLY_3RVYPPWI
diff --git a/src/topic_request.cpp b/src/topic_request.cpp
deleted file mode 100644
index 382ce21..0000000
--- a/src/topic_request.cpp
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * =====================================================================================
- *
- *       Filename:  topic_request.cpp
- *
- *    Description:  topic request sockets
- *
- *        Version:  1.0
- *        Created:  2021骞�04鏈�01鏃� 09鏃�35鍒�35绉�
- *       Revision:  none
- *       Compiler:  gcc
- *
- *         Author:  Li Chao (), 
- *   Organization:  
- *
- * =====================================================================================
- */
-#include "topic_request.h"
-#include "bh_util.h"
-#include "msg.h"
-#include <chrono>
-#include <condition_variable>
-
-using namespace bhome_msg;
-
-bool SocketRequest::StartWorker(const RequestResultCB &rrcb, int nworker)
-{
-	auto AsyncRecvProc = [this, rrcb](BHMsg &msg) {
-		auto Find = [&](RecvBHMsgCB &cb) {
-			std::lock_guard<std::mutex> lock(mutex());
-			const std::string &msgid = msg.msg_id();
-			auto pos = async_cbs_.find(msgid);
-			if (pos != async_cbs_.end()) {
-				cb.swap(pos->second);
-				async_cbs_.erase(pos);
-				return true;
-			} else {
-				return false;
-			}
-		};
-
-		RecvBHMsgCB cb;
-		if (Find(cb) && cb) {
-			cb(msg);
-		} else if (msg.type() == kMsgTypeRequestTopicReply && rrcb) {
-			MsgRequestTopicReply reply;
-			if (reply.ParseFromString(msg.body())) {
-				rrcb(reply.data());
-			}
-		} else {
-			// ignored, or dropped
-		}
-	};
-
-	return Start(AsyncRecvProc, nworker);
-}
-
-bool SocketRequest::AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms)
-{
-	try {
-		BHAddress addr;
-		if (QueryRPCTopic(topic, addr, timeout_ms)) {
-			const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size));
-			return AsyncSend(addr.mq_id().data(), &msg, timeout_ms);
-		} else {
-			return false;
-		}
-	} catch (...) {
-		return false;
-	}
-}
-bool SocketRequest::AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb)
-{
-	auto Call = [&](const void *remote) {
-		const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size));
-		auto onRecv = [cb](BHMsg &msg) {
-			if (msg.type() == kMsgTypeRequestTopicReply) {
-				MsgRequestTopicReply reply;
-				if (reply.ParseFromString(msg.body())) {
-					cb(reply.data());
-				}
-			}
-		};
-		return AsyncSend(remote, &msg, timeout_ms, onRecv);
-	};
-
-	try {
-		BHAddress addr;
-		if (QueryRPCTopic(topic, addr, timeout_ms)) {
-			return Call(addr.mq_id().data());
-		} else {
-			return false;
-		}
-	} catch (...) {
-		return false;
-	}
-}
-
-bool SocketRequest::SyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms)
-{
-	try {
-		BHAddress addr;
-		if (QueryRPCTopic(topic, addr, timeout_ms)) {
-			const BHMsg &req(MakeRequest(mq().Id(), topic, data, size));
-			BHMsg reply;
-			if (SyncSendAndRecv(addr.mq_id().data(), &req, &reply, timeout_ms) && reply.type() == kMsgTypeRequestTopicReply) {
-				MsgRequestTopicReply dr;
-				if (dr.ParseFromString(reply.body())) {
-					dr.mutable_data()->swap(out);
-					return true;
-				} else {
-					printf("error parse reply.\n");
-				}
-			} else {
-				printf("error recv data. line: %d\n", __LINE__);
-			}
-		} else {
-			printf("error recv data. line: %d\n", __LINE__);
-		}
-	} catch (...) {
-		printf("error recv data. line: %d\n", __LINE__);
-	}
-	return false;
-}
-
-bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms)
-{
-	assert(remote && pmsg);
-	try {
-		const BHMsg &msg = *static_cast<const BHMsg *>(pmsg);
-		return mq().Send(*static_cast<const MQId *>(remote), msg, timeout_ms);
-	} catch (...) {
-		return false;
-	}
-}
-bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms, const RecvBHMsgCB &cb)
-{
-	assert(remote && pmsg);
-	try {
-		const BHMsg &msg = *static_cast<const BHMsg *>(pmsg);
-		auto RegisterCB = [&]() {
-			std::lock_guard<std::mutex> lock(mutex());
-			async_cbs_.emplace(msg.msg_id(), cb);
-		};
-
-		return mq().Send(*static_cast<const MQId *>(remote), msg, timeout_ms, RegisterCB);
-	} catch (...) {
-		return false;
-	}
-}
-
-bool SocketRequest::SyncSendAndRecv(const void *remote, const void *msg, void *result, const int timeout_ms)
-{
-	struct State {
-		std::mutex mutex;
-		std::condition_variable cv;
-		bool canceled = false;
-	};
-
-	try {
-		std::shared_ptr<State> st(new State);
-		auto endtime = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);
-
-		auto OnRecv = [=](BHMsg &msg) {
-			std::unique_lock<std::mutex> lk(st->mutex);
-			if (!st->canceled) {
-				static_cast<BHMsg *>(result)->Swap(&msg);
-				st->cv.notify_one();
-			} else {
-			}
-		};
-
-		std::unique_lock<std::mutex> lk(st->mutex);
-		bool sendok = AsyncSend(remote, msg, timeout_ms, OnRecv);
-		if (sendok && st->cv.wait_until(lk, endtime) == std::cv_status::no_timeout) {
-			return true;
-		} else {
-			st->canceled = true;
-			return false;
-		}
-	} catch (...) {
-		return false;
-	}
-}
-
-bool SocketRequest::QueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms)
-{
-	if (topic_cache_.Find(topic, addr)) {
-		return true;
-	}
-
-	BHMsg result;
-	const BHMsg &msg = MakeQueryTopic(mq().Id(), topic);
-	if (SyncSendAndRecv(&BHTopicCenterAddress(), &msg, &result, timeout_ms)) {
-		if (result.type() == kMsgTypeQueryTopicReply) {
-			MsgQueryTopicReply reply;
-			if (reply.ParseFromString(result.body())) {
-				addr = reply.address();
-				if (addr.mq_id().empty()) {
-					return false;
-				} else {
-					topic_cache_.Update(topic, addr);
-					return true;
-				}
-			}
-		}
-	} else {
-	}
-	return false;
-}
diff --git a/src/topic_request.h b/src/topic_request.h
deleted file mode 100644
index 6765dc2..0000000
--- a/src/topic_request.h
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * =====================================================================================
- *
- *       Filename:  topic_request.h
- *
- *    Description:  topic request socket
- *
- *        Version:  1.0
- *        Created:  2021骞�04鏈�01鏃� 09鏃�36鍒�06绉�
- *       Revision:  none
- *       Compiler:  gcc
- *
- *         Author:  Li Chao (), 
- *   Organization:  
- *
- * =====================================================================================
- */
-#ifndef TOPIC_REQUEST_ACEH09NK
-#define TOPIC_REQUEST_ACEH09NK
-
-#include "bh_util.h"
-#include "defs.h"
-#include "msg.h"
-#include "socket.h"
-#include <functional>
-#include <unordered_map>
-
-using bhome::msg::ProcInfo;
-
-class SocketRequest : private ShmSocket
-{
-	typedef ShmSocket Socket;
-
-public:
-	SocketRequest(Socket::Shm &shm) :
-	    Socket(shm, 64) { StartWorker(); }
-	SocketRequest() :
-	    SocketRequest(BHomeShm()) {}
-	~SocketRequest() { Stop(); }
-
-	typedef std::function<void(const std::string &data)> RequestResultCB;
-	bool StartWorker(const RequestResultCB &rrcb, int nworker = 2);
-	bool StartWorker(int nworker = 2) { return StartWorker(RequestResultCB(), nworker); }
-	bool Stop() { return Socket::Stop(); }
-	bool AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb);
-	bool AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms);
-
-	bool AsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb)
-	{
-		return AsyncRequest(topic, data.data(), data.size(), timeout_ms, rrcb);
-	}
-	bool AsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms)
-	{
-		return AsyncRequest(topic, data.data(), data.size(), timeout_ms);
-	}
-	bool SyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms);
-	bool SyncRequest(const Topic &topic, const std::string &data, std::string &out, const int timeout_ms)
-	{
-		return SyncRequest(topic, data.data(), data.size(), out, timeout_ms);
-	}
-
-private:
-	bool AsyncSend(const void *remote, const void *msg, const int timeout_ms, const RecvBHMsgCB &cb);
-	bool AsyncSend(const void *remote, const void *msg, const int timeout_ms);
-	bool SyncSendAndRecv(const void *remote, const void *msg, void *result, const int timeout_ms);
-	bool QueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
-	std::unordered_map<std::string, RecvBHMsgCB> async_cbs_;
-
-	typedef bhome_msg::BHAddress Address;
-	class TopicCache
-	{
-		class Impl
-		{
-			typedef std::unordered_map<Topic, Address> Store;
-			Store store_;
-
-		public:
-			bool Find(const Topic &topic, Address &addr)
-			{
-				auto pos = store_.find(topic);
-				if (pos != store_.end()) {
-					addr = pos->second;
-					return true;
-				} else {
-					return false;
-				}
-			}
-			bool Update(const Topic &topic, const Address &addr)
-			{
-				store_[topic] = addr;
-				return true;
-			}
-		};
-		Synced<Impl> impl_;
-		// Impl &impl()
-		// {
-		// 	thread_local Impl impl;
-		// 	return impl;
-		// }
-
-	public:
-		bool Find(const Topic &topic, Address &addr) { return impl_->Find(topic, addr); }
-		bool Update(const Topic &topic, const Address &addr) { return impl_->Update(topic, addr); }
-	};
-	TopicCache topic_cache_;
-};
-
-#endif // end of include guard: TOPIC_REQUEST_ACEH09NK
diff --git a/utest/simple_tests.cpp b/utest/simple_tests.cpp
index 06093fd..cbbcc2a 100644
--- a/utest/simple_tests.cpp
+++ b/utest/simple_tests.cpp
@@ -36,7 +36,7 @@
 	BOOST_CHECK(!p);
 	BOOST_CHECK(p.get() == 0);
 	const char *str = "basic";
-	p               = str;
+	p = str;
 	BOOST_CHECK(p);
 	BOOST_CHECK(p.get() == str);
 	p = 0;
@@ -49,7 +49,7 @@
 		auto Code = [&](int id) {
 			typedef ShmObject<s1000> Int;
 			std::string name = std::to_string(id);
-			auto a0          = Avail();
+			auto a0 = Avail();
 			Int i1(shm, name);
 			auto a1 = Avail();
 			BOOST_CHECK_LT(a1, a0);
@@ -64,7 +64,7 @@
 
 			{
 				auto old = Avail();
-				void *p  = shm.Alloc(1024);
+				void *p = shm.Alloc(1024);
 				shm.Dealloc(p);
 				BOOST_CHECK_EQUAL(old, Avail());
 			}
@@ -80,7 +80,7 @@
 	// boost::timer::auto_cpu_timer timer;
 	ThreadManager threads;
 	int nthread = 1;
-	int nloop   = 1;
+	int nloop = 1;
 	for (int i = 0; i < nthread; ++i) {
 		threads.Launch(BasicTest, i, nloop);
 	}
@@ -114,7 +114,7 @@
 		int ms = i * 100;
 		printf("Timeout Test %4d: ", ms);
 		boost::timer::auto_cpu_timer timer;
-		BHMsg msg;
+		MsgI msg;
 		bool r = q.Recv(msg, ms);
 		BOOST_CHECK(!r);
 	}
diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp
index 34f80d8..d777f91 100644
--- a/utest/speed_test.cpp
+++ b/utest/speed_test.cpp
@@ -28,14 +28,20 @@
 	MQId id = boost::uuids::random_generator()();
 	const int timeout = 100;
 	const uint32_t data_size = 4000;
+	const std::string proc_id = "demo_proc";
 
 	auto Writer = [&](int writer_id, uint64_t n) {
 		SharedMemory shm(shm_name, mem_size);
 		ShmMsgQueue mq(shm, 64);
 		std::string str(data_size, 'a');
 		MsgI msg;
+		MsgRequestTopic body;
+		body.set_topic("topic");
+		body.set_data(str);
+		auto head(InitMsgHead(GetType(body), proc_id));
+		msg.MakeRC(shm, head, body);
 		DEFER1(msg.Release(shm););
-		msg.MakeRC(shm, MakeRequest(mq.Id(), "topic", str.data(), str.size()));
+
 		for (uint64_t i = 0; i < n; ++i) {
 			// mq.Send(id, str.data(), str.size(), timeout);
 			mq.Send(id, msg, timeout);
@@ -45,8 +51,10 @@
 		SharedMemory shm(shm_name, mem_size);
 		ShmMsgQueue mq(id, shm, 1000);
 		while (*run) {
-			BHMsg msg;
+			MsgI msg;
+			BHMsgHead head;
 			if (mq.Recv(msg, timeout)) {
+				DEFER1(msg.Release(shm));
 				// ok
 			} else if (isfork) {
 				exit(0); // for forked quit after 1s.
@@ -113,6 +121,8 @@
 	const size_t msg_length = 1000;
 	std::string msg_content(msg_length, 'a');
 	msg_content[20] = '\0';
+	const std::string client_proc_id = "client_proc";
+	const std::string server_proc_id = "server_proc";
 
 	SharedMemory shm(shm_name, 1024 * 1024 * 50);
 	auto Avail = [&]() { return shm.get_free_memory(); };
@@ -121,9 +131,18 @@
 	ShmMsgQueue cli(shm, qlen);
 
 	MsgI request_rc;
-	request_rc.MakeRC(shm, MakeRequest(cli.Id(), "topic", msg_content.data(), msg_content.size()));
+	MsgRequestTopic req_body;
+	req_body.set_topic("topic");
+	req_body.set_data(msg_content);
+	auto req_head(InitMsgHead(GetType(req_body), client_proc_id));
+	request_rc.MakeRC(shm, req_head, req_body);
+
+	MsgRequestTopic reply_body;
+	reply_body.set_topic("topic");
+	reply_body.set_data(msg_content);
+	auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id));
 	MsgI reply_rc;
-	reply_rc.MakeRC(shm, MakeReply("fakemsgid", msg_content.data(), msg_content.size()));
+	reply_rc.MakeRC(shm, reply_head, reply_body);
 
 	std::atomic<uint64_t> count(0);
 
@@ -133,7 +152,11 @@
 	auto Client = [&](int cli_id, int nmsg) {
 		for (int i = 0; i < nmsg; ++i) {
 			auto Req = [&]() {
-				return cli.Send(srv.Id(), MakeRequest(cli.Id(), "topic", msg_content.data(), msg_content.size()), 100);
+				MsgRequestTopic req_body;
+				req_body.set_topic("topic");
+				req_body.set_data(msg_content);
+				auto req_head(InitMsgHead(GetType(req_body), client_proc_id));
+				return cli.Send(srv.Id(), req_head, req_body, 100);
 			};
 			auto ReqRC = [&]() { return cli.Send(srv.Id(), request_rc, 1000); };
 
@@ -141,10 +164,12 @@
 				printf("********** client send error.\n");
 				continue;
 			}
-			BHMsg msg;
+			MsgI msg;
+			BHMsgHead head;
 			if (!cli.Recv(msg, 1000)) {
 				printf("********** client recv error.\n");
 			} else {
+				DEFER1(msg.Release(shm));
 				++count;
 				auto cur = Now();
 				if (last_time.exchange(cur) < cur) {
@@ -158,18 +183,27 @@
 
 	std::atomic<bool> stop(false);
 	auto Server = [&]() {
-		BHMsg req;
-		while (!stop) {
-			if (srv.Recv(req, 100) && req.type() == kMsgTypeRequestTopic) {
-				auto &mqid = req.route()[0].mq_id();
-				MQId src_id;
-				memcpy(&src_id, mqid.data(), sizeof(src_id));
-				auto Reply = [&]() {
-					return srv.Send(src_id, MakeReply(req.msg_id(), msg_content.data(), msg_content.size()), 100);
-				};
-				auto ReplyRC = [&]() { return srv.Send(src_id, reply_rc, 100); };
+		MsgI req;
+		BHMsgHead req_head;
 
-				if (ReplyRC()) {
+		while (!stop) {
+			if (srv.Recv(req, 100)) {
+				DEFER1(req.Release(shm));
+				if (req.ParseHead(req_head) && req_head.type() == kMsgTypeRequestTopic) {
+					auto &mqid = req_head.route()[0].mq_id();
+					MQId src_id;
+					memcpy(&src_id, mqid.data(), sizeof(src_id));
+					auto Reply = [&]() {
+						MsgRequestTopic reply_body;
+						reply_body.set_topic("topic");
+						reply_body.set_data(msg_content);
+						auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, req_head.msg_id()));
+						return srv.Send(src_id, reply_head, reply_body, 100);
+					};
+					auto ReplyRC = [&]() { return srv.Send(src_id, reply_rc, 100); };
+
+					if (ReplyRC()) {
+					}
 				}
 			}
 		}
diff --git a/utest/utest.cpp b/utest/utest.cpp
index 092455f..c925e22 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -1,11 +1,8 @@
 #include "center.h"
 #include "defs.h"
 #include "pubsub.h"
-#include "pubsub_center.h"
-#include "reqrep_center.h"
 #include "socket.h"
-#include "topic_reply.h"
-#include "topic_request.h"
+#include "topic_node.h"
 #include "util.h"
 #include <atomic>
 #include <boost/uuid/uuid_generators.hpp>
@@ -15,6 +12,7 @@
 #include <string>
 #include <thread>
 #include <vector>
+using namespace bhome_msg;
 
 template <class A, class B>
 struct IsSameType {
@@ -79,9 +77,11 @@
 	int *flag = shm.find_or_construct<int>("flag")(123);
 	printf("flag = %d\n", *flag);
 	++*flag;
+	const std::string sub_proc_id = "subscriber";
+	const std::string pub_proc_id = "publisher";
 
-	PubSubCenter bus(shm);
-	bus.Start();
+	BHCenter center(shm);
+	center.Start();
 
 	std::this_thread::sleep_for(100ms);
 
@@ -93,12 +93,12 @@
 	const int timeout = 1000;
 	auto Sub = [&](int id, const std::vector<std::string> &topics) {
 		SocketSubscribe client(shm);
-		bool r = client.Subscribe(topics, timeout);
+		bool r = client.Subscribe(sub_proc_id, topics, timeout);
 		std::mutex mutex;
 		std::condition_variable cv;
 
 		std::atomic<uint64_t> n(0);
-		auto OnTopicData = [&](const std::string &topic, const std::string &data) {
+		auto OnTopicData = [&](const std::string &proc_id, const std::string &topic, const std::string &data) {
 			++total_count;
 
 			auto cur = Now();
@@ -123,7 +123,7 @@
 		for (unsigned i = 0; i < nmsg; ++i) {
 			std::string data = topic + std::to_string(i) + std::string(1000, '-');
 
-			bool r = provider.Publish(topic, data, timeout);
+			bool r = provider.Publish(pub_proc_id, topic, data.data(), data.size(), timeout);
 			if (!r) {
 				printf("pub ret: %s\n", r ? "ok" : "fail");
 			}
@@ -150,9 +150,8 @@
 	std::cout << "end : " << Now();
 	printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n",
 	       total_count.load(), total_count - last_count.exchange(total_count), init_avail - Avail());
-
-	bus.Stop();
 }
+
 namespace
 {
 struct C {
@@ -177,12 +176,24 @@
 	printf("flag = %d\n", *flag);
 	++*flag;
 
+	const std::string client_proc_id = "client_proc_";
+	const std::string server_proc_id = "server_proc_";
+
 	BHCenter center(shm);
 	center.Start();
 	std::atomic<bool> run(true);
 
 	auto Client = [&](const std::string &topic, const int nreq) {
-		SocketRequest client(shm);
+		TopicNode client(shm);
+		MsgRegister reg;
+		reg.mutable_proc()->set_proc_id(client_proc_id + topic);
+		MsgCommonReply reply_body;
+
+		if (!client.Register(reg, reply_body, 1000)) {
+			printf("client register failed\n");
+			return;
+		}
+
 		std::atomic<int> count(0);
 		std::string reply;
 		auto onRecv = [&](const std::string &rep) {
@@ -191,40 +202,54 @@
 				printf("count: %d\n", count.load());
 			}
 		};
-		client.StartWorker(onRecv, 2);
+		client.ClientStartWorker(onRecv, 2);
 		boost::timer::auto_cpu_timer timer;
 		for (int i = 0; i < nreq; ++i) {
-			if (!client.AsyncRequest(topic, "data " + std::to_string(i), 1000)) {
+			if (!client.ClientAsyncRequest(topic, "data " + std::to_string(i), 1000)) {
 				printf("client request failed\n");
+				++count;
 			}
 
 			// if (!client.SyncRequest(topic, "data " + std::to_string(i), reply, 1000)) {
 			// 	printf("client request failed\n");
-			// } else {
-			// 	++count;
 			// }
+			// 	++count;
 		}
 		do {
 			std::this_thread::yield();
 		} while (count.load() < nreq);
-		client.Stop();
+		client.ClientStopWorker();
 		printf("request %s %d done ", topic.c_str(), count.load());
 	};
 	std::atomic_uint64_t server_msg_count(0);
 	auto Server = [&](const std::string &name, const std::vector<std::string> &topics) {
-		SocketReply server(shm);
-		ProcInfo info;
-		info.set_id(name);
-		info.set_name(name);
-		if (!server.Register(info, topics, 100)) {
-			printf("register failed\n");
+		TopicNode server(shm);
+		MsgRegister reg;
+		reg.mutable_proc()->set_proc_id(server_proc_id);
+		reg.mutable_proc()->set_name(name);
+		MsgCommonReply reply_body;
+
+		if (!server.Register(reg, reply_body, 100)) {
+			printf("server register failed\n");
+			return;
 		}
+
 		auto onData = [&](const std::string &topic, const std::string &data, std::string &reply) {
 			++server_msg_count;
 			reply = topic + ':' + data;
 			return true;
 		};
-		server.StartWorker(onData);
+		server.ServerStart(onData);
+
+		MsgRegisterRPC rpc;
+		for (auto &topic : topics) {
+			rpc.add_topics(topic);
+		}
+		if (!server.RegisterRPC(rpc, reply_body, 100)) {
+			printf("server register topic failed\n");
+			return;
+		}
+
 		while (run) {
 			std::this_thread::yield();
 		}
@@ -234,7 +259,7 @@
 	servers.Launch(Server, "server", topics);
 	std::this_thread::sleep_for(100ms);
 	for (auto &t : topics) {
-		clients.Launch(Client, t, 1000 * 100);
+		clients.Launch(Client, t, 1000 * 1);
 	}
 	clients.WaitAll();
 	printf("clients done, server replyed: %d\n", server_msg_count.load());

--
Gitblit v1.8.0