| | |
| | | |
| | | } // namespace |
| | | |
| | | bool PubSubCenter::Start(const int nworker) |
| | | BHCenter::MsgHandler MakeBusCenter() |
| | | { |
| | | auto bus_ptr = std::make_shared<Synced<BusCenter>>(); |
| | | |
| | | auto onRecv = [bus_ptr, this](MsgI &imsg) { |
| | | return [bus_ptr](ShmSocket &socket, MsgI &imsg, BHMsg &msg) { |
| | | #ifndef NDEBUG |
| | | static std::atomic<time_t> last(0); |
| | | time_t now = 0; |
| | | time(&now); |
| | | if (last.exchange(now) < now) { |
| | | printf("bus queue size: %ld\n", socket_.Pending()); |
| | | printf("bus queue size: %ld\n", socket.Pending()); |
| | | } |
| | | #endif |
| | | auto &bus = *bus_ptr; |
| | | |
| | | BHMsg msg; |
| | | if (!imsg.Unpack(msg)) { |
| | | return; |
| | | } |
| | | auto &shm = socket.shm(); |
| | | |
| | | auto OnSubChange = [&](auto &&update) { |
| | | DataSub sub; |
| | |
| | | update(client, sub.topics()); |
| | | } |
| | | }; |
| | | |
| | | auto Sub = [&](const MQId &id, auto &topics) { bus->SubScribe(id, topics.begin(), topics.end()); }; |
| | | auto Unsub = [&](const MQId &id, auto &topics) { bus->UnsubScribe(id, topics.begin(), topics.end()); }; |
| | | |
| | |
| | | }; |
| | | |
| | | if (imsg.IsCounted()) { |
| | | Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, imsg, 10); }); |
| | | Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm, cli, imsg, 10); }); |
| | | } else { |
| | | MsgI pubmsg; |
| | | if (!pubmsg.MakeRC(shm(), msg)) { return; } |
| | | DEFER1(pubmsg.Release(shm())); |
| | | if (!pubmsg.MakeRC(shm, msg)) { return; } |
| | | DEFER1(pubmsg.Release(shm)); |
| | | |
| | | Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, pubmsg, 10); }); |
| | | Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm, cli, pubmsg, 10); }); |
| | | } |
| | | }; |
| | | |
| | | switch (msg.type()) { |
| | | case kMsgTypeSubscribe: OnSubChange(Sub); break; |
| | | case kMsgTypeUnsubscribe: OnSubChange(Unsub); break; |
| | | case kMsgTypePublish: OnPublish(); break; |
| | | default: break; |
| | | case kMsgTypeSubscribe: OnSubChange(Sub); return true; |
| | | case kMsgTypeUnsubscribe: OnSubChange(Unsub); return true; |
| | | case kMsgTypePublish: OnPublish(); return true; |
| | | default: return false; |
| | | } |
| | | }; |
| | | } |
| | | |
| | | bool PubSubCenter::Start(const int nworker) |
| | | { |
| | | auto handler = MakeBusCenter(); |
| | | printf("sizeof(pub/sub handler) = %ld\n", sizeof(handler)); |
| | | |
| | | const int kMaxWorker = 16; |
| | | return socket_.StartRaw(onRecv, std::min((nworker > 0 ? nworker : 2), kMaxWorker)); |
| | | return socket_.Start(handler, std::min((nworker > 0 ? nworker : 2), kMaxWorker)); |
| | | } |