| | |
| | | #include "pubsub.h" |
| | | #include "bh_util.h" |
| | | #include "defs.h" |
| | | #include <chrono> |
| | | |
| | | namespace bhome_shm |
| | | { |
| | |
| | | using namespace bhome_msg; |
| | | |
| | | BusManager::BusManager(SharedMemory &shm) : |
| | | shm_(shm), |
| | | busq_(kBHBusQueueId, shm, 16), |
| | | run_(false) |
| | | { |
| | | } |
| | | |
| | | BusManager::~BusManager() |
| | | { |
| | | Stop(); |
| | | } |
| | | shm_(shm), socket_(ShmSocket::eSockBus, shm) {} |
| | | BusManager::BusManager() : |
| | | BusManager(BHomeShm()) {} |
| | | |
| | | bool BusManager::Start(const int nworker) |
| | | { |
| | | std::lock_guard<std::mutex> guard(mutex_); |
| | | StopNoLock(); |
| | | // start |
| | | auto Worker = [&]() { |
| | | while (this->run_) { |
| | | BusManager &self = *this; |
| | | MsgI msg; |
| | | const int timeout_ms = 100; |
| | | if (self.busq_.Recv(msg, timeout_ms)) { |
| | | self.OnMsg(msg); |
| | | } |
| | | } |
| | | }; |
| | | |
| | | run_.store(true); |
| | | const int n = std::min(nworker, kMaxWorker); |
| | | for (int i = 0; i < n; ++i) { |
| | | workers_.emplace_back(Worker); |
| | | } |
| | | return true; |
| | | } |
| | | |
| | | bool BusManager::Stop() |
| | | { |
| | | std::lock_guard<std::mutex> guard(mutex_); |
| | | return StopNoLock(); |
| | | } |
| | | |
| | | bool BusManager::StopNoLock() |
| | | { |
| | | if (run_.exchange(false)) { |
| | | for (auto &w : workers_) { |
| | | if (w.joinable()) { |
| | | w.join(); |
| | | } |
| | | } |
| | | return true; |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | void BusManager::OnMsg(MsgI &imsg) |
| | | { |
| | | DEFER1(imsg.Release(shm_)); |
| | | |
| | | auto onRecv = [&](MsgI &imsg) { |
| | | BHMsg msg; |
| | | if (!imsg.Unpack(msg)) { |
| | | return; |
| | |
| | | }; |
| | | |
| | | if (imsg.IsCounted()) { |
| | | Dispatch([&](const MQId &cli) { busq_.Send(cli, imsg, 100); }); |
| | | Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm_, cli, imsg, 100); }); |
| | | } else { |
| | | MsgI pubmsg; |
| | | if (!pubmsg.MakeRC(shm_, msg)) { return; } |
| | | DEFER1(pubmsg.Release(shm_)); |
| | | |
| | | Dispatch([&](const MQId &cli) { busq_.Send(cli, pubmsg, 100); }); |
| | | Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm_, cli, pubmsg, 100); }); |
| | | } |
| | | }; |
| | | |
| | |
| | | case kMsgTypePublish: OnPublish(); break; |
| | | default: break; |
| | | } |
| | | }; |
| | | |
| | | return socket_.StartRaw(onRecv, std::min(nworker, kMaxWorker)); |
| | | } |
| | | |
| | | } // namespace bhome_shm |