From 3e9f5b869dd32441fdd3d77091cb33ef4301f244 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期二, 06 四月 2021 20:26:20 +0800
Subject: [PATCH] use BHCenter.
---
src/topic_reply.cpp | 4 +-
src/pubsub_center.h | 2
src/reqrep_center.h | 2
src/center.h | 16 +++++++-
src/defs.h | 6 +-
src/pubsub.cpp | 4 +-
utest/utest.cpp | 5 +-
src/topic_request.cpp | 2
src/center.cpp | 36 ++++++++++++++++-
src/defs.cpp | 6 +-
10 files changed, 63 insertions(+), 20 deletions(-)
diff --git a/src/center.cpp b/src/center.cpp
index d0e61a2..a3897fb 100644
--- a/src/center.cpp
+++ b/src/center.cpp
@@ -37,8 +37,24 @@
return shm;
}
-BHCenter::BHCenter(Socket::Shm &shm) :
- socket_(shm, &BHUniCenter(), 1000) {}
+BHCenter::CenterRecords &BHCenter::Centers()
+{
+ static CenterRecords rec;
+ return rec;
+}
+bool BHCenter::Install(const std::string &name, MsgHandler handler, const std::string &mqid, const int mq_len)
+{
+ CenterRecords()[name] = CenterInfo{name, handler, mqid, mq_len};
+}
+
+BHCenter::BHCenter(Socket::Shm &shm)
+{
+ sockets_["center"] = std::make_shared<ShmSocket>(shm, &BHTopicCenterAddress(), 1000);
+ sockets_["bus"] = std::make_shared<ShmSocket>(shm, &BHTopicBusAddress(), 1000);
+ for (auto &kv : Centers()) {
+ sockets_[kv.first] = std::make_shared<ShmSocket>(shm, kv.second.mqid_.data(), kv.second.mq_len_);
+ }
+}
BHCenter::BHCenter() :
BHCenter(BHomeShm()) {}
@@ -47,6 +63,20 @@
{
auto onCenter = MakeReqRepCenter();
auto onBus = MakeBusCenter();
+ sockets_["center"]->Start(onCenter);
+ sockets_["bus"]->Start(onBus);
- socket_.Start(Join(onCenter, onBus));
+ for (auto &kv : Centers()) {
+ sockets_[kv.first]->Start(kv.second.handler_);
+ }
+ return true;
+ // socket_.Start(Join(onCenter, onBus));
+}
+
+bool BHCenter::Stop()
+{
+ for (auto &kv : sockets_) {
+ kv.second->Stop();
+ }
+ return true;
}
\ No newline at end of file
diff --git a/src/center.h b/src/center.h
index f0a177c..02ec8f4 100644
--- a/src/center.h
+++ b/src/center.h
@@ -20,6 +20,8 @@
#include "socket.h"
#include <functional>
+#include <map>
+#include <memory>
class BHCenter
{
@@ -27,15 +29,25 @@
public:
typedef std::function<bool(ShmSocket &socket, bhome_msg::MsgI &imsg, bhome::msg::BHMsg &msg)> MsgHandler;
+ static bool Install(const std::string &name, MsgHandler handler, const std::string &mqid, const int mq_len);
BHCenter(Socket::Shm &shm);
BHCenter();
~BHCenter() { Stop(); }
bool Start();
- bool Stop() { return socket_.Stop(); }
+ bool Stop();
private:
- ShmSocket socket_;
+ struct CenterInfo {
+ std::string name_;
+ MsgHandler handler_;
+ std::string mqid_;
+ int mq_len_ = 0;
+ };
+ typedef std::map<std::string, CenterInfo> CenterRecords;
+ static CenterRecords &Centers();
+
+ std::map<std::string, std::shared_ptr<ShmSocket>> sockets_;
};
#endif // end of include guard: CENTER_TM9OUQTG
diff --git a/src/defs.cpp b/src/defs.cpp
index 4051196..cab4fc7 100644
--- a/src/defs.cpp
+++ b/src/defs.cpp
@@ -25,6 +25,6 @@
} // namespace
-const MQId &BHTopicBus() { return kBHTopicBus; }
-const MQId &BHTopicReqRepCenter() { return kBHTopicReqRepCenter; }
-const MQId &BHUniCenter() { return kBHUniCenter; }
+const MQId &BHTopicBusAddress() { return kBHTopicBus; }
+const MQId &BHTopicCenterAddress() { return kBHTopicReqRepCenter; }
+const MQId &BHUniCenterAddress() { return kBHUniCenter; }
diff --git a/src/defs.h b/src/defs.h
index 91d8cf3..d50e380 100644
--- a/src/defs.h
+++ b/src/defs.h
@@ -25,9 +25,9 @@
typedef boost::uuids::uuid MQId;
-const MQId &BHTopicBus();
-const MQId &BHTopicReqRepCenter();
-const MQId &BHUniCenter();
+const MQId &BHTopicBusAddress();
+const MQId &BHTopicCenterAddress();
+const MQId &BHUniCenterAddress();
const int kBHCenterPort = 24287;
const char kTopicSep = '.';
diff --git a/src/pubsub.cpp b/src/pubsub.cpp
index 520c006..0266c86 100644
--- a/src/pubsub.cpp
+++ b/src/pubsub.cpp
@@ -30,7 +30,7 @@
return false;
}
DEFER1(imsg.Release(shm()));
- return ShmMsgQueue::Send(shm(), BHTopicBus(), imsg, timeout_ms);
+ return ShmMsgQueue::Send(shm(), BHTopicBusAddress(), imsg, timeout_ms);
} catch (...) {
return false;
}
@@ -39,7 +39,7 @@
bool SocketSubscribe::Subscribe(const std::vector<Topic> &topics, const int timeout_ms)
{
try {
- return mq().Send(BHTopicBus(), MakeSub(mq().Id(), topics), timeout_ms);
+ return mq().Send(BHTopicBusAddress(), MakeSub(mq().Id(), topics), timeout_ms);
} catch (...) {
return false;
}
diff --git a/src/pubsub_center.h b/src/pubsub_center.h
index 0b00c41..f81fa0e 100644
--- a/src/pubsub_center.h
+++ b/src/pubsub_center.h
@@ -34,7 +34,7 @@
public:
PubSubCenter(ShmSocket::Shm &shm) :
- socket_(shm, &BHTopicBus(), 1000) {}
+ socket_(shm, &BHTopicBusAddress(), 1000) {}
PubSubCenter() :
PubSubCenter(BHomeShm()) {}
~PubSubCenter() { Stop(); }
diff --git a/src/reqrep_center.h b/src/reqrep_center.h
index 069ed11..bdcdcad 100644
--- a/src/reqrep_center.h
+++ b/src/reqrep_center.h
@@ -29,7 +29,7 @@
public:
ReqRepCenter(ShmSocket::Shm &shm) :
- socket_(shm, &BHTopicReqRepCenter(), 1000) {}
+ socket_(shm, &BHTopicCenterAddress(), 1000) {}
ReqRepCenter() :
ReqRepCenter(BHomeShm()) {}
~ReqRepCenter() { Stop(); }
diff --git a/src/topic_reply.cpp b/src/topic_reply.cpp
index aaed407..2ab75e6 100644
--- a/src/topic_reply.cpp
+++ b/src/topic_reply.cpp
@@ -70,11 +70,11 @@
bool SocketReply::Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms)
{
//TODO check reply?
- return SyncSend(&BHTopicReqRepCenter(), MakeRegister(mq().Id(), proc_info, topics), timeout_ms);
+ return SyncSend(&BHTopicCenterAddress(), MakeRegister(mq().Id(), proc_info, topics), timeout_ms);
}
bool SocketReply::Heartbeat(const ProcInfo &proc_info, const int timeout_ms)
{
- return SyncSend(&BHTopicReqRepCenter(), MakeHeartbeat(mq().Id(), proc_info), timeout_ms);
+ return SyncSend(&BHTopicCenterAddress(), MakeHeartbeat(mq().Id(), proc_info), timeout_ms);
}
bool SocketReply::StartWorker(const OnRequest &rcb, int nworker)
{
diff --git a/src/topic_request.cpp b/src/topic_request.cpp
index 322ed64..382ce21 100644
--- a/src/topic_request.cpp
+++ b/src/topic_request.cpp
@@ -191,7 +191,7 @@
BHMsg result;
const BHMsg &msg = MakeQueryTopic(mq().Id(), topic);
- if (SyncSendAndRecv(&BHTopicReqRepCenter(), &msg, &result, timeout_ms)) {
+ if (SyncSendAndRecv(&BHTopicCenterAddress(), &msg, &result, timeout_ms)) {
if (result.type() == kMsgTypeQueryTopicReply) {
MsgQueryTopicReply reply;
if (reply.ParseFromString(result.body())) {
diff --git a/utest/utest.cpp b/utest/utest.cpp
index 8f2a7f5..092455f 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -1,3 +1,4 @@
+#include "center.h"
#include "defs.h"
#include "pubsub.h"
#include "pubsub_center.h"
@@ -176,8 +177,8 @@
printf("flag = %d\n", *flag);
++*flag;
- ReqRepCenter center(shm);
- center.Start(2);
+ BHCenter center(shm);
+ center.Start();
std::atomic<bool> run(true);
auto Client = [&](const std::string &topic, const int nreq) {
--
Gitblit v1.8.0