lichao
2021-04-02 83085f2ce99cca05d40a19482151873a55e6393a
src/pubsub_center.cpp
@@ -77,25 +77,21 @@
} // namespace
bool PubSubCenter::Start(const int nworker)
BHCenter::MsgHandler MakeBusCenter()
{
   auto bus_ptr = std::make_shared<Synced<BusCenter>>();
   auto onRecv = [bus_ptr, this](MsgI &imsg) {
   return [bus_ptr](ShmSocket &socket, MsgI &imsg, BHMsg &msg) {
#ifndef NDEBUG
      static std::atomic<time_t> last(0);
      time_t now = 0;
      time(&now);
      if (last.exchange(now) < now) {
         printf("bus queue size: %ld\n", socket_.Pending());
         printf("bus queue size: %ld\n", socket.Pending());
      }
#endif
      auto &bus = *bus_ptr;
      BHMsg msg;
      if (!imsg.Unpack(msg)) {
         return;
      }
      auto &shm = socket.shm();
      auto OnSubChange = [&](auto &&update) {
         DataSub sub;
@@ -106,7 +102,6 @@
            update(client, sub.topics());
         }
      };
      auto Sub = [&](const MQId &id, auto &topics) { bus->SubScribe(id, topics.begin(), topics.end()); };
      auto Unsub = [&](const MQId &id, auto &topics) { bus->UnsubScribe(id, topics.begin(), topics.end()); };
@@ -123,24 +118,30 @@
         };
         if (imsg.IsCounted()) {
            Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, imsg, 10); });
            Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm, cli, imsg, 10); });
         } else {
            MsgI pubmsg;
            if (!pubmsg.MakeRC(shm(), msg)) { return; }
            DEFER1(pubmsg.Release(shm()));
            if (!pubmsg.MakeRC(shm, msg)) { return; }
            DEFER1(pubmsg.Release(shm));
            Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, pubmsg, 10); });
            Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm, cli, pubmsg, 10); });
         }
      };
      switch (msg.type()) {
      case kMsgTypeSubscribe: OnSubChange(Sub); break;
      case kMsgTypeUnsubscribe: OnSubChange(Unsub); break;
      case kMsgTypePublish: OnPublish(); break;
      default: break;
      case kMsgTypeSubscribe: OnSubChange(Sub); return true;
      case kMsgTypeUnsubscribe: OnSubChange(Unsub); return true;
      case kMsgTypePublish: OnPublish(); return true;
      default: return false;
      }
   };
}
bool PubSubCenter::Start(const int nworker)
{
   auto handler = MakeBusCenter();
   printf("sizeof(pub/sub handler) = %ld\n", sizeof(handler));
   const int kMaxWorker = 16;
   return socket_.StartRaw(onRecv, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
   return socket_.Start(handler, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
}