From d26327b3cde043a9470dcd7fea8e704ea517fdae Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期四, 01 四月 2021 19:26:57 +0800
Subject: [PATCH] add req/rep center;

---
 src/pubsub_center.h          |   10 
 src/reqrep_center.cpp        |  121 ++++++++++++
 src/reqrep.h                 |   35 +++
 src/msg.h                    |    8 
 src/socket.cpp               |    9 
 proto/source/bhome_msg.proto |   28 ++
 src/defs.h                   |    4 
 utest/utest.cpp              |   58 +++++
 src/msg.cpp                  |   68 +++++-
 utest/speed_test.cpp         |   30 +-
 src/pubsub.h                 |    2 
 src/reqrep_center.h          |   61 ++++++
 src/pubsub.cpp               |    4 
 src/pubsub_center.cpp        |    4 
 src/reqrep.cpp               |  112 ++++++++++-
 15 files changed, 482 insertions(+), 72 deletions(-)

diff --git a/proto/source/bhome_msg.proto b/proto/source/bhome_msg.proto
index a88780b..149d8ee 100644
--- a/proto/source/bhome_msg.proto
+++ b/proto/source/bhome_msg.proto
@@ -25,8 +25,11 @@
 	kMsgTypePublish = 3;
 	kMsgTypeSubscribe = 4;
 	kMsgTypeUnsubscribe = 5;
-	kMsgTypeQueryTopic = 6;
-	kMsgTypeQueryTopicReply = 7;
+
+	kMsgTypeProcQueryTopic = 6;
+	kMsgTypeProcQueryTopicReply = 7;
+	kMsgTypeProcRegisterTopics = 8;
+	kMsgTypeProcHeartbeat = 9;
 }
 
 message DataPub {
@@ -47,10 +50,27 @@
 	bytes data = 1; 
 }
 
