From 4deeafbd502dc3c57dab8ad6ca601a38a9e7f074 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期二, 06 四月 2021 19:10:49 +0800 Subject: [PATCH] add uni center. --- src/pubsub_center.cpp | 159 +++++++++++++++++++++++++++++------------------------ 1 files changed, 87 insertions(+), 72 deletions(-) diff --git a/src/pubsub_center.cpp b/src/pubsub_center.cpp index afd07bf..698327e 100644 --- a/src/pubsub_center.cpp +++ b/src/pubsub_center.cpp @@ -18,115 +18,130 @@ #include "pubsub_center.h" #include "bh_util.h" using namespace bhome_shm; - -bool PubSubCenter::Start(const int nworker) +namespace { - auto onRecv = [&](MsgI &imsg) { +class BusCenter +{ + typedef std::set<MQId> Clients; + std::unordered_map<Topic, Clients> records_; + // todo cache data if send fail. + +public: + template <class Iter> + void SubScribe(const MQId &client, Iter topic_begin, Iter topic_end) + { + for (auto it = topic_begin; it != topic_end; ++it) { + records_[*it].insert(client); + } + } + template <class Iter> + void UnsubScribe(const MQId &client, Iter topic_begin, Iter topic_end) + { + for (auto it = topic_begin; it != topic_end; ++it) { + auto pos = records_.find(*it); + if (pos != records_.end()) { + if (pos->second.erase(client) && pos->second.empty()) { + records_.erase(pos); + } + } + } + }; + Clients FindClients(const std::string &topic) + { + Clients dests; + auto Find1 = [&](const std::string &t) { + auto pos = records_.find(topic); + if (pos != records_.end() && !pos->second.empty()) { + auto &clients = pos->second; + for (auto &cli : clients) { + dests.insert(cli); + } + } + }; + Find1(topic); + + //TODO check and adjust topic on client side sub/pub. + size_t pos = 0; + while (true) { + pos = topic.find(kTopicSep, pos); + if (pos == topic.npos || ++pos == topic.size()) { + // Find1(std::string()); // sub all. + break; + } else { + Find1(topic.substr(0, pos)); + } + } + return dests; + } +}; + +} // namespace + +BHCenter::MsgHandler MakeBusCenter() +{ + auto bus_ptr = std::make_shared<Synced<BusCenter>>(); + + return [bus_ptr](ShmSocket &socket, MsgI &imsg, BHMsg &msg) { #ifndef NDEBUG static std::atomic<time_t> last(0); time_t now = 0; time(&now); if (last.exchange(now) < now) { - printf("bus queue size: %ld\n", socket_.Pending()); + printf("bus queue size: %ld\n", socket.Pending()); } #endif - - BHMsg msg; - if (!imsg.Unpack(msg)) { - return; - } + auto &bus = *bus_ptr; + auto &shm = socket.shm(); auto OnSubChange = [&](auto &&update) { - DataSub sub; + MsgSub sub; if (!msg.route().empty() && sub.ParseFromString(msg.body()) && !sub.topics().empty()) { assert(sizeof(MQId) == msg.route(0).mq_id().size()); MQId client; memcpy(&client, msg.route(0).mq_id().data(), sizeof(client)); - - std::lock_guard<std::mutex> guard(mutex_); - auto &topics = sub.topics(); - for (auto &topic : topics) { - try { - update(topic, client); - } catch (...) { - //TODO log error - } - } + update(client, sub.topics()); } }; - - auto Sub1 = [this](const std::string &topic, const MQId &id) { - records_[topic].insert(id); - }; - - auto Unsub1 = [this](const std::string &topic, const MQId &id) { - auto pos = records_.find(topic); - if (pos != records_.end()) { - if (pos->second.erase(id) && pos->second.empty()) { - records_.erase(pos); - } - } - }; + auto Sub = [&](const MQId &id, auto &topics) { bus->SubScribe(id, topics.begin(), topics.end()); }; + auto Unsub = [&](const MQId &id, auto &topics) { bus->UnsubScribe(id, topics.begin(), topics.end()); }; auto OnPublish = [&]() { - DataPub pub; + MsgPub pub; if (!pub.ParseFromString(msg.body())) { return; } - auto FindClients = [&](const std::string &topic) { - Clients dests; - std::lock_guard<std::mutex> guard(mutex_); - auto Find1 = [&](const std::string &t) { - auto pos = records_.find(topic); - if (pos != records_.end() && !pos->second.empty()) { - auto &clients = pos->second; - for (auto &cli : clients) { - dests.insert(cli); - } - } - }; - Find1(topic); - - //TODO check and adjust topic on client side sub/pub. - size_t pos = 0; - while (true) { - pos = topic.find(kTopicSep, pos); - if (pos == topic.npos || ++pos == topic.size()) { - // Find1(std::string()); // sub all. - break; - } else { - Find1(topic.substr(0, pos)); - } - } - return dests; - }; - auto Dispatch = [&](auto &&send1) { - const Clients &clients(FindClients(pub.topic())); + const auto &clients(bus->FindClients(pub.topic())); for (auto &cli : clients) { send1(cli); } }; if (imsg.IsCounted()) { - Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, imsg, 100); }); + Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm, cli, imsg, 10); }); } else { MsgI pubmsg; - if (!pubmsg.MakeRC(shm(), msg)) { return; } - DEFER1(pubmsg.Release(shm())); + if (!pubmsg.MakeRC(shm, msg)) { return; } + DEFER1(pubmsg.Release(shm)); - Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, pubmsg, 100); }); + Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm, cli, pubmsg, 10); }); } }; switch (msg.type()) { - case kMsgTypeSubscribe: OnSubChange(Sub1); break; - case kMsgTypeUnsubscribe: OnSubChange(Unsub1); break; - case kMsgTypePublish: OnPublish(); break; - default: break; + case kMsgTypeSubscribe: OnSubChange(Sub); return true; + case kMsgTypeUnsubscribe: OnSubChange(Unsub); return true; + case kMsgTypePublish: OnPublish(); return true; + default: return false; } }; +} + +bool PubSubCenter::Start(const int nworker) +{ + auto handler = MakeBusCenter(); + printf("sizeof(pub/sub handler) = %ld\n", sizeof(handler)); const int kMaxWorker = 16; - return socket_.StartRaw(onRecv, std::min((nworker > 0 ? nworker : 2), kMaxWorker)); + return socket_.Start(handler, std::min((nworker > 0 ? nworker : 2), kMaxWorker)); } \ No newline at end of file -- Gitblit v1.8.0