lichao
2021-04-06 4deeafbd502dc3c57dab8ad6ca601a38a9e7f074
src/pubsub_center.cpp
@@ -77,28 +77,24 @@
} // namespace
bool PubSubCenter::Start(const int nworker)
BHCenter::MsgHandler MakeBusCenter()
{
   auto bus_ptr = std::make_shared<Synced<BusCenter>>();
   auto onRecv = [bus_ptr, this](MsgI &imsg) {
   return [bus_ptr](ShmSocket &socket, MsgI &imsg, BHMsg &msg) {
#ifndef NDEBUG
      static std::atomic<time_t> last(0);
      time_t now = 0;
      time(&now);
      if (last.exchange(now) < now) {
         printf("bus queue size: %ld\n", socket_.Pending());
         printf("bus queue size: %ld\n", socket.Pending());
      }
#endif
      auto &bus = *bus_ptr;
      BHMsg msg;
      if (!imsg.Unpack(msg)) {
         return;
      }
      auto &shm = socket.shm();
      auto OnSubChange = [&](auto &&update) {
         DataSub sub;
         MsgSub sub;
         if (!msg.route().empty() && sub.ParseFromString(msg.body()) && !sub.topics().empty()) {
            assert(sizeof(MQId) == msg.route(0).mq_id().size());
            MQId client;
@@ -106,12 +102,11 @@
            update(client, sub.topics());
         }
      };
      auto Sub = [&](const MQId &id, auto &topics) { bus->SubScribe(id, topics.begin(), topics.end()); };
      auto Unsub = [&](const MQId &id, auto &topics) { bus->UnsubScribe(id, topics.begin(), topics.end()); };
      auto OnPublish = [&]() {
         DataPub pub;
         MsgPub pub;
         if (!pub.ParseFromString(msg.body())) {
            return;
         }
@@ -123,24 +118,30 @@
         };
         if (imsg.IsCounted()) {
            Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, imsg, 10); });
            Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm, cli, imsg, 10); });
         } else {
            MsgI pubmsg;
            if (!pubmsg.MakeRC(shm(), msg)) { return; }
            DEFER1(pubmsg.Release(shm()));
            if (!pubmsg.MakeRC(shm, msg)) { return; }
            DEFER1(pubmsg.Release(shm));
            Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, pubmsg, 10); });
            Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm, cli, pubmsg, 10); });
         }
      };
      switch (msg.type()) {
      case kMsgTypeSubscribe: OnSubChange(Sub); break;
      case kMsgTypeUnsubscribe: OnSubChange(Unsub); break;
      case kMsgTypePublish: OnPublish(); break;
      default: break;
      case kMsgTypeSubscribe: OnSubChange(Sub); return true;
      case kMsgTypeUnsubscribe: OnSubChange(Unsub); return true;
      case kMsgTypePublish: OnPublish(); return true;
      default: return false;
      }
   };
}
bool PubSubCenter::Start(const int nworker)
{
   auto handler = MakeBusCenter();
   printf("sizeof(pub/sub handler) = %ld\n", sizeof(handler));
   const int kMaxWorker = 16;
   return socket_.StartRaw(onRecv, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
   return socket_.Start(handler, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
}