-message DataQueryTopic {
+message ProcInfo
+{
+	bytes name = 1;
+	bytes info = 2;
+}
+
+message DataProcRegister
+{
+	ProcInfo proc = 1;
+	repeated bytes topics = 2;
+}
+
+message DataProcHeartbeat
+{
+	ProcInfo proc = 1;
+}
+
+message DataProcQueryTopic {
 	bytes topic = 1;
 }
 
-message DataQueryTopicReply {
+message DataProcQueryTopicReply {
 	BHAddress address = 1;
 }
diff --git a/src/defs.h b/src/defs.h
index fcdcc70..db73634 100644
--- a/src/defs.h
+++ b/src/defs.h
@@ -24,8 +24,8 @@
 
 typedef boost::uuids::uuid MQId;
 
-const MQId kBHBusQueueId = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff");
-const MQId kBHTopicRPCId = boost::uuids::string_generator()("12345670-89ab-cdef-8349-1234567890ff");
+const MQId kBHTopicBus = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff");
+const MQId kBHTopicReqRepCenter = boost::uuids::string_generator()("12345670-89ab-cdef-8349-1234567890ff");
 const int kBHCenterPort = 24287;
 const char kTopicSep = '.';
 namespace bhome_shm
diff --git a/src/msg.cpp b/src/msg.cpp
index 41dd459..c1dfff9 100644
--- a/src/msg.cpp
+++ b/src/msg.cpp
@@ -24,36 +24,61 @@
 const uint32_t kMsgTag = 0xf1e2d3c4;
 const uint32_t kMsgPrefixLen = 4;
 
-BHMsg InitMsg(MsgType type)
+inline void AddRoute(BHMsg &msg, const MQId &id) { msg.add_route()->set_mq_id(&id, sizeof(id)); }
+
+std::string RandId()
+{
+	boost::uuids::uuid id = boost::uuids::random_generator()();
+	return std::string((char *) &id, sizeof(id));
+}
+BHMsg InitMsg(MsgType type, const std::string &msgid = RandId())
 {
 	BHMsg msg;
+	msg.set_msg_id(msgid);
 	msg.set_type(type);
 	time_t tm = 0;
 	msg.set_timestamp(time(&tm));
 	return msg;
 }
 
-BHMsg MakeRequest(const MQId &src_id, const void *data, const size_t size)
-{
-	assert(data && size);
-	BHMsg msg(InitMsg(kMsgTypeRequest));
-	msg.set_body(data, size);
-	msg.add_route()->set_mq_id(&src_id, sizeof(src_id));
-	return msg;
-}
 BHMsg MakeRequest(const MQId &src_id, const std::string &topic, const void *data, const size_t size)
 {
+	BHMsg msg(InitMsg(kMsgTypeRequest));
+	AddRoute(msg, src_id);
 	DataRequest req;
 	req.set_topic(topic);
 	req.set_data(data, size);
-	const std::string &body(req.SerializeAsString());
-	return MakeRequest(src_id, body.data(), body.size());
+	msg.set_body(req.SerializeAsString());
+	return msg;
 }
 
-BHMsg MakeReply(const void *data, const size_t size)
+BHMsg MakeRegister(const MQId &src_id, ProcInfo info, const std::vector<std::string> &topics)
+{
+	BHMsg msg(InitMsg(kMsgTypeProcRegisterTopics));
+	AddRoute(msg, src_id);
+	DataProcRegister reg;
+	reg.mutable_proc()->Swap(&info);
+	for (auto &t : topics) {
+		reg.add_topics(t);
+	}
+	msg.set_body(reg.SerializeAsString());
+	return msg;
+}
+
+BHMsg MakeHeartbeat(const MQId &src_id, ProcInfo info)
+{
+	BHMsg msg(InitMsg(kMsgTypeProcHeartbeat));
+	AddRoute(msg, src_id);
+	DataProcRegister reg;
+	reg.mutable_proc()->Swap(&info);
+	msg.set_body(reg.SerializeAsString());
+	return msg;
+}
+
+BHMsg MakeReply(const std::string &src_msgid, const void *data, const size_t size)
 {
 	assert(data && size);
-	BHMsg msg(InitMsg(kMsgTypeReply));
+	BHMsg msg(InitMsg(kMsgTypeReply, src_msgid));
 	DataReply reply;
 	reply.set_data(data, size);
 	msg.set_body(reply.SerializeAsString());
@@ -64,7 +89,7 @@
 {
 	assert(sub_unsub == kMsgTypeSubscribe || sub_unsub == kMsgTypeUnsubscribe);
 	BHMsg msg(InitMsg(sub_unsub));
-	msg.add_route()->set_mq_id(&client, sizeof(client));
+	AddRoute(msg, client);
 	DataSub subs;
 	for (auto &t : topics) {
 		subs.add_topics(t);
@@ -87,14 +112,23 @@
 	return msg;
 }
 
-BHMsg MakeQueryTopic(const std::string &topic)
+BHMsg MakeQueryTopic(const MQId &client, const std::string &topic)
 {
-	BHMsg msg(InitMsg(kMsgTypeQueryTopic));
-	DataQueryTopic query;
+	BHMsg msg(InitMsg(kMsgTypeProcQueryTopic));
+	AddRoute(msg, client);
+	DataProcQueryTopic query;
 	query.set_topic(topic);
 	msg.set_body(query.SerializeAsString());
 	return msg;
 }
+BHMsg MakeQueryTopicReply(const std::string &mqid, const std::string &msgid)
+{
+	BHMsg msg(InitMsg(kMsgTypeProcQueryTopicReply, msgid));
+	DataProcQueryTopicReply reply;
+	reply.mutable_address()->set_mq_id(mqid);
+	msg.set_body(reply.SerializeAsString());
+	return msg;
+}
 
 void *Pack(SharedMemory &shm, const BHMsg &msg)
 {
diff --git a/src/msg.h b/src/msg.h
index ea1e636..8c345fd 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -59,10 +59,12 @@
 	int num_ = 1;
 };
 
-BHMsg MakeQueryTopic(const std::string &topic);
-BHMsg MakeRequest(const MQId &src_id, const void *data, const size_t size);
+BHMsg MakeQueryTopic(const MQId &client, const std::string &topic);
+BHMsg MakeQueryTopicReply(const std::string &mqid, const std::string &msgid);
 BHMsg MakeRequest(const MQId &src_id, const std::string &topic, const void *data, const size_t size);
-BHMsg MakeReply(const void *data, const size_t size);
+BHMsg MakeReply(const std::string &src_msgid, const void *data, const size_t size);
+BHMsg MakeRegister(const MQId &src_id, ProcInfo info, const std::vector<std::string> &topics);
+BHMsg MakeHeartbeat(const MQId &src_id, ProcInfo info);
 BHMsg MakeSub(const MQId &client, const std::vector<std::string> &topics);
 BHMsg MakeUnsub(const MQId &client, const std::vector<std::string> &topics);
 BHMsg MakePub(const std::string &topic, const void *data, const size_t size);
diff --git a/src/pubsub.cpp b/src/pubsub.cpp
index cfc77ab..4449c31 100644
--- a/src/pubsub.cpp
+++ b/src/pubsub.cpp
@@ -30,7 +30,7 @@
 			return false;
 		}
 		DEFER1(imsg.Release(shm()));
-		return ShmMsgQueue::Send(shm(), kBHBusQueueId, imsg, timeout_ms);
+		return ShmMsgQueue::Send(shm(), kBHTopicBus, imsg, timeout_ms);
 	} catch (...) {
 		return false;
 	}
@@ -39,7 +39,7 @@
 bool SocketSubscribe::Subscribe(const std::vector<std::string> &topics, const int timeout_ms)
 {
 	try {
-		return mq().Send(kBHBusQueueId, MakeSub(mq().Id(), topics), timeout_ms);
+		return mq().Send(kBHTopicBus, MakeSub(mq().Id(), topics), timeout_ms);
 	} catch (...) {
 		return false;
 	}
diff --git a/src/pubsub.h b/src/pubsub.h
index cad9f61..ac5a9d2 100644
--- a/src/pubsub.h
+++ b/src/pubsub.h
@@ -50,9 +50,11 @@
 	    Socket(shm, 64) {}
 	SocketSubscribe() :
 	    SocketSubscribe(BHomeShm()) {}
+	~SocketSubscribe() { Stop(); }
 
 	typedef std::function<void(const std::string &topic, const std::string &data)> TopicDataCB;
 	bool StartRecv(const TopicDataCB &tdcb, int nworker = 2);
+	bool Stop() { return Socket::Stop(); }
 	bool Subscribe(const std::vector<std::string> &topics, const int timeout_ms);
 	bool RecvSub(std::string &topic, std::string &data, const int timeout_ms);
 };
diff --git a/src/pubsub_center.cpp b/src/pubsub_center.cpp
index 33c16be..afd07bf 100644
--- a/src/pubsub_center.cpp
+++ b/src/pubsub_center.cpp
@@ -17,9 +17,7 @@
  */
 #include "pubsub_center.h"
 #include "bh_util.h"
-
-PubSubCenter::PubSubCenter(SharedMemory &shm) :
-    socket_(shm) {}
+using namespace bhome_shm;
 
 bool PubSubCenter::Start(const int nworker)
 {
diff --git a/src/pubsub_center.h b/src/pubsub_center.h
index 866216e..b752216 100644
--- a/src/pubsub_center.h
+++ b/src/pubsub_center.h
@@ -23,7 +23,6 @@
 #include <mutex>
 #include <set>
 #include <unordered_map>
-using namespace bhome_shm;
 
 // publish/subcribe manager.
 class PubSubCenter
@@ -31,18 +30,19 @@
 	class SocketBus : public ShmSocket
 	{
 	public:
-		SocketBus(SharedMemory &shm) :
-		    ShmSocket(shm, &kBHBusQueueId, 1000) {}
+		SocketBus(ShmSocket::Shm &shm) :
+		    ShmSocket(shm, &kBHTopicBus, 1000) {}
 		using ShmSocket::shm;
 	};
 	SocketBus socket_;
+	ShmSocket::Shm &shm() { return socket_.shm(); }
 	std::mutex mutex_;
 	typedef std::set<MQId> Clients;
 	std::unordered_map<std::string, Clients> records_;
-	ShmSocket::Shm &shm() { return socket_.shm(); }
 
 public:
-	PubSubCenter(SharedMemory &shm);
+	PubSubCenter(ShmSocket::Shm &shm) :
+	    socket_(shm) {}
 	PubSubCenter() :
 	    PubSubCenter(BHomeShm()) {}
 	~PubSubCenter() { Stop(); }
diff --git a/src/reqrep.cpp b/src/reqrep.cpp
index e1636fd..bed6496 100644
--- a/src/reqrep.cpp
+++ b/src/reqrep.cpp
@@ -16,6 +16,7 @@
  * =====================================================================================
  */
 #include "reqrep.h"
+#include "bh_util.h"
 #include "msg.h"
 #include <chrono>
 #include <condition_variable>
@@ -73,30 +74,33 @@
 		BHAddress addr;
 		if (QueryRPCTopic(topic, addr, timeout_ms)) {
 			return Call(addr.mq_id().data());
+		} else {
+			return false;
 		}
 	} catch (...) {
 		return false;
 	}
 }
 
-bool SocketRequest::SyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, std::string &out)
+bool SocketRequest::SyncRequest(const std::string &topic, const void *data, const size_t size, std::string &out, const int timeout_ms)
 {
 	try {
 		BHAddress addr;
 		if (QueryRPCTopic(topic, addr, timeout_ms)) {
-			const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size));
+			const BHMsg &req(MakeRequest(mq().Id(), topic, data, size));
 			BHMsg reply;
-			if (SyncSendAndRecv(addr.mq_id().data(), &msg, &reply, timeout_ms) && reply.type() == kMsgTypeReply) {
+			if (SyncSendAndRecv(addr.mq_id().data(), &req, &reply, timeout_ms) && reply.type() == kMsgTypeReply) {
 				DataReply dr;
-				if (dr.ParseFromString(msg.body())) {
+				if (dr.ParseFromString(reply.body())) {
 					dr.mutable_data()->swap(out);
 					return true;
 				}
 			}
+		} else {
 		}
 	} catch (...) {
-		return false;
 	}
+	return false;
 }
 
 bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms, const RecvCB &cb)
