lichao
2021-06-23 c1e39e20ca42b21eeac8b5068fa1f921bf9a070f
refactor, start tcp pub/sub.
11个文件已修改
286 ■■■■ 已修改文件
box/center.cpp 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center_topic_node.cpp 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/node_center.cpp 184 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/node_center.h 24 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/tcp_connection.cpp 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/tcp_proxy.cpp 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/tcp_proxy.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
proto/source/bhome_msg.proto 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
proto/source/bhome_msg_api.proto 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.cpp 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/api_test.cpp 15 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center.cpp
@@ -135,27 +135,18 @@
    auto OnBusIdle = [=](ShmSocket &socket) {};
    auto OnBusCmd = [=](ShmSocket &socket, ShmMsgQueue::RawData &val) { return false; };
    auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
    auto OnPubSub = [=, &tcp_proxy](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
        auto &center = *center_ptr;
        auto replyer = MakeReplyer(socket, head, center);
        auto OnPublish = [&]() {
            MsgPublish pub;
            NodeCenter::Clients clients;
            MsgCommonReply reply;
            if (head.route_size() != 1 || !msg.ParseBody(pub)) {
                return;
            } else if (!center->FindClients(head, pub, clients, reply)) {
            if (head.route_size() == 1 && msg.ParseBody(pub)) {
                // replyer(center->Publish(head, pub.topic(), msg)); // dead lock?
                auto reply(center->Publish(head, pub.topic(), msg));
                replyer(reply);
            } else {
                replyer(MakeReply(eSuccess));
                if (clients.empty()) { return; }
                for (auto &cli : clients) {
                    auto node = cli.weak_node_.lock();
                    if (node) {
                        // should also make sure that mq is not killed before msg expires.
                        // it would be ok if (kill_time - offline_time) is longer than expire time.
                        socket.Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
                    }
                auto hosts = center->FindRemoteSubClients(pub.topic());
                for (auto &host : hosts) {
                    tcp_proxy.Publish(host, kBHCenterPort, pub.SerializeAsString());
                }
            }
        };
box/center_topic_node.cpp
@@ -43,11 +43,17 @@
        proc.put("name", info.proc().name());
        proc.put("publicInfo", info.proc().public_info());
        proc.put("online", info.online());
        Json topics = Json::Array();
        for (auto &t : info.topics().topic_list()) {
            topics.push_back(t);
        }
        proc.put("topics", topics);
        auto AddTopics = [&](auto &name, auto &topic_list) {
            Json topics = Json::Array();
            for (auto &t : topic_list) {
                topics.push_back(t);
            }
            proc.put(name, topics);
        };
        AddTopics("service", info.service().topic_list());
        AddTopics("local_sub", info.local_sub().topic_list());
        AddTopics("net_sub", info.net_sub().topic_list());
        list.push_back(proc);
    }
    return json.dump(0);
box/node_center.cpp
@@ -267,6 +267,43 @@
    return sender.Send(dest, msg, head.msg_id(), std::move(cb));
}
bool NodeCenter::RemotePublish(BHMsgHead &head, const std::string &body_content)
{
    auto &topic = head.topic();
    auto clients = DoFindClients(topic, true);
    if (clients.empty()) { return true; }
    std::vector<MsgI> msgs;
    auto ReleaseAll = [&]() {for (auto &msg : msgs) { msg.Release(); } };
    DEFER1(ReleaseAll(););
    for (auto &cli : clients) {
        auto Send1 = [&](Node node) {
            auto &shm = node->shm_;
            for (auto &msg : msgs) {
                if (msg.shm().name() == shm.name()) {
                    DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
                    return;
                }
            }
            MsgI msg(shm);
            if (msg.Make(body_content)) {
                RecordMsg(msg);
                msgs.push_back(msg);
                DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
            }
        };
        auto node = cli.weak_node_.lock();
        if (node) {
            Send1(node);
            // should also make sure that mq is not killed before msg expires.
            // it would be ok if (kill_time - offline_time) is longer than expire time.
        }
    }
    return true;
}
bool NodeCenter::PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content)
{
    Node node(GetNode(dest.id_));
@@ -469,11 +506,16 @@
        *info->mutable_proc() = node->proc_;
        info->mutable_proc()->clear_private_info();
        info->set_online(node->state_.flag_ == kStateNormal);
        for (auto &addr_topics : node->services_) {
            for (auto &topic : addr_topics.second) {
                info->mutable_topics()->add_topic_list(topic);
        auto AddTopics = [](auto &dst, auto &src) {
            for (auto &addr_topics : src) {
                for (auto &topic : addr_topics.second) {
                    dst.add_topic_list(topic);
                }
            }
        }
        };
        AddTopics(*info->mutable_service(), node->services_);
        AddTopics(*info->mutable_local_sub(), node->local_sub_);
        AddTopics(*info->mutable_net_sub(), node->net_sub_);
    };
    if (!proc_id.empty()) {
@@ -532,35 +574,50 @@
    return HandleMsg<Reply>(head, query);
}
void NodeCenter::NodeInfo::Subscribe(const BHMsgHead &head, const MsgSubscribe &msg, Node node)
{
    auto src = SrcAddr(head);
    auto Sub = [&](auto &sub, auto &sub_map) {
        auto &topics = msg.topics().topic_list();
        sub[src].insert(topics.begin(), topics.end());
        const TopicDest &dest = {src, SrcAbsAddr(head), node};
        for (auto &topic : topics) {
            sub_map[topic].insert(dest);
        }
    };
    LOG_DEBUG() << "subscribe net : " << msg.network();
    if (msg.network()) {
        Sub(net_sub_, center_.net_sub_map_);
    } else {
        Sub(local_sub_, center_.local_sub_map_);
    }
}
MsgCommonReply NodeCenter::Subscribe(const BHMsgHead &head, const MsgSubscribe &msg)
{
    return HandleMsg(head, [&](Node node) {
        auto src = SrcAddr(head);
        auto &topics = msg.topics().topic_list();
        node->subscriptions_[src].insert(topics.begin(), topics.end());
        TopicDest dest = {src, SrcAbsAddr(head), node};
        for (auto &topic : topics) {
            subscribe_map_[topic].insert(dest);
        }
        node->Subscribe(head, msg, node);
        return MakeReply(eSuccess);
    });
}
MsgCommonReply NodeCenter::Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg)
{
    return HandleMsg(head, [&](Node node) {
        auto src = SrcAddr(head);
        auto pos = node->subscriptions_.find(src);
        auto RemoveSubTopicDestRecord = [this](const Topic &topic, const TopicDest &dest) {
            auto pos = subscribe_map_.find(topic);
            if (pos != subscribe_map_.end() &&
void NodeCenter::NodeInfo::Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg, Node node)
{
    auto src = SrcAddr(head);
    auto Unsub = [&](auto &sub, auto &sub_map) {
        auto pos = sub.find(src);
        auto RemoveSubTopicDestRecord = [&sub_map](const Topic &topic, const TopicDest &dest) {
            auto pos = sub_map.find(topic);
            if (pos != sub_map.end() &&
                pos->second.erase(dest) != 0 &&
                pos->second.empty()) {
                subscribe_map_.erase(pos);
                sub_map.erase(pos);
            }
        };
        if (pos != node->subscriptions_.end()) {
        if (pos != sub.end()) {
            const TopicDest &dest = {src, SrcAbsAddr(head), node};
            auto &topics = msg.topics().topic_list();
            // clear node sub records;
@@ -569,26 +626,44 @@
                RemoveSubTopicDestRecord(topic, dest);
            }
            if (pos->second.empty()) {
                node->subscriptions_.erase(pos);
                sub.erase(pos);
            }
        }
    };
    if (msg.network()) {
        Unsub(net_sub_, center_.net_sub_map_);
    } else {
        Unsub(local_sub_, center_.local_sub_map_);
    }
}
MsgCommonReply NodeCenter::Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg)
{
    return HandleMsg(head, [&](Node node) {
        node->Unsubscribe(head, msg, node);
        return MakeReply(eSuccess);
    });
}
NodeCenter::Clients NodeCenter::DoFindClients(const std::string &topic)
NodeCenter::Clients NodeCenter::DoFindClients(const std::string &topic, bool from_remote)
{
    Clients dests;
    auto Find1 = [&](const std::string &exact) {
        auto pos = subscribe_map_.find(exact);
        if (pos != subscribe_map_.end()) {
            auto &clients = pos->second;
            for (auto &cli : clients) {
                if (Valid(cli.weak_node_)) {
                    dests.insert(cli);
        auto FindIn = [&](auto &sub_map) {
            auto pos = sub_map.find(exact);
            if (pos != sub_map.end()) {
                auto &clients = pos->second;
                for (auto &cli : clients) {
                    if (Valid(cli.weak_node_)) {
                        dests.insert(cli);
                    }
                }
            }
        };
        if (!from_remote) {
            FindIn(local_sub_map_);
        }
        FindIn(net_sub_map_);
    };
    Find1(topic);
@@ -605,15 +680,31 @@
    return dests;
}
bool NodeCenter::FindClients(const BHMsgHead &head, const MsgPublish &msg, Clients &out, MsgCommonReply &reply)
MsgCommonReply NodeCenter::Publish(const BHMsgHead &head, const Topic &topic, MsgI &msg)
{
    bool ret = false;
    HandleMsg(head, [&](Node node) {
        DoFindClients(msg.topic()).swap(out);
        ret = true;
    return HandleMsg(head, [&](Node node) {
        DoPublish(DefaultSender(node->shm_), topic, msg);
        return MakeReply(eSuccess);
    }).Swap(&reply);
    return ret;
    });
}
void NodeCenter::DoPublish(ShmSocket &sock, const Topic &topic, MsgI &msg)
{
    try {
        auto clients = DoFindClients(topic, false);
        if (clients.empty()) { return; }
        for (auto &cli : clients) {
            auto node = cli.weak_node_.lock();
            if (node) {
                // should also make sure that mq is not killed before msg expires.
                // it would be ok if (kill_time - offline_time) is longer than expire time.
                sock.Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
            }
        }
    } catch (...) {
        LOG_ERROR() << "DoPublish error.";
    }
}
void NodeCenter::OnTimer()
@@ -659,7 +750,8 @@
        }
    };
    EraseMapRec(service_map_, node->services_);
    EraseMapRec(subscribe_map_, node->subscriptions_);
    EraseMapRec(local_sub_map_, node->local_sub_);
    EraseMapRec(net_sub_map_, node->net_sub_);
    // remove online record.
    auto pos = online_node_addr_map_.find(node->proc_.proc_id());
@@ -681,10 +773,6 @@
void NodeCenter::Publish(SharedMemory &shm, const Topic &topic, const std::string &content)
{
    try {
        // LOG_DEBUG() << "center publish: " << topic << ": " << content;
        Clients clients(DoFindClients(topic));
        if (clients.empty()) { return; }
        MsgPublish pub;
        pub.set_topic(topic);
        pub.set_data(content);
@@ -693,16 +781,16 @@
        if (msg.Make(head, pub)) {
            DEFER1(msg.Release());
            RecordMsg(msg);
            for (auto &cli : clients) {
                auto node = cli.weak_node_.lock();
                if (node && node->state_.flag_ == kStateNormal) {
                    DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
                }
            }
            DoPublish(DefaultSender(shm), topic, msg);
        }
    } catch (...) {
        LOG_ERROR() << "center publish error.";
    }
}
std::vector<std::string> NodeCenter::FindRemoteSubClients(const Topic &topic)
{
    //TODO search synced full list;
    return std::vector<std::string>();
}
box/node_center.h
@@ -82,6 +82,10 @@
    };
    typedef std::unordered_map<Address, std::set<Topic>> AddressTopics;
    struct NodeInfo;
    typedef std::shared_ptr<NodeInfo> Node;
    typedef std::weak_ptr<NodeInfo> WeakNode;
    struct NodeInfo {
        NodeCenter &center_;
        SharedMemory &shm_;
@@ -89,14 +93,15 @@
        std::map<MQId, int64_t> addrs_; // registered mqs
        ProcInfo proc_;                 //
        AddressTopics services_;        // address: topics
        AddressTopics subscriptions_;   // address: topics
        AddressTopics local_sub_;       // address: topics
        AddressTopics net_sub_;         // address: topics
        NodeInfo(NodeCenter &center, SharedMemory &shm) :
            center_(center), shm_(shm) {}
        void PutOffline(const int64_t offline_time);
        void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time);
        void Subscribe(const BHMsgHead &head, const MsgSubscribe &msg, Node node);
        void Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg, Node node);
    };
    typedef std::shared_ptr<NodeInfo> Node;
    typedef std::weak_ptr<NodeInfo> WeakNode;
    struct TopicDest {
        MQId mq_id_;
@@ -121,7 +126,9 @@
    void RecordMsg(const MsgI &msg);
    bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg);
    bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg);
    bool PassRemoteRequestToLocal(MQInfo dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb);
    bool RemotePublish(BHMsgHead &head, const std::string &body_content);
    bool PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content);
    void OnAlloc(ShmSocket &socket, const int64_t val);
    void OnFree(ShmSocket &socket, const int64_t val);
