lichao
2021-03-31 2e99e5311d1b9a53cca17008452cbe49e2af7234
src/pubsub.cpp
@@ -18,7 +18,6 @@
#include "pubsub.h"
#include "bh_util.h"
#include "defs.h"
#include <chrono>
namespace bhome_shm
{
@@ -28,64 +27,13 @@
using namespace bhome_msg;
BusManager::BusManager(SharedMemory &shm) :
    shm_(shm),
    busq_(kBHBusQueueId, shm, 16),
    run_(false)
{
}
BusManager::~BusManager()
{
   Stop();
}
    shm_(shm), socket_(ShmSocket::eSockBus, shm) {}
BusManager::BusManager() :
    BusManager(BHomeShm()) {}
bool BusManager::Start(const int nworker)
{
   std::lock_guard<std::mutex> guard(mutex_);
   StopNoLock();
   // start
   auto Worker = [&]() {
      while (this->run_) {
         BusManager &self = *this;
         MsgI msg;
         const int timeout_ms = 100;
         if (self.busq_.Recv(msg, timeout_ms)) {
            self.OnMsg(msg);
         }
      }
   };
   run_.store(true);
   const int n = std::min(nworker, kMaxWorker);
   for (int i = 0; i < n; ++i) {
      workers_.emplace_back(Worker);
   }
   return true;
}
bool BusManager::Stop()
{
   std::lock_guard<std::mutex> guard(mutex_);
   return StopNoLock();
}
bool BusManager::StopNoLock()
{
   if (run_.exchange(false)) {
      for (auto &w : workers_) {
         if (w.joinable()) {
            w.join();
         }
      }
      return true;
   }
   return false;
}
void BusManager::OnMsg(MsgI &imsg)
{
   DEFER1(imsg.Release(shm_));
   auto onRecv = [&](MsgI &imsg) {
   BHMsg msg;
   if (!imsg.Unpack(msg)) {
      return;
@@ -164,13 +112,13 @@
      };
      if (imsg.IsCounted()) {
         Dispatch([&](const MQId &cli) { busq_.Send(cli, imsg, 100); });
            Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm_, cli, imsg, 100); });
      } else {
         MsgI pubmsg;
         if (!pubmsg.MakeRC(shm_, msg)) { return; }
         DEFER1(pubmsg.Release(shm_));
         Dispatch([&](const MQId &cli) { busq_.Send(cli, pubmsg, 100); });
            Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm_, cli, pubmsg, 100); });
      }
   };
@@ -180,6 +128,9 @@
   case kMsgTypePublish: OnPublish(); break;
   default: break;
   }
   };
   return socket_.StartRaw(onRecv, std::min(nworker, kMaxWorker));
}
} // namespace bhome_shm