@@ -132,11 +136,13 @@
 			if (!st->canceled) {
 				static_cast<BHMsg *>(result)->Swap(&msg);
 				st->cv.notify_one();
-			} // else result is no longer valid.
+			} else {
+			}
 		};
 
 		std::unique_lock<std::mutex> lk(st->mutex);
-		if (AsyncSend(remote, msg, timeout_ms, OnRecv) && st->cv.wait_until(lk, endtime) == std::cv_status::no_timeout) {
+		bool sendok = AsyncSend(remote, msg, timeout_ms, OnRecv);
+		if (sendok && st->cv.wait_until(lk, endtime) == std::cv_status::no_timeout) {
 			return true;
 		} else {
 			st->canceled = true;
@@ -149,16 +155,100 @@
 
 bool SocketRequest::QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms)
 {
+	if (tmp_cache_.first == topic) {
+		addr = tmp_cache_.second;
+		return true;
+	}
+
 	BHMsg result;
-	const BHMsg &msg = MakeQueryTopic(topic);
-	if (SyncSendAndRecv(&kBHTopicRPCId, &msg, &result, timeout_ms)) {
-		if (result.type() == kMsgTypeQueryTopicReply) {
-			DataQueryTopicReply reply;
+	const BHMsg &msg = MakeQueryTopic(mq().Id(), topic);
+	if (SyncSendAndRecv(&kBHTopicReqRepCenter, &msg, &result, timeout_ms)) {
+		if (result.type() == kMsgTypeProcQueryTopicReply) {
+			DataProcQueryTopicReply reply;
 			if (reply.ParseFromString(result.body())) {
 				addr = reply.address();
+				tmp_cache_.first = topic;
+				tmp_cache_.second = addr;
 				return !addr.mq_id().empty();
 			}
+		}
+	} else {
+	}
+	return false;
+}
+
+// reply socket
+namespace
+{
+struct SrcInfo {
+	std::vector<BHAddress> route;
+	std::string msg_id;
+};
+
+} // namespace
+
+bool SocketReply::Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms)
+{
+	//TODO check reply?
+	return SyncSend(&kBHTopicReqRepCenter, MakeRegister(mq().Id(), proc_info, topics), timeout_ms);
+}
+bool SocketReply::Heartbeat(const ProcInfo &proc_info, const int timeout_ms)
+{
+	return SyncSend(&kBHTopicReqRepCenter, MakeHeartbeat(mq().Id(), proc_info), timeout_ms);
+}
+bool SocketReply::StartWorker(const OnRequest &rcb, int nworker)
+{
+	auto onRecv = [this, rcb](BHMsg &msg) {
+		if (msg.type() == kMsgTypeRequest && msg.route_size() > 0) {
+			DataRequest req;
+			if (req.ParseFromString(msg.body())) {
+				std::string out;
+				if (rcb(req.topic(), req.data(), out)) {
+					BHMsg msg_reply(MakeReply(msg.msg_id(), out.data(), out.size()));
+					for (int i = 0; i < msg.route_size() - 1; ++i) {
+						msg.add_route()->Swap(msg.mutable_route(i));
+					}
+					SyncSend(msg.route().rbegin()->mq_id().data(), msg_reply, 100);
+				}
+			}
+		} else {
+			// ignored, or dropped
+		}
+	};
+
+	return rcb && Start(onRecv, nworker);
+}
+
+bool SocketReply::RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms)
+{
+	BHMsg msg;
+	if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypeRequest) {
+		DataRequest request;
+		if (request.ParseFromString(msg.body())) {
+			request.mutable_topic()->swap(topic);
+			request.mutable_data()->swap(data);
+			SrcInfo *p = new SrcInfo;
+			p->route.assign(msg.route().begin(), msg.route().end());
+			p->msg_id = msg.msg_id();
+			src_info = p;
+			return true;
 		}
 	}
 	return false;
 }
