lichao
2021-04-02 0bc72d004b08b6cac005931787f43c68dace7685
refactor pub/sub center.
2个文件已修改
131 ■■■■ 已修改文件
src/pubsub_center.cpp 126 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/pubsub_center.h 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/pubsub_center.cpp
@@ -18,10 +18,70 @@
#include "pubsub_center.h"
#include "bh_util.h"
using namespace bhome_shm;
namespace
{
class BusCenter
{
    typedef std::set<MQId> Clients;
    std::unordered_map<Topic, Clients> records_;
    // todo cache data if send fail.
public:
    template <class Iter>
    void SubScribe(const MQId &client, Iter topic_begin, Iter topic_end)
    {
        for (auto it = topic_begin; it != topic_end; ++it) {
            records_[*it].insert(client);
        }
    }
    template <class Iter>
    void UnsubScribe(const MQId &client, Iter topic_begin, Iter topic_end)
    {
        for (auto it = topic_begin; it != topic_end; ++it) {
            auto pos = records_.find(*it);
            if (pos != records_.end()) {
                if (pos->second.erase(client) && pos->second.empty()) {
                    records_.erase(pos);
                }
            }
        }
    };
    Clients FindClients(const std::string &topic)
    {
        Clients dests;
        auto Find1 = [&](const std::string &t) {
            auto pos = records_.find(topic);
            if (pos != records_.end() && !pos->second.empty()) {
                auto &clients = pos->second;
                for (auto &cli : clients) {
                    dests.insert(cli);
                }
            }
        };
        Find1(topic);
        //TODO check and adjust topic on client side sub/pub.
        size_t pos = 0;
        while (true) {
            pos = topic.find(kTopicSep, pos);
            if (pos == topic.npos || ++pos == topic.size()) {
                // Find1(std::string()); // sub all.
                break;
            } else {
                Find1(topic.substr(0, pos));
            }
        }
        return dests;
    }
};
} // namespace
bool PubSubCenter::Start(const int nworker)
{
    auto onRecv = [&](MsgI &imsg) {
    auto bus_ptr = std::make_shared<Synced<BusCenter>>();
    auto onRecv = [bus_ptr, this](MsgI &imsg) {
#ifndef NDEBUG
        static std::atomic<time_t> last(0);
        time_t now = 0;
@@ -30,6 +90,7 @@
            printf("bus queue size: %ld\n", socket_.Pending());
        }
#endif
        auto &bus = *bus_ptr;
        BHMsg msg;
        if (!imsg.Unpack(msg)) {
@@ -42,86 +103,39 @@
                assert(sizeof(MQId) == msg.route(0).mq_id().size());
                MQId client;
                memcpy(&client, msg.route(0).mq_id().data(), sizeof(client));
                std::lock_guard<std::mutex> guard(mutex_);
                auto &topics = sub.topics();
                for (auto &topic : topics) {
                    try {
                        update(topic, client);
                    } catch (...) {
                        //TODO log error
                    }
                }
                update(client, sub.topics());
            }
        };
        auto Sub1 = [this](const std::string &topic, const MQId &id) {
            records_[topic].insert(id);
        };
        auto Unsub1 = [this](const std::string &topic, const MQId &id) {
            auto pos = records_.find(topic);
            if (pos != records_.end()) {
                if (pos->second.erase(id) && pos->second.empty()) {
                    records_.erase(pos);
                }
            }
        };
        auto Sub = [&](const MQId &id, auto &topics) { bus->SubScribe(id, topics.begin(), topics.end()); };
        auto Unsub = [&](const MQId &id, auto &topics) { bus->UnsubScribe(id, topics.begin(), topics.end()); };
        auto OnPublish = [&]() {
            DataPub pub;
            if (!pub.ParseFromString(msg.body())) {
                return;
            }
            auto FindClients = [&](const std::string &topic) {
                Clients dests;
                std::lock_guard<std::mutex> guard(mutex_);
                auto Find1 = [&](const std::string &t) {
                    auto pos = records_.find(topic);
                    if (pos != records_.end() && !pos->second.empty()) {
                        auto &clients = pos->second;
                        for (auto &cli : clients) {
                            dests.insert(cli);
                        }
                    }
                };
                Find1(topic);
                //TODO check and adjust topic on client side sub/pub.
                size_t pos = 0;
                while (true) {
                    pos = topic.find(kTopicSep, pos);
                    if (pos == topic.npos || ++pos == topic.size()) {
                        // Find1(std::string()); // sub all.
                        break;
                    } else {
                        Find1(topic.substr(0, pos));
                    }
                }
                return dests;
            };
            auto Dispatch = [&](auto &&send1) {
                const Clients &clients(FindClients(pub.topic()));
                const auto &clients(bus->FindClients(pub.topic()));
                for (auto &cli : clients) {
                    send1(cli);
                }
            };
            if (imsg.IsCounted()) {
                Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, imsg, 100); });
                Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, imsg, 10); });
            } else {
                MsgI pubmsg;
                if (!pubmsg.MakeRC(shm(), msg)) { return; }
                DEFER1(pubmsg.Release(shm()));
                Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, pubmsg, 100); });
                Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, pubmsg, 10); });
            }
        };
        switch (msg.type()) {
        case kMsgTypeSubscribe: OnSubChange(Sub1); break;
        case kMsgTypeUnsubscribe: OnSubChange(Unsub1); break;
        case kMsgTypeSubscribe: OnSubChange(Sub); break;
        case kMsgTypeUnsubscribe: OnSubChange(Unsub); break;
        case kMsgTypePublish: OnPublish(); break;
        default: break;
        }
src/pubsub_center.h
@@ -37,11 +37,6 @@
    SocketBus socket_;
    ShmSocket::Shm &shm() { return socket_.shm(); }
    std::mutex mutex_;
    typedef std::set<MQId> Clients;
    std::unordered_map<Topic, Clients> records_;
    bool Find1(const Topic &topic);
public:
    PubSubCenter(ShmSocket::Shm &shm) :
        socket_(shm) {}