@@ -176,15 +183,19 @@
    MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req);
    MsgCommonReply Subscribe(const BHMsgHead &head, const MsgSubscribe &msg);
    MsgCommonReply Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg);
    Clients DoFindClients(const std::string &topic);
    bool FindClients(const BHMsgHead &head, const MsgPublish &msg, Clients &out, MsgCommonReply &reply);
    MsgCommonReply Publish(const BHMsgHead &head, const Topic &topic, MsgI &msg);
    void OnTimer();
    // remote hosts records
    std::vector<std::string> FindRemoteSubClients(const Topic &topic);
private:
    void CheckNodes();
    bool CanHeartbeat(const NodeInfo &node) { return Valid(node) || node.state_.flag_ == kStateOffline; }
    void Publish(SharedMemory &shm, const Topic &topic, const std::string &content);
    void DoPublish(ShmSocket &sock, const Topic &topic, MsgI &msg);
    Clients DoFindClients(const std::string &topic, bool from_remote);
    bool Valid(const NodeInfo &node) { return node.state_.flag_ == kStateNormal; }
    bool Valid(const WeakNode &weak)
    {
@@ -197,7 +208,8 @@
    std::string id_; // center proc id;
    std::unordered_map<Topic, Clients> service_map_;
    std::unordered_map<Topic, Clients> subscribe_map_;
    std::unordered_map<Topic, Clients> local_sub_map_;
    std::unordered_map<Topic, Clients> net_sub_map_;
    std::unordered_map<Address, Node> nodes_;
    std::unordered_map<ProcId, Address> online_node_addr_map_;
    ProcRecords procs_; // To get a short index for msg alloc.
box/tcp_connection.cpp
@@ -178,8 +178,17 @@
            send_buffer_ = imsg.content();
            async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
        };
        auto &scenter = *pscenter_;
        if (scenter->PassRemoteRequestToLocal(remote, head, body_content, onRecv)) {
        if (head.type() == kMsgTypePublish) {
            auto reply = MakeReply(eSuccess);
            auto rep_head = InitMsgHead(GetType(reply), scenter->id(), 0, head.msg_id());
            send_buffer_ = MsgI::Serialize(rep_head, reply);
            async_write(socket_, Buffer(send_buffer_), TcpCBSize(*this, [this]() { Close(); }));
            scenter->RemotePublish(head, body_content);
            return;
        } else if (scenter->PassRemoteRequestToLocal(remote, head, body_content, onRecv)) {
            return;
        } else {
            Close();
box/tcp_proxy.cpp
@@ -31,3 +31,8 @@
        return false;
    }
}
bool TcpProxy::Publish(const std::string &ip, int port, std::string &&content)
{
    return Request(ip, port, std::move(content), ReplyCB());
}
box/tcp_proxy.h
@@ -33,6 +33,7 @@
    TcpProxy(io_service_t &io) :
        io_(io) {}
    bool Request(const std::string &ip, int port, std::string &&content, ReplyCB const &cb);
    bool Publish(const std::string &ip, int port, std::string &&content);
