| | |
| | | return shm; |
| | | } |
| | | |
| | | BHCenter::BHCenter(Socket::Shm &shm) : |
| | | socket_(shm, &BHUniCenter(), 1000) {} |
| | | BHCenter::CenterRecords &BHCenter::Centers() |
| | | { |
| | | static CenterRecords rec; |
| | | return rec; |
| | | } |
| | | bool BHCenter::Install(const std::string &name, MsgHandler handler, const std::string &mqid, const int mq_len) |
| | | { |
| | | CenterRecords()[name] = CenterInfo{name, handler, mqid, mq_len}; |
| | | } |
| | | |
| | | BHCenter::BHCenter(Socket::Shm &shm) |
| | | { |
| | | sockets_["center"] = std::make_shared<ShmSocket>(shm, &BHTopicCenterAddress(), 1000); |
| | | sockets_["bus"] = std::make_shared<ShmSocket>(shm, &BHTopicBusAddress(), 1000); |
| | | for (auto &kv : Centers()) { |
| | | sockets_[kv.first] = std::make_shared<ShmSocket>(shm, kv.second.mqid_.data(), kv.second.mq_len_); |
| | | } |
| | | } |
| | | |
| | | BHCenter::BHCenter() : |
| | | BHCenter(BHomeShm()) {} |
| | |
| | | { |
| | | auto onCenter = MakeReqRepCenter(); |
| | | auto onBus = MakeBusCenter(); |
| | | sockets_["center"]->Start(onCenter); |
| | | sockets_["bus"]->Start(onBus); |
| | | |
| | | socket_.Start(Join(onCenter, onBus)); |
| | | for (auto &kv : Centers()) { |
| | | sockets_[kv.first]->Start(kv.second.handler_); |
| | | } |
| | | return true; |
| | | // socket_.Start(Join(onCenter, onBus)); |
| | | } |
| | | |
| | | bool BHCenter::Stop() |
| | | { |
| | | for (auto &kv : sockets_) { |
| | | kv.second->Stop(); |
| | | } |
| | | return true; |
| | | } |
| | |
| | | |
| | | #include "socket.h" |
| | | #include <functional> |
| | | #include <map> |
| | | #include <memory> |
| | | |
| | | class BHCenter |
| | | { |
| | |
| | | |
| | | public: |
| | | typedef std::function<bool(ShmSocket &socket, bhome_msg::MsgI &imsg, bhome::msg::BHMsg &msg)> MsgHandler; |
| | | static bool Install(const std::string &name, MsgHandler handler, const std::string &mqid, const int mq_len); |
| | | |
| | | BHCenter(Socket::Shm &shm); |
| | | BHCenter(); |
| | | ~BHCenter() { Stop(); } |
| | | bool Start(); |
| | | bool Stop() { return socket_.Stop(); } |
| | | bool Stop(); |
| | | |
| | | private: |
| | | ShmSocket socket_; |
| | | struct CenterInfo { |
| | | std::string name_; |
| | | MsgHandler handler_; |
| | | std::string mqid_; |
| | | int mq_len_ = 0; |
| | | }; |
| | | typedef std::map<std::string, CenterInfo> CenterRecords; |
| | | static CenterRecords &Centers(); |
| | | |
| | | std::map<std::string, std::shared_ptr<ShmSocket>> sockets_; |
| | | }; |
| | | |
| | | #endif // end of include guard: CENTER_TM9OUQTG |
| | |
| | | |
| | | } // namespace |
| | | |
| | | const MQId &BHTopicBus() { return kBHTopicBus; } |
| | | const MQId &BHTopicReqRepCenter() { return kBHTopicReqRepCenter; } |
| | | const MQId &BHUniCenter() { return kBHUniCenter; } |
| | | const MQId &BHTopicBusAddress() { return kBHTopicBus; } |
| | | const MQId &BHTopicCenterAddress() { return kBHTopicReqRepCenter; } |
| | | const MQId &BHUniCenterAddress() { return kBHUniCenter; } |
| | |
| | | |
| | | typedef boost::uuids::uuid MQId; |
| | | |
| | | const MQId &BHTopicBus(); |
| | | const MQId &BHTopicReqRepCenter(); |
| | | const MQId &BHUniCenter(); |
| | | const MQId &BHTopicBusAddress(); |
| | | const MQId &BHTopicCenterAddress(); |
| | | const MQId &BHUniCenterAddress(); |
| | | |
| | | const int kBHCenterPort = 24287; |
| | | const char kTopicSep = '.'; |
| | |
| | | return false; |
| | | } |
| | | DEFER1(imsg.Release(shm())); |
| | | return ShmMsgQueue::Send(shm(), BHTopicBus(), imsg, timeout_ms); |
| | | return ShmMsgQueue::Send(shm(), BHTopicBusAddress(), imsg, timeout_ms); |
| | | } catch (...) { |
| | | return false; |
| | | } |
| | |
| | | bool SocketSubscribe::Subscribe(const std::vector<Topic> &topics, const int timeout_ms) |
| | | { |
| | | try { |
| | | return mq().Send(BHTopicBus(), MakeSub(mq().Id(), topics), timeout_ms); |
| | | return mq().Send(BHTopicBusAddress(), MakeSub(mq().Id(), topics), timeout_ms); |
| | | } catch (...) { |
| | | return false; |
| | | } |
| | |
| | | |
| | | public: |
| | | PubSubCenter(ShmSocket::Shm &shm) : |
| | | socket_(shm, &BHTopicBus(), 1000) {} |
| | | socket_(shm, &BHTopicBusAddress(), 1000) {} |
| | | PubSubCenter() : |
| | | PubSubCenter(BHomeShm()) {} |
| | | ~PubSubCenter() { Stop(); } |
| | |
| | | |
| | | public: |
| | | ReqRepCenter(ShmSocket::Shm &shm) : |
| | | socket_(shm, &BHTopicReqRepCenter(), 1000) {} |
| | | socket_(shm, &BHTopicCenterAddress(), 1000) {} |
| | | ReqRepCenter() : |
| | | ReqRepCenter(BHomeShm()) {} |
| | | ~ReqRepCenter() { Stop(); } |
| | |
| | | bool SocketReply::Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms) |
| | | { |
| | | //TODO check reply? |
| | | return SyncSend(&BHTopicReqRepCenter(), MakeRegister(mq().Id(), proc_info, topics), timeout_ms); |
| | | return SyncSend(&BHTopicCenterAddress(), MakeRegister(mq().Id(), proc_info, topics), timeout_ms); |
| | | } |
| | | bool SocketReply::Heartbeat(const ProcInfo &proc_info, const int timeout_ms) |
| | | { |
| | | return SyncSend(&BHTopicReqRepCenter(), MakeHeartbeat(mq().Id(), proc_info), timeout_ms); |
| | | return SyncSend(&BHTopicCenterAddress(), MakeHeartbeat(mq().Id(), proc_info), timeout_ms); |
| | | } |
| | | bool SocketReply::StartWorker(const OnRequest &rcb, int nworker) |
| | | { |
| | |
| | | |
| | | BHMsg result; |
| | | const BHMsg &msg = MakeQueryTopic(mq().Id(), topic); |
| | | if (SyncSendAndRecv(&BHTopicReqRepCenter(), &msg, &result, timeout_ms)) { |
| | | if (SyncSendAndRecv(&BHTopicCenterAddress(), &msg, &result, timeout_ms)) { |
| | | if (result.type() == kMsgTypeQueryTopicReply) { |
| | | MsgQueryTopicReply reply; |
| | | if (reply.ParseFromString(result.body())) { |
| | |
| | | #include "center.h" |
| | | #include "defs.h" |
| | | #include "pubsub.h" |
| | | #include "pubsub_center.h" |
| | |
| | | printf("flag = %d\n", *flag); |
| | | ++*flag; |
| | | |
| | | ReqRepCenter center(shm); |
| | | center.Start(2); |
| | | BHCenter center(shm); |
| | | center.Start(); |
| | | std::atomic<bool> run(true); |
| | | |
| | | auto Client = [&](const std::string &topic, const int nreq) { |