lichao
2021-04-02 0bc72d004b08b6cac005931787f43c68dace7685
refactor pub/sub center.
2个文件已修改
125 ■■■■ 已修改文件
src/pubsub_center.cpp 120 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/pubsub_center.h 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/pubsub_center.cpp
@@ -18,64 +18,37 @@
#include "pubsub_center.h"
#include "bh_util.h"
using namespace bhome_shm;
bool PubSubCenter::Start(const int nworker)
namespace
{
    auto onRecv = [&](MsgI &imsg) {
#ifndef NDEBUG
        static std::atomic<time_t> last(0);
        time_t now = 0;
        time(&now);
        if (last.exchange(now) < now) {
            printf("bus queue size: %ld\n", socket_.Pending());
        }
#endif
class BusCenter
{
    typedef std::set<MQId> Clients;
    std::unordered_map<Topic, Clients> records_;
    // TODO: cache data if send fails.
        BHMsg msg;
        if (!imsg.Unpack(msg)) {
            return;
        }
        auto OnSubChange = [&](auto &&update) {
            DataSub sub;
            if (!msg.route().empty() && sub.ParseFromString(msg.body()) && !sub.topics().empty()) {
                assert(sizeof(MQId) == msg.route(0).mq_id().size());
                MQId client;
                memcpy(&client, msg.route(0).mq_id().data(), sizeof(client));
                std::lock_guard<std::mutex> guard(mutex_);
                auto &topics = sub.topics();
                for (auto &topic : topics) {
                    try {
                        update(topic, client);
                    } catch (...) {
                        //TODO log error
public:
    template <class Iter>
    void SubScribe(const MQId &client, Iter topic_begin, Iter topic_end)
    {
        for (auto it = topic_begin; it != topic_end; ++it) {
            records_[*it].insert(client);
                    }
                }
            }
        };
        auto Sub1 = [this](const std::string &topic, const MQId &id) {
            records_[topic].insert(id);
        };
        auto Unsub1 = [this](const std::string &topic, const MQId &id) {
            auto pos = records_.find(topic);
    template <class Iter>
    void UnsubScribe(const MQId &client, Iter topic_begin, Iter topic_end)
    {
        for (auto it = topic_begin; it != topic_end; ++it) {
            auto pos = records_.find(*it);
            if (pos != records_.end()) {
                if (pos->second.erase(id) && pos->second.empty()) {
                if (pos->second.erase(client) && pos->second.empty()) {
                    records_.erase(pos);
                }
            }
        };
        auto OnPublish = [&]() {
            DataPub pub;
            if (!pub.ParseFromString(msg.body())) {
                return;
            }
            auto FindClients = [&](const std::string &topic) {
    };
    Clients FindClients(const std::string &topic)
    {
                Clients dests;
                std::lock_guard<std::mutex> guard(mutex_);
                auto Find1 = [&](const std::string &t) {
                    auto pos = records_.find(topic);
                    if (pos != records_.end() && !pos->second.empty()) {
@@ -99,29 +72,70 @@
                    }
                }
                return dests;
    }
            };
} // namespace
bool PubSubCenter::Start(const int nworker)
{
    auto bus_ptr = std::make_shared<Synced<BusCenter>>();
    auto onRecv = [bus_ptr, this](MsgI &imsg) {
#ifndef NDEBUG
        static std::atomic<time_t> last(0);
        time_t now = 0;
        time(&now);
        if (last.exchange(now) < now) {
            printf("bus queue size: %ld\n", socket_.Pending());
        }
#endif
        auto &bus = *bus_ptr;
        BHMsg msg;
        if (!imsg.Unpack(msg)) {
            return;
        }
        auto OnSubChange = [&](auto &&update) {
            DataSub sub;
            if (!msg.route().empty() && sub.ParseFromString(msg.body()) && !sub.topics().empty()) {
                assert(sizeof(MQId) == msg.route(0).mq_id().size());
                MQId client;
                memcpy(&client, msg.route(0).mq_id().data(), sizeof(client));
                update(client, sub.topics());
            }
        };
        auto Sub = [&](const MQId &id, auto &topics) { bus->SubScribe(id, topics.begin(), topics.end()); };
        auto Unsub = [&](const MQId &id, auto &topics) { bus->UnsubScribe(id, topics.begin(), topics.end()); };
        auto OnPublish = [&]() {
            DataPub pub;
            if (!pub.ParseFromString(msg.body())) {
                return;
            }
            auto Dispatch = [&](auto &&send1) {
                const Clients &clients(FindClients(pub.topic()));
                const auto &clients(bus->FindClients(pub.topic()));
                for (auto &cli : clients) {
                    send1(cli);
                }
            };
            if (imsg.IsCounted()) {
                Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, imsg, 100); });
                Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, imsg, 10); });
            } else {
                MsgI pubmsg;
                if (!pubmsg.MakeRC(shm(), msg)) { return; }
                DEFER1(pubmsg.Release(shm()));
                Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, pubmsg, 100); });
                Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, pubmsg, 10); });
            }
        };
        switch (msg.type()) {
        case kMsgTypeSubscribe: OnSubChange(Sub1); break;
        case kMsgTypeUnsubscribe: OnSubChange(Unsub1); break;
        case kMsgTypeSubscribe: OnSubChange(Sub); break;
        case kMsgTypeUnsubscribe: OnSubChange(Unsub); break;
        case kMsgTypePublish: OnPublish(); break;
        default: break;
        }
src/pubsub_center.h
@@ -37,11 +37,6 @@
    SocketBus socket_;
    // Returns a reference to the ShmSocket::Shm obtained from socket_.shm().
    ShmSocket::Shm &shm() { return socket_.shm(); }
    std::mutex mutex_;
    typedef std::set<MQId> Clients;
    std::unordered_map<Topic, Clients> records_;
    bool Find1(const Topic &topic);
public:
    // Constructs the center; the given shm reference is forwarded to
    // initialize the socket_ member.
    PubSubCenter(ShmSocket::Shm &shm) :
        socket_(shm) {}