private:
    io_service_t &io_;
proto/source/bhome_msg.proto
@@ -57,9 +57,11 @@
message MsgSubscribe {
    MsgTopicList topics = 1;
    bool network = 2;
}
message MsgUnsubscribe {
    MsgTopicList topics = 1;
    bool network = 2;
}
message MsgRegisterRPC {
    MsgTopicList topics = 1;
proto/source/bhome_msg_api.proto
@@ -84,7 +84,9 @@
    message Info {
        ProcInfo proc  = 1;
        bool online = 2;
        MsgTopicList topics = 3;
        MsgTopicList service = 3;
        MsgTopicList local_sub = 4;
        MsgTopicList net_sub = 5;
    }
    repeated Info proc_list = 2;
}
src/topic_node.cpp
@@ -628,6 +628,7 @@
        auto &sock = SockPub();
        BHMsgHead head(InitMsgHead(GetType(pub), proc_id(), ssn()));
        AddRoute(head, sock);
        head.set_topic(pub.topic());
        if (timeout_ms == 0) {
            return sock.Send(BusAddr(), head, pub);
utest/api_test.cpp
@@ -256,14 +256,21 @@
            printf("proc [%d] %s, %s, %s\n\ttopics\n", i,
                   (info.online() ? "online" : "offline"),
                   info.proc().proc_id().c_str(), info.proc().name().c_str());
            for (auto &t : info.topics().topic_list()) {
                printf("\t\t %s\n", t.c_str());
            }
            auto PrintTopics = [](std::string const &name, auto &topic_list) {
                printf("%s:[", name.c_str());
                for (auto &t : topic_list) {
                    printf("%s,", t.c_str());
                }
                printf("]\n");
            };
            PrintTopics("service", info.service().topic_list());
            PrintTopics("local_sub", info.local_sub().topic_list());
            PrintTopics("net_sub", info.net_sub().topic_list());
            printf("\n");
        }
        printf("\n");
    };
    if (0) {
    if (1) {
        // query procs
        std::string dest(BHAddress().SerializeAsString());
        MsgQueryProc query;