+
+bool SocketReply::SendReply(void *src_info, const std::string &data, const int timeout_ms)
+{
+	SrcInfo *p = static_cast<SrcInfo *>(src_info);
+	DEFER1(delete p);
+	if (!p || p->route.empty()) {
+		return false;
+	}
+
+	BHMsg msg(MakeReply(p->msg_id, data.data(), data.size()));
+	for (unsigned i = 0; i < p->route.size() - 1; ++i) {
+		msg.add_route()->Swap(&p->route[i]);
+	}
+
+	return SyncSend(p->route.back().mq_id().data(), msg, timeout_ms);
+}
\ No newline at end of file
diff --git a/src/reqrep.h b/src/reqrep.h
index 02cc86f..2971403 100644
--- a/src/reqrep.h
+++ b/src/reqrep.h
@@ -19,9 +19,12 @@
 #define REQREP_ACEH09NK
 
 #include "defs.h"
+#include "msg.h"
 #include "socket.h"
 #include <functional>
 #include <unordered_map>
+
+using bhome::msg::ProcInfo;
 
 class SocketRequest : private ShmSocket
 {
@@ -32,19 +35,21 @@
 	    Socket(shm, 64) { StartWorker(); }
 	SocketRequest() :
 	    SocketRequest(BHomeShm()) {}
+	~SocketRequest() { Stop(); }
 
 	typedef std::function<void(const std::string &data)> RequestResultCB;
 	bool StartWorker(const RequestResultCB &rrcb, int nworker = 2);
 	bool StartWorker(int nworker = 2) { return StartWorker(RequestResultCB(), nworker); }
+	bool Stop() { return Socket::Stop(); }
 	bool AsyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb);
 	bool AsyncRequest(const std::string &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb)
 	{
 		return AsyncRequest(topic, data.data(), data.size(), timeout_ms, rrcb);
 	}
