From 83085f2ce99cca05d40a19482151873a55e6393a Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 02 四月 2021 19:32:21 +0800 Subject: [PATCH] refactor center; add async request no cb. --- src/pubsub_center.cpp | 37 +++++++++++++++++++------------------ 1 files changed, 19 insertions(+), 18 deletions(-) diff --git a/src/pubsub_center.cpp b/src/pubsub_center.cpp index 3ba5382..b3af47d 100644 --- a/src/pubsub_center.cpp +++ b/src/pubsub_center.cpp @@ -77,25 +77,21 @@ } // namespace -bool PubSubCenter::Start(const int nworker) +BHCenter::MsgHandler MakeBusCenter() { auto bus_ptr = std::make_shared<Synced<BusCenter>>(); - auto onRecv = [bus_ptr, this](MsgI &imsg) { + return [bus_ptr](ShmSocket &socket, MsgI &imsg, BHMsg &msg) { #ifndef NDEBUG static std::atomic<time_t> last(0); time_t now = 0; time(&now); if (last.exchange(now) < now) { - printf("bus queue size: %ld\n", socket_.Pending()); + printf("bus queue size: %ld\n", socket.Pending()); } #endif auto &bus = *bus_ptr; - - BHMsg msg; - if (!imsg.Unpack(msg)) { - return; - } + auto &shm = socket.shm(); auto OnSubChange = [&](auto &&update) { DataSub sub; @@ -106,7 +102,6 @@ update(client, sub.topics()); } }; - auto Sub = [&](const MQId &id, auto &topics) { bus->SubScribe(id, topics.begin(), topics.end()); }; auto Unsub = [&](const MQId &id, auto &topics) { bus->UnsubScribe(id, topics.begin(), topics.end()); }; @@ -123,24 +118,30 @@ }; if (imsg.IsCounted()) { - Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, imsg, 10); }); + Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm, cli, imsg, 10); }); } else { MsgI pubmsg; - if (!pubmsg.MakeRC(shm(), msg)) { return; } - DEFER1(pubmsg.Release(shm())); + if (!pubmsg.MakeRC(shm, msg)) { return; } + DEFER1(pubmsg.Release(shm)); - Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, pubmsg, 10); }); + Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm, cli, pubmsg, 10); }); } }; switch (msg.type()) { - case kMsgTypeSubscribe: OnSubChange(Sub); break; - case kMsgTypeUnsubscribe: OnSubChange(Unsub); break; - case kMsgTypePublish: OnPublish(); break; - default: break; + case kMsgTypeSubscribe: OnSubChange(Sub); return true; + case kMsgTypeUnsubscribe: OnSubChange(Unsub); return true; + case kMsgTypePublish: OnPublish(); return true; + default: return false; } }; +} + +bool PubSubCenter::Start(const int nworker) +{ + auto handler = MakeBusCenter(); + printf("sizeof(pub/sub handler) = %ld\n", sizeof(handler)); const int kMaxWorker = 16; - return socket_.StartRaw(onRecv, std::min((nworker > 0 ? nworker : 2), kMaxWorker)); + return socket_.Start(handler, std::min((nworker > 0 ? nworker : 2), kMaxWorker)); } \ No newline at end of file -- Gitblit v1.8.0