From 3e9f5b869dd32441fdd3d77091cb33ef4301f244 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期二, 06 四月 2021 20:26:20 +0800
Subject: [PATCH] use BHCenter.

---
 src/topic_reply.cpp   |    4 +-
 src/pubsub_center.h   |    2 
 src/reqrep_center.h   |    2 
 src/center.h          |   16 +++++++-
 src/defs.h            |    6 +-
 src/pubsub.cpp        |    4 +-
 utest/utest.cpp       |    5 +-
 src/topic_request.cpp |    2 
 src/center.cpp        |   36 ++++++++++++++++-
 src/defs.cpp          |    6 +-
 10 files changed, 63 insertions(+), 20 deletions(-)

diff --git a/src/center.cpp b/src/center.cpp
index d0e61a2..a3897fb 100644
--- a/src/center.cpp
+++ b/src/center.cpp
@@ -37,8 +37,24 @@
 	return shm;
 }
 
-BHCenter::BHCenter(Socket::Shm &shm) :
-    socket_(shm, &BHUniCenter(), 1000) {}
+BHCenter::CenterRecords &BHCenter::Centers()
+{
+	static CenterRecords rec;
+	return rec;
+}
+bool BHCenter::Install(const std::string &name, MsgHandler handler, const std::string &mqid, const int mq_len)
+{
+	CenterRecords()[name] = CenterInfo{name, handler, mqid, mq_len};
+}
+
+BHCenter::BHCenter(Socket::Shm &shm)
+{
+	sockets_["center"] = std::make_shared<ShmSocket>(shm, &BHTopicCenterAddress(), 1000);
+	sockets_["bus"] = std::make_shared<ShmSocket>(shm, &BHTopicBusAddress(), 1000);
+	for (auto &kv : Centers()) {
+		sockets_[kv.first] = std::make_shared<ShmSocket>(shm, kv.second.mqid_.data(), kv.second.mq_len_);
+	}
+}
 
 BHCenter::BHCenter() :
     BHCenter(BHomeShm()) {}
@@ -47,6 +63,20 @@
 {
 	auto onCenter = MakeReqRepCenter();
 	auto onBus = MakeBusCenter();
+	sockets_["center"]->Start(onCenter);
+	sockets_["bus"]->Start(onBus);
 
-	socket_.Start(Join(onCenter, onBus));
+	for (auto &kv : Centers()) {
+		sockets_[kv.first]->Start(kv.second.handler_);
+	}
+	return true;
+	// socket_.Start(Join(onCenter, onBus));
+}
+
+bool BHCenter::Stop()
+{
+	for (auto &kv : sockets_) {
+		kv.second->Stop();
+	}
+	return true;
 }
\ No newline at end of file
diff --git a/src/center.h b/src/center.h
index f0a177c..02ec8f4 100644
--- a/src/center.h
+++ b/src/center.h
@@ -20,6 +20,8 @@
 
 #include "socket.h"
 #include <functional>
+#include <map>
+#include <memory>
 
 class BHCenter
 {
@@ -27,15 +29,25 @@
 
 public:
 	typedef std::function<bool(ShmSocket &socket, bhome_msg::MsgI &imsg, bhome::msg::BHMsg &msg)> MsgHandler;
+	static bool Install(const std::string &name, MsgHandler handler, const std::string &mqid, const int mq_len);
 
 	BHCenter(Socket::Shm &shm);
 	BHCenter();
 	~BHCenter() { Stop(); }
 	bool Start();
-	bool Stop() { return socket_.Stop(); }
+	bool Stop();
 
 private:
-	ShmSocket socket_;
+	struct CenterInfo {
+		std::string name_;
+		MsgHandler handler_;
+		std::string mqid_;
+		int mq_len_ = 0;
+	};
+	typedef std::map<std::string, CenterInfo> CenterRecords;
+	static CenterRecords &Centers();
+
+	std::map<std::string, std::shared_ptr<ShmSocket>> sockets_;
 };
 
 #endif // end of include guard: CENTER_TM9OUQTG
diff --git a/src/defs.cpp b/src/defs.cpp
index 4051196..cab4fc7 100644
--- a/src/defs.cpp
+++ b/src/defs.cpp
@@ -25,6 +25,6 @@
 
 } // namespace
 
-const MQId &BHTopicBus() { return kBHTopicBus; }
-const MQId &BHTopicReqRepCenter() { return kBHTopicReqRepCenter; }
-const MQId &BHUniCenter() { return kBHUniCenter; }
+const MQId &BHTopicBusAddress() { return kBHTopicBus; }
+const MQId &BHTopicCenterAddress() { return kBHTopicReqRepCenter; }
+const MQId &BHUniCenterAddress() { return kBHUniCenter; }
diff --git a/src/defs.h b/src/defs.h
index 91d8cf3..d50e380 100644
--- a/src/defs.h
+++ b/src/defs.h
@@ -25,9 +25,9 @@
 
 typedef boost::uuids::uuid MQId;
 
