| | |
| | | #include "pubsub_center.h" |
| | | #include "bh_util.h" |
| | | using namespace bhome_shm; |
| | | namespace |
| | | { |
| | | class BusCenter |
| | | { |
| | | typedef std::set<MQId> Clients; |
| | | std::unordered_map<Topic, Clients> records_; |
| | | // todo cache data if send fail. |
| | | |
| | | public: |
| | | template <class Iter> |
| | | void SubScribe(const MQId &client, Iter topic_begin, Iter topic_end) |
| | | { |
| | | for (auto it = topic_begin; it != topic_end; ++it) { |
| | | records_[*it].insert(client); |
| | | } |
| | | } |
| | | template <class Iter> |
| | | void UnsubScribe(const MQId &client, Iter topic_begin, Iter topic_end) |
| | | { |
| | | for (auto it = topic_begin; it != topic_end; ++it) { |
| | | auto pos = records_.find(*it); |
| | | if (pos != records_.end()) { |
| | | if (pos->second.erase(client) && pos->second.empty()) { |
| | | records_.erase(pos); |
| | | } |
| | | } |
| | | } |
| | | }; |
| | | Clients FindClients(const std::string &topic) |
| | | { |
| | | Clients dests; |
| | | auto Find1 = [&](const std::string &t) { |
| | | auto pos = records_.find(topic); |
| | | if (pos != records_.end() && !pos->second.empty()) { |
| | | auto &clients = pos->second; |
| | | for (auto &cli : clients) { |
| | | dests.insert(cli); |
| | | } |
| | | } |
| | | }; |
| | | Find1(topic); |
| | | |
| | | //TODO check and adjust topic on client side sub/pub. |
| | | size_t pos = 0; |
| | | while (true) { |
| | | pos = topic.find(kTopicSep, pos); |
| | | if (pos == topic.npos || ++pos == topic.size()) { |
| | | // Find1(std::string()); // sub all. |
| | | break; |
| | | } else { |
| | | Find1(topic.substr(0, pos)); |
| | | } |
| | | } |
| | | return dests; |
| | | } |
| | | }; |
| | | |
| | | } // namespace |
| | | |
| | | bool PubSubCenter::Start(const int nworker) |
| | | { |
| | | auto onRecv = [&](MsgI &imsg) { |
| | | auto bus_ptr = std::make_shared<Synced<BusCenter>>(); |
| | | |
| | | auto onRecv = [bus_ptr, this](MsgI &imsg) { |
| | | #ifndef NDEBUG |
| | | static std::atomic<time_t> last(0); |
| | | time_t now = 0; |
| | |
| | | printf("bus queue size: %ld\n", socket_.Pending()); |
| | | } |
| | | #endif |
| | | auto &bus = *bus_ptr; |
| | | |
| | | BHMsg msg; |
| | | if (!imsg.Unpack(msg)) { |
| | |
| | | assert(sizeof(MQId) == msg.route(0).mq_id().size()); |
| | | MQId client; |
| | | memcpy(&client, msg.route(0).mq_id().data(), sizeof(client)); |
| | | |
| | | std::lock_guard<std::mutex> guard(mutex_); |
| | | auto &topics = sub.topics(); |
| | | for (auto &topic : topics) { |
| | | try { |
| | | update(topic, client); |
| | | } catch (...) { |
| | | //TODO log error |
| | | } |
| | | } |
| | | update(client, sub.topics()); |
| | | } |
| | | }; |
| | | |
| | | auto Sub1 = [this](const std::string &topic, const MQId &id) { |
| | | records_[topic].insert(id); |
| | | }; |
| | | |
| | | auto Unsub1 = [this](const std::string &topic, const MQId &id) { |
| | | auto pos = records_.find(topic); |
| | | if (pos != records_.end()) { |
| | | if (pos->second.erase(id) && pos->second.empty()) { |
| | | records_.erase(pos); |
| | | } |
| | | } |
| | | }; |
| | | auto Sub = [&](const MQId &id, auto &topics) { bus->SubScribe(id, topics.begin(), topics.end()); }; |
| | | auto Unsub = [&](const MQId &id, auto &topics) { bus->UnsubScribe(id, topics.begin(), topics.end()); }; |
| | | |
| | | auto OnPublish = [&]() { |
| | | DataPub pub; |
| | | if (!pub.ParseFromString(msg.body())) { |
| | | return; |
| | | } |
| | | auto FindClients = [&](const std::string &topic) { |
| | | Clients dests; |
| | | std::lock_guard<std::mutex> guard(mutex_); |
| | | auto Find1 = [&](const std::string &t) { |
| | | auto pos = records_.find(topic); |
| | | if (pos != records_.end() && !pos->second.empty()) { |
| | | auto &clients = pos->second; |
| | | for (auto &cli : clients) { |
| | | dests.insert(cli); |
| | | } |
| | | } |
| | | }; |
| | | Find1(topic); |
| | | |
| | | //TODO check and adjust topic on client side sub/pub. |
| | | size_t pos = 0; |
| | | while (true) { |
| | | pos = topic.find(kTopicSep, pos); |
| | | if (pos == topic.npos || ++pos == topic.size()) { |
| | | // Find1(std::string()); // sub all. |
| | | break; |
| | | } else { |
| | | Find1(topic.substr(0, pos)); |
| | | } |
| | | } |
| | | return dests; |
| | | }; |
| | | |
| | | auto Dispatch = [&](auto &&send1) { |
| | | const Clients &clients(FindClients(pub.topic())); |
| | | const auto &clients(bus->FindClients(pub.topic())); |
| | | for (auto &cli : clients) { |
| | | send1(cli); |
| | | } |
| | | }; |
| | | |
| | | if (imsg.IsCounted()) { |
| | | Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, imsg, 100); }); |
| | | Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, imsg, 10); }); |
| | | } else { |
| | | MsgI pubmsg; |
| | | if (!pubmsg.MakeRC(shm(), msg)) { return; } |
| | | DEFER1(pubmsg.Release(shm())); |
| | | |
| | | Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, pubmsg, 100); }); |
| | | Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, pubmsg, 10); }); |
| | | } |
| | | }; |
| | | |
| | | switch (msg.type()) { |
| | | case kMsgTypeSubscribe: OnSubChange(Sub1); break; |
| | | case kMsgTypeUnsubscribe: OnSubChange(Unsub1); break; |
| | | case kMsgTypeSubscribe: OnSubChange(Sub); break; |
| | | case kMsgTypeUnsubscribe: OnSubChange(Unsub); break; |
| | | case kMsgTypePublish: OnPublish(); break; |
| | | default: break; |
| | | } |