-	bool SyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, std::string &out);
-	bool SyncRequest(const std::string &topic, const std::string &data, const int timeout_ms, std::string &out)
+	bool SyncRequest(const std::string &topic, const void *data, const size_t size, std::string &out, const int timeout_ms);
+	bool SyncRequest(const std::string &topic, const std::string &data, std::string &out, const int timeout_ms)
 	{
-		return SyncRequest(topic, data.data(), data.size(), timeout_ms, out);
+		return SyncRequest(topic, data.data(), data.size(), out, timeout_ms);
 	}
 
 private:
@@ -52,6 +57,30 @@
 	bool SyncSendAndRecv(const void *remote, const void *msg, void *result, const int timeout_ms);
 	bool QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
 	std::unordered_map<std::string, RecvCB> async_cbs_;
+
+	std::pair<std::string, bhome::msg::BHAddress> tmp_cache_;
+};
+
+class SocketReply : private ShmSocket
+{
+	typedef ShmSocket Socket;
+
+public:
+	SocketReply(Socket::Shm &shm) :
+	    Socket(shm, 64) {}
+	SocketReply() :
+	    SocketReply(BHomeShm()) {}
+	~SocketReply() { Stop(); }
+
+	typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest;
+	bool StartWorker(const OnRequest &rcb, int nworker = 2);
+	bool Stop() { return Socket::Stop(); }
+	bool RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms);
+	bool SendReply(void *src_info, const std::string &data, const int timeout_ms);
+	bool Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms);
+	bool Heartbeat(const ProcInfo &proc_info, const int timeout_ms);
+
+private:
 };
 
 #endif // end of include guard: REQREP_ACEH09NK