-const MQId &BHTopicBus();
-const MQId &BHTopicReqRepCenter();
-const MQId &BHUniCenter();
+const MQId &BHTopicBusAddress();
+const MQId &BHTopicCenterAddress();
+const MQId &BHUniCenterAddress();
 
 const int kBHCenterPort = 24287;
 const char kTopicSep = '.';
diff --git a/src/pubsub.cpp b/src/pubsub.cpp
index 520c006..0266c86 100644
--- a/src/pubsub.cpp
+++ b/src/pubsub.cpp
@@ -30,7 +30,7 @@
 			return false;
 		}
 		DEFER1(imsg.Release(shm()));
-		return ShmMsgQueue::Send(shm(), BHTopicBus(), imsg, timeout_ms);
+		return ShmMsgQueue::Send(shm(), BHTopicBusAddress(), imsg, timeout_ms);
 	} catch (...) {
 		return false;
 	}
@@ -39,7 +39,7 @@
 bool SocketSubscribe::Subscribe(const std::vector<Topic> &topics, const int timeout_ms)
 {
 	try {
-		return mq().Send(BHTopicBus(), MakeSub(mq().Id(), topics), timeout_ms);
+		return mq().Send(BHTopicBusAddress(), MakeSub(mq().Id(), topics), timeout_ms);
 	} catch (...) {
 		return false;
 	}
diff --git a/src/pubsub_center.h b/src/pubsub_center.h
index 0b00c41..f81fa0e 100644
--- a/src/pubsub_center.h
+++ b/src/pubsub_center.h
@@ -34,7 +34,7 @@
 
 public:
 	PubSubCenter(ShmSocket::Shm &shm) :
-	    socket_(shm, &BHTopicBus(), 1000) {}
+	    socket_(shm, &BHTopicBusAddress(), 1000) {}
 	PubSubCenter() :
 	    PubSubCenter(BHomeShm()) {}
 	~PubSubCenter() { Stop(); }
diff --git a/src/reqrep_center.h b/src/reqrep_center.h
index 069ed11..bdcdcad 100644
--- a/src/reqrep_center.h
+++ b/src/reqrep_center.h
@@ -29,7 +29,7 @@
 
 public:
 	ReqRepCenter(ShmSocket::Shm &shm) :
-	    socket_(shm, &BHTopicReqRepCenter(), 1000) {}
+	    socket_(shm, &BHTopicCenterAddress(), 1000) {}
 	ReqRepCenter() :
 	    ReqRepCenter(BHomeShm()) {}
 	~ReqRepCenter() { Stop(); }
diff --git a/src/topic_reply.cpp b/src/topic_reply.cpp
index aaed407..2ab75e6 100644
--- a/src/topic_reply.cpp
+++ b/src/topic_reply.cpp
@@ -70,11 +70,11 @@
 bool SocketReply::Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms)
 {
 	//TODO check reply?
-	return SyncSend(&BHTopicReqRepCenter(), MakeRegister(mq().Id(), proc_info, topics), timeout_ms);
+	return SyncSend(&BHTopicCenterAddress(), MakeRegister(mq().Id(), proc_info, topics), timeout_ms);
 }
 bool SocketReply::Heartbeat(const ProcInfo &proc_info, const int timeout_ms)
 {
-	return SyncSend(&BHTopicReqRepCenter(), MakeHeartbeat(mq().Id(), proc_info), timeout_ms);
+	return SyncSend(&BHTopicCenterAddress(), MakeHeartbeat(mq().Id(), proc_info), timeout_ms);
 }
 bool SocketReply::StartWorker(const OnRequest &rcb, int nworker)
 {
diff --git a/src/topic_request.cpp b/src/topic_request.cpp
index 322ed64..382ce21 100644
--- a/src/topic_request.cpp
+++ b/src/topic_request.cpp
@@ -191,7 +191,7 @@
 
 	BHMsg result;
 	const BHMsg &msg = MakeQueryTopic(mq().Id(), topic);
-	if (SyncSendAndRecv(&BHTopicReqRepCenter(), &msg, &result, timeout_ms)) {
+	if (SyncSendAndRecv(&BHTopicCenterAddress(), &msg, &result, timeout_ms)) {
 		if (result.type() == kMsgTypeQueryTopicReply) {
 			MsgQueryTopicReply reply;
 			if (reply.ParseFromString(result.body())) {
diff --git a/utest/utest.cpp b/utest/utest.cpp
index 8f2a7f5..092455f 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -1,3 +1,4 @@
+#include "center.h"
 #include "defs.h"
 #include "pubsub.h"
 #include "pubsub_center.h"
@@ -176,8 +177,8 @@
 	printf("flag = %d\n", *flag);
 	++*flag;
 
-	ReqRepCenter center(shm);
-	center.Start(2);
+	BHCenter center(shm);
+	center.Start();
 	std::atomic<bool> run(true);
 
 	auto Client = [&](const std::string &topic, const int nreq) {

--
Gitblit v1.8.0