From 0bc72d004b08b6cac005931787f43c68dace7685 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 02 四月 2021 16:25:39 +0800 Subject: [PATCH] refactor pub/sub center. --- src/pubsub_center.cpp | 126 +++++++++++++++++++++++------------------ 1 files changed, 70 insertions(+), 56 deletions(-) diff --git a/src/pubsub_center.cpp b/src/pubsub_center.cpp index afd07bf..3ba5382 100644 --- a/src/pubsub_center.cpp +++ b/src/pubsub_center.cpp @@ -18,10 +18,70 @@ #include "pubsub_center.h" #include "bh_util.h" using namespace bhome_shm; +namespace +{ +class BusCenter +{ + typedef std::set<MQId> Clients; + std::unordered_map<Topic, Clients> records_; + // todo cache data if send fail. + +public: + template <class Iter> + void SubScribe(const MQId &client, Iter topic_begin, Iter topic_end) + { + for (auto it = topic_begin; it != topic_end; ++it) { + records_[*it].insert(client); + } + } + template <class Iter> + void UnsubScribe(const MQId &client, Iter topic_begin, Iter topic_end) + { + for (auto it = topic_begin; it != topic_end; ++it) { + auto pos = records_.find(*it); + if (pos != records_.end()) { + if (pos->second.erase(client) && pos->second.empty()) { + records_.erase(pos); + } + } + } + }; + Clients FindClients(const std::string &topic) + { + Clients dests; + auto Find1 = [&](const std::string &t) { + auto pos = records_.find(topic); + if (pos != records_.end() && !pos->second.empty()) { + auto &clients = pos->second; + for (auto &cli : clients) { + dests.insert(cli); + } + } + }; + Find1(topic); + + //TODO check and adjust topic on client side sub/pub. + size_t pos = 0; + while (true) { + pos = topic.find(kTopicSep, pos); + if (pos == topic.npos || ++pos == topic.size()) { + // Find1(std::string()); // sub all. + break; + } else { + Find1(topic.substr(0, pos)); + } + } + return dests; + } +}; + +} // namespace bool PubSubCenter::Start(const int nworker) { - auto onRecv = [&](MsgI &imsg) { + auto bus_ptr = std::make_shared<Synced<BusCenter>>(); + + auto onRecv = [bus_ptr, this](MsgI &imsg) { #ifndef NDEBUG static std::atomic<time_t> last(0); time_t now = 0; @@ -30,6 +90,7 @@ printf("bus queue size: %ld\n", socket_.Pending()); } #endif + auto &bus = *bus_ptr; BHMsg msg; if (!imsg.Unpack(msg)) { @@ -42,86 +103,39 @@ assert(sizeof(MQId) == msg.route(0).mq_id().size()); MQId client; memcpy(&client, msg.route(0).mq_id().data(), sizeof(client)); - - std::lock_guard<std::mutex> guard(mutex_); - auto &topics = sub.topics(); - for (auto &topic : topics) { - try { - update(topic, client); - } catch (...) { - //TODO log error - } - } + update(client, sub.topics()); } }; - auto Sub1 = [this](const std::string &topic, const MQId &id) { - records_[topic].insert(id); - }; - - auto Unsub1 = [this](const std::string &topic, const MQId &id) { - auto pos = records_.find(topic); - if (pos != records_.end()) { - if (pos->second.erase(id) && pos->second.empty()) { - records_.erase(pos); - } - } - }; + auto Sub = [&](const MQId &id, auto &topics) { bus->SubScribe(id, topics.begin(), topics.end()); }; + auto Unsub = [&](const MQId &id, auto &topics) { bus->UnsubScribe(id, topics.begin(), topics.end()); }; auto OnPublish = [&]() { DataPub pub; if (!pub.ParseFromString(msg.body())) { return; } - auto FindClients = [&](const std::string &topic) { - Clients dests; - std::lock_guard<std::mutex> guard(mutex_); - auto Find1 = [&](const std::string &t) { - auto pos = records_.find(topic); - if (pos != records_.end() && !pos->second.empty()) { - auto &clients = pos->second; - for (auto &cli : clients) { - dests.insert(cli); - } - } - }; - Find1(topic); - - //TODO check and adjust topic on client side sub/pub. - size_t pos = 0; - while (true) { - pos = topic.find(kTopicSep, pos); - if (pos == topic.npos || ++pos == topic.size()) { - // Find1(std::string()); // sub all. - break; - } else { - Find1(topic.substr(0, pos)); - } - } - return dests; - }; - auto Dispatch = [&](auto &&send1) { - const Clients &clients(FindClients(pub.topic())); + const auto &clients(bus->FindClients(pub.topic())); for (auto &cli : clients) { send1(cli); } }; if (imsg.IsCounted()) { - Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, imsg, 100); }); + Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, imsg, 10); }); } else { MsgI pubmsg; if (!pubmsg.MakeRC(shm(), msg)) { return; } DEFER1(pubmsg.Release(shm())); - Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, pubmsg, 100); }); + Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, pubmsg, 10); }); } }; switch (msg.type()) { - case kMsgTypeSubscribe: OnSubChange(Sub1); break; - case kMsgTypeUnsubscribe: OnSubChange(Unsub1); break; + case kMsgTypeSubscribe: OnSubChange(Sub); break; + case kMsgTypeUnsubscribe: OnSubChange(Unsub); break; case kMsgTypePublish: OnPublish(); break; default: break; } -- Gitblit v1.8.0