diff --git a/src/reqrep_center.cpp b/src/reqrep_center.cpp
new file mode 100644
index 0000000..5f1873e
--- /dev/null
+++ b/src/reqrep_center.cpp
@@ -0,0 +1,121 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  reqrep_center.cpp
+ *
+ *    Description:  topic request/reply center
+ *
+ *        Version:  1.0
+ *        Created:  2021骞�04鏈�01鏃� 14鏃�08鍒�50绉�
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Li Chao (), 
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+#include "reqrep_center.h"
+#include "bh_util.h"
+using namespace bhome_shm;
+
+struct A {
+	void F(int){};
+};
+
+namespace
+{
+inline uint64_t Now()
+{
+	time_t t;
+	return time(&t);
+}
+
+} // namespace
+bool ReqRepCenter::Start(const int nworker)
+{
+	auto onRecv = [&](BHMsg &msg) {
+#ifndef NDEBUG
+		static std::atomic<time_t> last(0);
+		time_t now = 0;
+		time(&now);
+		if (last.exchange(now) < now) {
+			printf("bus queue size: %ld\n", socket_.Pending());
+		}
+#endif
+		if (msg.route_size() == 0) {
+			return;
+		}
+		auto &src_mq = msg.route(0).mq_id();
+
+		auto OnRegister = [&]() {
+			DataProcRegister reg;
+			if (!reg.ParseFromString(msg.body())) {
+				return;
+			}
+			ProcInfo pi;
+			pi.server_mqid_ = src_mq;
+			pi.proc_id_ = reg.proc().name();
+			pi.ext_info_ = reg.proc().info();
+			pi.timestamp_ = Now();
+
+			std::lock_guard<std::mutex> lock(mutex_);
+			for (auto &t : reg.topics()) {
+				topic_mq_[t] = pi.server_mqid_;
+			}
+			procs_[pi.proc_id_] = pi;
+		};
+
+		auto OnHeartbeat = [&]() {
+			DataProcHeartbeat hb;
+			if (!hb.ParseFromString(msg.body())) {
+				return;
+			}
+
+			std::lock_guard<std::mutex> lock(mutex_);
+			auto pos = procs_.find(hb.proc().name());
+			if (pos != procs_.end() && pos->second.server_mqid_ == src_mq) { // both name and mq should be the same.
+				pos->second.timestamp_ = Now();
+				pos->second.ext_info_ = hb.proc().info();
+			}
+		};
+
+		auto OnQueryTopic = [&]() {
+			DataProcQueryTopic query;
+			if (!query.ParseFromString(msg.body())) {
+				return;
+			}
+
+			std::string dest;
+			auto FindDest = [&]() {
+				std::lock_guard<std::mutex> lock(mutex_);
+				auto pos = topic_mq_.find(query.topic());
+				if (pos != topic_mq_.end()) {
+					dest = pos->second;
+					return true;
+				} else {
+					return false;
+				}
+			};
+			if (FindDest()) {
+				MQId remote;
+				memcpy(&remote, msg.route().rbegin()->mq_id().data(), sizeof(remote));
+				MsgI imsg;
+				if (!imsg.Make(shm(), MakeQueryTopicReply(dest, msg.msg_id()))) { return; }
+				if (!ShmMsgQueue::Send(shm(), remote, imsg, 100)) {
+					imsg.Release(shm());
+				}
+			}
+		};
+
+		switch (msg.type()) {
+		case kMsgTypeProcRegisterTopics: OnRegister(); break;
+		case kMsgTypeProcHeartbeat: OnHeartbeat(); break;
+		case kMsgTypeProcQueryTopic: OnQueryTopic(); break;
+		default: break;
+		}
+	};
+
+	const int kMaxWorker = 16;
+	return socket_.Start(onRecv, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
+}
\ No newline at end of file
diff --git a/src/reqrep_center.h b/src/reqrep_center.h
new file mode 100644
index 0000000..2ca7295
--- /dev/null
+++ b/src/reqrep_center.h
@@ -0,0 +1,61 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  reqrep_center.h
+ *
+ *    Description:  
+ *
+ *        Version:  1.0
+ *        Created:  2021骞�04鏈�01鏃� 14鏃�09鍒�13绉�
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Li Chao (), 
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+#ifndef REQREP_CENTER_US3RBM60
+#define REQREP_CENTER_US3RBM60
+
+#include "defs.h"
+#include "socket.h"
+#include <chrono>
+#include <mutex>
+#include <set>
+
+class ReqRepCenter
+{
+	class Socket : public ShmSocket
+	{
+	public:
+		Socket(ShmSocket::Shm &shm) :
+		    ShmSocket(shm, &kBHTopicReqRepCenter, 1000) {}
+		using ShmSocket::shm;
+	};
+	Socket socket_;
+	ShmSocket::Shm &shm() { return socket_.shm(); }
+	struct ProcInfo {
+		std::string proc_id_; // unique name
+		std::string server_mqid_;
+		std::string ext_info_; // maybe json.
+		uint64_t timestamp_ = 0;
+	};
+
+	typedef std::string Dests;
+
+	std::mutex mutex_;
+	std::unordered_map<std::string, Dests> topic_mq_;
+	std::unordered_map<std::string, ProcInfo> procs_;
+
+public:
+	ReqRepCenter(ShmSocket::Shm &shm) :
+	    socket_(shm) {}
+	ReqRepCenter() :
+	    ReqRepCenter(BHomeShm()) {}
+	~ReqRepCenter() { Stop(); }
+	bool Start(const int nworker = 2);
+	bool Stop() { return socket_.Stop(); }
+};
+
+#endif // end of include guard: REQREP_CENTER_US3RBM60
diff --git a/src/socket.cpp b/src/socket.cpp
index 4c2fc6b..b9519be 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -46,7 +46,7 @@
 
 ShmSocket::~ShmSocket()
 {
-	Stop();
+	Stop(); //TODO should stop in sub class, incase thread access sub class data.
 }
 
 bool ShmSocket::StartRaw(const RecvRawCB &onData, int nworker)
@@ -102,12 +102,7 @@
 
 bool ShmSocket::SyncSend(const void *id, const bhome_msg::BHMsg &msg, const int timeout_ms)
 {
-	std::lock_guard<std::mutex> lock(mutex_);
-	if (!mq_ || RunningNoLock()) {
-		return false;
-	} else {
-		return mq_->Send(*static_cast<const MQId *>(id), msg, timeout_ms);
-	}
+	return mq_->Send(*static_cast<const MQId *>(id), msg, timeout_ms);
 }
 
 bool ShmSocket::SyncRecv(bhome_msg::BHMsg &msg, const int timeout_ms)
diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp
index 35465bb..dc64cc0 100644
--- a/utest/speed_test.cpp
+++ b/utest/speed_test.cpp
@@ -24,9 +24,9 @@
 {
 	const std::string shm_name("ShmSpeed");
 	ShmRemover auto_remove(shm_name);
-	const int mem_size       = 1024 * 1024 * 50;
-	MQId id                  = boost::uuids::random_generator()();
-	const int timeout        = 100;
+	const int mem_size = 1024 * 1024 * 50;
+	MQId id = boost::uuids::random_generator()();
+	const int timeout = 100;
 	const uint32_t data_size = 4000;
 
 	auto Writer = [&](int writer_id, uint64_t n) {
@@ -35,7 +35,7 @@
 		std::string str(data_size, 'a');
 		MsgI msg;
 		DEFER1(msg.Release(shm););
-		msg.MakeRC(shm, MakeRequest(mq.Id(), str.data(), str.size()));
+		msg.MakeRC(shm, MakeRequest(mq.Id(), "topic", str.data(), str.size()));
 		for (uint64_t i = 0; i < n; ++i) {
 			// mq.Send(id, str.data(), str.size(), timeout);
 			mq.Send(id, msg, timeout);
@@ -70,7 +70,7 @@
 	auto Test = [&](auto &www, auto &rrr, bool isfork) {
 		for (auto nreader : nreaders) {
 			for (auto nwriter : nwriters) {
-				const uint64_t nmsg      = 1000 * 1000 * 10 / nwriter;
+				const uint64_t nmsg = 1000 * 1000 * 10 / nwriter;
 				const uint64_t total_msg = nmsg * nwriter;
 				std::atomic<bool> run(true);
 				std::this_thread::sleep_for(10ms);
@@ -104,26 +104,26 @@
 	run.store(false);
 }
 
-// Request Reply Test
-BOOST_AUTO_TEST_CASE(RRTest)
+// Send Recv Test
+BOOST_AUTO_TEST_CASE(SRTest)
 {
-	const std::string shm_name("ShmReqRep");
+	const std::string shm_name("ShmSendRecv");
 	ShmRemover auto_remove(shm_name);
-	const int qlen          = 64;
+	const int qlen = 64;
 	const size_t msg_length = 1000;
 	std::string msg_content(msg_length, 'a');
 	msg_content[20] = '\0';
 
 	SharedMemory shm(shm_name, 1024 * 1024 * 50);
-	auto Avail      = [&]() { return shm.get_free_memory(); };
+	auto Avail = [&]() { return shm.get_free_memory(); };
 	auto init_avail = Avail();
 	ShmMsgQueue srv(shm, qlen);
 	ShmMsgQueue cli(shm, qlen);
 
 	MsgI request_rc;
-	request_rc.MakeRC(shm, MakeRequest(cli.Id(), msg_content.data(), msg_content.size()));
+	request_rc.MakeRC(shm, MakeRequest(cli.Id(), "topic", msg_content.data(), msg_content.size()));
 	MsgI reply_rc;
-	reply_rc.MakeRC(shm, MakeReply(msg_content.data(), msg_content.size()));
+	reply_rc.MakeRC(shm, MakeReply("fakemsgid", msg_content.data(), msg_content.size()));
 
 	std::atomic<uint64_t> count(0);
 
@@ -133,7 +133,7 @@
 	auto Client = [&](int cli_id, int nmsg) {
 		for (int i = 0; i < nmsg; ++i) {
 			auto Req = [&]() {
-				return cli.Send(srv.Id(), MakeRequest(cli.Id(), msg_content.data(), msg_content.size()), 100);
+				return cli.Send(srv.Id(), MakeRequest(cli.Id(), "topic", msg_content.data(), msg_content.size()), 100);
 			};
 			auto ReqRC = [&]() { return cli.Send(srv.Id(), request_rc, 1000); };
 
@@ -165,7 +165,7 @@
 				MQId src_id;
 				memcpy(&src_id, mqid.data(), sizeof(src_id));
 				auto Reply = [&]() {
-					return srv.Send(src_id, MakeReply(msg_content.data(), msg_content.size()), 100);
+					return srv.Send(src_id, MakeReply(req.msg_id(), msg_content.data(), msg_content.size()), 100);
 				};
 				auto ReplyRC = [&]() { return srv.Send(src_id, reply_rc, 100); };
 
@@ -180,7 +180,7 @@
 
 	ThreadManager clients, servers;
 	for (int i = 0; i < qlen; ++i) { servers.Launch(Server); }
-	int ncli      = 100 * 1;
+	int ncli = 100 * 1;
 	uint64_t nmsg = 100 * 100 * 2;
 	printf("client threads: %d, msgs : %ld, total msg: %ld\n", ncli, nmsg, ncli * nmsg);
 	for (int i = 0; i < ncli; ++i) { clients.Launch(Client, i, nmsg); }
diff --git a/utest/utest.cpp b/utest/utest.cpp
index b95e646..54c6d6f 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -1,6 +1,8 @@
 #include "defs.h"
 #include "pubsub.h"
 #include "pubsub_center.h"
+#include "reqrep.h"
+#include "reqrep_center.h"
 #include "socket.h"
 #include "util.h"
 #include <atomic>
@@ -150,6 +152,62 @@
 	bus.Stop();
 }
 
+BOOST_AUTO_TEST_CASE(ReqRepTest)
+{
+	const std::string shm_name("ShmReqRep");
+	ShmRemover auto_remove(shm_name);
+	SharedMemory shm(shm_name, 1024 * 1024 * 50);
+
+	auto Avail = [&]() { return shm.get_free_memory(); };
+	auto init_avail = Avail();
+	int *flag = shm.find_or_construct<int>("flag")(123);
+	printf("flag = %d\n", *flag);
+	++*flag;
+
+	ReqRepCenter center(shm);
+	center.Start(2);
+	std::atomic<bool> run(true);
+
+	auto Client = [&](const std::string &topic, const int nreq) {
+		SocketRequest client(shm);
+		std::string reply;
+		boost::timer::auto_cpu_timer timer;
+		for (int i = 0; i < nreq; ++i) {
+			if (!client.SyncRequest(topic, "data " + std::to_string(i), reply, 1000)) {
+				printf("client request failed\n");
+			}
+		}
+		printf("request %s %d done ", topic.c_str(), nreq);
+	};
+	auto Server = [&](const std::string &name, const std::vector<std::string> &topics) {
+		SocketReply server(shm);
+		ProcInfo info;
+		info.set_name(name);
+		info.set_info(name);
+		if (!server.Register(info, topics, 100)) {
+			printf("register failed\n");
+		}
+		auto onData = [](const std::string &topic, const std::string &data, std::string &reply) {
+			reply = topic + ':' + data;
+			return true;
+		};
+		server.StartWorker(onData);
+		while (run) {
+			std::this_thread::yield();
+		}
+	};
+	ThreadManager clients, servers;
+	std::vector<std::string> topics = {"topic1", "topic2"};
+	servers.Launch(Server, "server", topics);
+	std::this_thread::sleep_for(100ms);
+	for (auto &t : topics) {
+		clients.Launch(Client, t, 1000 * 100);
+	}
+	clients.WaitAll();
+	run = false;
+	servers.WaitAll();
+}
+
 inline int MyMin(int a, int b)
 {
 	printf("MyMin\n");

--
Gitblit v1.8.0