From 3e9f5b869dd32441fdd3d77091cb33ef4301f244 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期二, 06 四月 2021 20:26:20 +0800 Subject: [PATCH] use BHCenter. --- src/topic_reply.cpp | 4 +- src/pubsub_center.h | 2 src/reqrep_center.h | 2 src/center.h | 16 +++++++- src/defs.h | 6 +- src/pubsub.cpp | 4 +- utest/utest.cpp | 5 +- src/topic_request.cpp | 2 src/center.cpp | 36 ++++++++++++++++- src/defs.cpp | 6 +- 10 files changed, 63 insertions(+), 20 deletions(-) diff --git a/src/center.cpp b/src/center.cpp index d0e61a2..a3897fb 100644 --- a/src/center.cpp +++ b/src/center.cpp @@ -37,8 +37,24 @@ return shm; } -BHCenter::BHCenter(Socket::Shm &shm) : - socket_(shm, &BHUniCenter(), 1000) {} +BHCenter::CenterRecords &BHCenter::Centers() +{ + static CenterRecords rec; + return rec; +} +bool BHCenter::Install(const std::string &name, MsgHandler handler, const std::string &mqid, const int mq_len) +{ + CenterRecords()[name] = CenterInfo{name, handler, mqid, mq_len}; +} + +BHCenter::BHCenter(Socket::Shm &shm) +{ + sockets_["center"] = std::make_shared<ShmSocket>(shm, &BHTopicCenterAddress(), 1000); + sockets_["bus"] = std::make_shared<ShmSocket>(shm, &BHTopicBusAddress(), 1000); + for (auto &kv : Centers()) { + sockets_[kv.first] = std::make_shared<ShmSocket>(shm, kv.second.mqid_.data(), kv.second.mq_len_); + } +} BHCenter::BHCenter() : BHCenter(BHomeShm()) {} @@ -47,6 +63,20 @@ { auto onCenter = MakeReqRepCenter(); auto onBus = MakeBusCenter(); + sockets_["center"]->Start(onCenter); + sockets_["bus"]->Start(onBus); - socket_.Start(Join(onCenter, onBus)); + for (auto &kv : Centers()) { + sockets_[kv.first]->Start(kv.second.handler_); + } + return true; + // socket_.Start(Join(onCenter, onBus)); +} + +bool BHCenter::Stop() +{ + for (auto &kv : sockets_) { + kv.second->Stop(); + } + return true; } \ No newline at end of file diff --git a/src/center.h b/src/center.h index f0a177c..02ec8f4 100644 --- a/src/center.h +++ b/src/center.h @@ -20,6 +20,8 @@ #include "socket.h" #include <functional> +#include <map> +#include <memory> class BHCenter { @@ -27,15 +29,25 @@ public: typedef std::function<bool(ShmSocket &socket, bhome_msg::MsgI &imsg, bhome::msg::BHMsg &msg)> MsgHandler; + static bool Install(const std::string &name, MsgHandler handler, const std::string &mqid, const int mq_len); BHCenter(Socket::Shm &shm); BHCenter(); ~BHCenter() { Stop(); } bool Start(); - bool Stop() { return socket_.Stop(); } + bool Stop(); private: - ShmSocket socket_; + struct CenterInfo { + std::string name_; + MsgHandler handler_; + std::string mqid_; + int mq_len_ = 0; + }; + typedef std::map<std::string, CenterInfo> CenterRecords; + static CenterRecords &Centers(); + + std::map<std::string, std::shared_ptr<ShmSocket>> sockets_; }; #endif // end of include guard: CENTER_TM9OUQTG diff --git a/src/defs.cpp b/src/defs.cpp index 4051196..cab4fc7 100644 --- a/src/defs.cpp +++ b/src/defs.cpp @@ -25,6 +25,6 @@ } // namespace -const MQId &BHTopicBus() { return kBHTopicBus; } -const MQId &BHTopicReqRepCenter() { return kBHTopicReqRepCenter; } -const MQId &BHUniCenter() { return kBHUniCenter; } +const MQId &BHTopicBusAddress() { return kBHTopicBus; } +const MQId &BHTopicCenterAddress() { return kBHTopicReqRepCenter; } +const MQId &BHUniCenterAddress() { return kBHUniCenter; } diff --git a/src/defs.h b/src/defs.h index 91d8cf3..d50e380 100644 --- a/src/defs.h +++ b/src/defs.h @@ -25,9 +25,9 @@ typedef boost::uuids::uuid MQId; -const MQId &BHTopicBus(); -const MQId &BHTopicReqRepCenter(); -const MQId &BHUniCenter(); +const MQId &BHTopicBusAddress(); +const MQId &BHTopicCenterAddress(); +const MQId &BHUniCenterAddress(); const int kBHCenterPort = 24287; const char kTopicSep = '.'; diff --git a/src/pubsub.cpp b/src/pubsub.cpp index 520c006..0266c86 100644 --- a/src/pubsub.cpp +++ b/src/pubsub.cpp @@ -30,7 +30,7 @@ return false; } DEFER1(imsg.Release(shm())); - return ShmMsgQueue::Send(shm(), BHTopicBus(), imsg, timeout_ms); + return ShmMsgQueue::Send(shm(), BHTopicBusAddress(), imsg, timeout_ms); } catch (...) { return false; } @@ -39,7 +39,7 @@ bool SocketSubscribe::Subscribe(const std::vector<Topic> &topics, const int timeout_ms) { try { - return mq().Send(BHTopicBus(), MakeSub(mq().Id(), topics), timeout_ms); + return mq().Send(BHTopicBusAddress(), MakeSub(mq().Id(), topics), timeout_ms); } catch (...) { return false; } diff --git a/src/pubsub_center.h b/src/pubsub_center.h index 0b00c41..f81fa0e 100644 --- a/src/pubsub_center.h +++ b/src/pubsub_center.h @@ -34,7 +34,7 @@ public: PubSubCenter(ShmSocket::Shm &shm) : - socket_(shm, &BHTopicBus(), 1000) {} + socket_(shm, &BHTopicBusAddress(), 1000) {} PubSubCenter() : PubSubCenter(BHomeShm()) {} ~PubSubCenter() { Stop(); } diff --git a/src/reqrep_center.h b/src/reqrep_center.h index 069ed11..bdcdcad 100644 --- a/src/reqrep_center.h +++ b/src/reqrep_center.h @@ -29,7 +29,7 @@ public: ReqRepCenter(ShmSocket::Shm &shm) : - socket_(shm, &BHTopicReqRepCenter(), 1000) {} + socket_(shm, &BHTopicCenterAddress(), 1000) {} ReqRepCenter() : ReqRepCenter(BHomeShm()) {} ~ReqRepCenter() { Stop(); } diff --git a/src/topic_reply.cpp b/src/topic_reply.cpp index aaed407..2ab75e6 100644 --- a/src/topic_reply.cpp +++ b/src/topic_reply.cpp @@ -70,11 +70,11 @@ bool SocketReply::Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms) { //TODO check reply? - return SyncSend(&BHTopicReqRepCenter(), MakeRegister(mq().Id(), proc_info, topics), timeout_ms); + return SyncSend(&BHTopicCenterAddress(), MakeRegister(mq().Id(), proc_info, topics), timeout_ms); } bool SocketReply::Heartbeat(const ProcInfo &proc_info, const int timeout_ms) { - return SyncSend(&BHTopicReqRepCenter(), MakeHeartbeat(mq().Id(), proc_info), timeout_ms); + return SyncSend(&BHTopicCenterAddress(), MakeHeartbeat(mq().Id(), proc_info), timeout_ms); } bool SocketReply::StartWorker(const OnRequest &rcb, int nworker) { diff --git a/src/topic_request.cpp b/src/topic_request.cpp index 322ed64..382ce21 100644 --- a/src/topic_request.cpp +++ b/src/topic_request.cpp @@ -191,7 +191,7 @@ BHMsg result; const BHMsg &msg = MakeQueryTopic(mq().Id(), topic); - if (SyncSendAndRecv(&BHTopicReqRepCenter(), &msg, &result, timeout_ms)) { + if (SyncSendAndRecv(&BHTopicCenterAddress(), &msg, &result, timeout_ms)) { if (result.type() == kMsgTypeQueryTopicReply) { MsgQueryTopicReply reply; if (reply.ParseFromString(result.body())) { diff --git a/utest/utest.cpp b/utest/utest.cpp index 8f2a7f5..092455f 100644 --- a/utest/utest.cpp +++ b/utest/utest.cpp @@ -1,3 +1,4 @@ +#include "center.h" #include "defs.h" #include "pubsub.h" #include "pubsub_center.h" @@ -176,8 +177,8 @@ printf("flag = %d\n", *flag); ++*flag; - ReqRepCenter center(shm); - center.Start(2); + BHCenter center(shm); + center.Start(); std::atomic<bool> run(true); auto Client = [&](const std::string &topic, const int nreq) { -- Gitblit v1.8.0