lichao
2021-06-30 ae17d1439b35b55212c3a30712e0a60b1d6a99c0
support tcp pub/sub.
12个文件已修改
266 ■■■■ 已修改文件
api/bhsgo/bhome_node.go 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center.cpp 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center_topic_node.cpp 29 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/node_center.cpp 82 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/node_center.h 32 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/tcp_connection.cpp 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bh_api.cc 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bh_api.h 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/exported_symbols 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.cpp 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.h 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/api_test.cpp 91 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
api/bhsgo/bhome_node.go
@@ -47,6 +47,11 @@
    return bhApiIn1Out1(C.FBHApiIn1Out1(C.BHSubscribeTopics), data, reply, timeout_ms)
}
func SubscribeNet(topics *bh.MsgTopicList, reply *bh.MsgCommonReply, timeout_ms int) bool {
    data, _ := topics.Marshal()
    return bhApiIn1Out1(C.FBHApiIn1Out1(C.BHSubscribeNetTopics), data, reply, timeout_ms)
}
func Heartbeat(proc *bh.ProcInfo, reply *bh.MsgCommonReply, timeout_ms int) bool {
    data, _ := proc.Marshal()
    return bhApiIn1Out1(C.FBHApiIn1Out1(C.BHHeartbeat), data, reply, timeout_ms)
box/center.cpp
@@ -146,7 +146,7 @@
                replyer(reply);
                auto hosts = center->FindRemoteSubClients(pub.topic());
                for (auto &host : hosts) {
                    tcp_proxy.Publish(host, kBHCenterPort, pub.SerializeAsString());
                    tcp_proxy.Publish(host, kBHCenterPort, msg.content());
                }
            }
        };
box/center_topic_node.cpp
@@ -30,6 +30,7 @@
namespace
{
const std::string &kTopicQueryProc = "#center_query_procs";
const std::string &kTopicNotifyRemoteInfo = "pub-allRegisterInfo-to-center";
std::string ToJson(const MsgQueryProcReply &qpr)
{
@@ -92,10 +93,16 @@
        throw std::runtime_error("center node register failed.");
    }
    MsgTopicList topics;
    topics.add_topic_list(kTopicQueryProc);
    if (!pnode_->DoServerRegisterRPC(true, topics, reply, timeout)) {
    MsgTopicList services;
    services.add_topic_list(kTopicQueryProc);
    if (!pnode_->DoServerRegisterRPC(true, services, reply, timeout)) {
        throw std::runtime_error("center node register topics failed.");
    }
    MsgTopicList subs;
    subs.add_topic_list(kTopicNotifyRemoteInfo);
    if (!pnode_->Subscribe(subs, reply, timeout)) {
        throw std::runtime_error("center node subscribe topics failed.");
    }
    auto onRequest = [this](void *src_info, std::string &client_proc_id, MsgRequestTopic &request) {
@@ -117,6 +124,20 @@
        pnode_->ServerSendReply(src_info, reply);
    };
    auto OnSubRecv = [&](const std::string &proc_id, const MsgPublish &data) {
        if (data.topic() == kTopicNotifyRemoteInfo) {
            // parse other data.
            // LOG_DEBUG() << "center got net info.";
            ssjson::Json js;
            if (js.parse(data.data())) {
                if (js.is_array()) {
                    auto &center = *pscenter_;
                    center->ParseNetInfo(js);
                }
            }
        }
    };
    bool cur = false;
    if (run_.compare_exchange_strong(cur, true)) {
        auto heartbeat = [this]() {
@@ -126,7 +147,7 @@
            }
        };
        std::thread(heartbeat).swap(worker_);
        return pnode_->ServerStart(onRequest);
        return pnode_->ServerStart(onRequest) && pnode_->SubscribeStartWorker(OnSubRecv);
    } else {
        return false;
    }
box/node_center.cpp
@@ -270,6 +270,7 @@
bool NodeCenter::RemotePublish(BHMsgHead &head, const std::string &body_content)
{
    // LOG_FUNCTION;
    auto &topic = head.topic();
    auto clients = DoFindClients(topic, true);
    if (clients.empty()) { return true; }
@@ -288,9 +289,10 @@
                }
            }
            MsgI msg(shm);
            if (msg.Make(body_content)) {
            if (msg.Make(head, body_content)) {
                RecordMsg(msg);
                msgs.push_back(msg);
                // LOG_DEBUG() << "remote publish to local." << cli.mq_id_ << ", " << cli.mq_abs_addr_;
                DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
            }
        };
@@ -554,22 +556,43 @@
    typedef MsgQueryTopicReply Reply;
    auto query = [&](Node self) -> Reply {
        auto pos = service_map_.find(req.topic());
        if (pos != service_map_.end() && !pos->second.empty()) {
            auto &clients = pos->second;
            Reply reply = MakeReply<Reply>(eSuccess);
            for (auto &dest : clients) {
                Node dest_node(dest.weak_node_.lock());
                if (dest_node && Valid(*dest_node)) {
                    auto node_addr = reply.add_node_address();
                    node_addr->set_proc_id(dest_node->proc_.proc_id());
                    node_addr->mutable_addr()->set_mq_id(dest.mq_id_);
                    node_addr->mutable_addr()->set_abs_addr(dest.mq_abs_addr_);
        Reply reply = MakeReply<Reply>(eSuccess);
        auto local = [&]() {
            auto pos = service_map_.find(req.topic());
            if (pos != service_map_.end() && !pos->second.empty()) {
                auto &clients = pos->second;
                for (auto &dest : clients) {
                    Node dest_node(dest.weak_node_.lock());
                    if (dest_node && Valid(*dest_node)) {
                        auto node_addr = reply.add_node_address();
                        node_addr->set_proc_id(dest_node->proc_.proc_id());
                        node_addr->mutable_addr()->set_mq_id(dest.mq_id_);
                        node_addr->mutable_addr()->set_abs_addr(dest.mq_abs_addr_);
                    }
                }
                return true;
            } else {
                return false;
            }
            return reply;
        } else {
        };
        auto net = [&]() {
            auto hosts(FindRemoteRPCServers(req.topic()));
            if (hosts.empty()) {
                return false;
            } else {
                for (auto &ip : hosts) {
                    auto node_addr = reply.add_node_address();
                    node_addr->mutable_addr()->set_ip(ip);
                }
                return true;
            }
        };
        local();
        net();
        if (reply.node_address_size() == 0) {
            return MakeReply<Reply>(eNotFound, "topic server not found.");
        } else {
            return reply;
        }
    };
@@ -587,7 +610,6 @@
            sub_map[topic].insert(dest);
        }
    };
    LOG_DEBUG() << "subscribe net : " << msg.network();
    if (msg.network()) {
        Sub(net_sub_, center_.net_sub_map_);
        center_.Notify(kTopicNodeSub, *this);
@@ -651,6 +673,7 @@
NodeCenter::Clients NodeCenter::DoFindClients(const std::string &topic, bool from_remote)
{
    // LOG_FUNCTION;
    Clients dests;
    auto Find1 = [&](const std::string &exact) {
        auto FindIn = [&](auto &sub_map) {
@@ -666,8 +689,11 @@
        };
        if (!from_remote) {
            FindIn(local_sub_map_);
            // LOG_DEBUG() << "topic '" << topic << "' local clients: " << dests.size();
        }
        // net subscripitions also work in local mode.
        FindIn(net_sub_map_);
        // LOG_DEBUG() << "topic '" << topic << "' + remote clients: " << dests.size();
    };
    Find1(topic);
@@ -793,8 +819,28 @@
    }
}
std::vector<std::string> NodeCenter::FindRemoteSubClients(const Topic &topic)
void NodeCenter::NetRecords::ParseData(const ssjson::Json &info)
{
    //TODO search synced full list;
    return std::vector<std::string>();
    // LOG_FUNCTION;
    sub_hosts_.clear();
    rpc_hosts_.clear();
    for (auto &host : info.array()) {
        if (host.get("isLocal", false)) {
            host_id_ = host.get("serverId", "");
            ip_ = host.get("ip", "");
        } else {
            auto ip = host.get("ip", "");
            auto UpdateRec = [&](const ssjson::Json::array_type &lot, auto &rec) {
                for (auto &topic : lot) {
                    auto t = topic.get_value<std::string>();
                    rec[t].insert(ip);
                    // LOG_DEBUG() << "net topic: " << t << ", " << ip;
                }
            };
            // LOG_DEBUG() << "serives:";
            UpdateRec(host.child("pubTopics").array(), rpc_hosts_);
            // LOG_DEBUG() << "net sub:";
            UpdateRec(host.child("netSubTopics").array(), sub_hosts_);
        }
    }
}
box/node_center.h
@@ -18,6 +18,7 @@
#ifndef NODE_CENTER_KY67RJ1Q
#define NODE_CENTER_KY67RJ1Q
#include "json.h"
#include "shm_socket.h"
#include <unordered_map>
@@ -188,7 +189,9 @@
    void OnTimer();
    // remote hosts records
    std::vector<std::string> FindRemoteSubClients(const Topic &topic);
    std::set<std::string> FindRemoteSubClients(const Topic &topic) { return net_records_.FindSubHosts(topic); }
    std::set<std::string> FindRemoteRPCServers(const Topic &topic) { return net_records_.FindRPCHosts(topic); }
    void ParseNetInfo(ssjson::Json &info) { net_records_.ParseData(info); }
private:
    void CheckNodes();
@@ -219,6 +222,33 @@
    int64_t offline_time_;
    int64_t kill_time_;
    int64_t last_check_time_;
    // net hosts info
    class NetRecords
    {
    public:
        typedef std::set<std::string> Hosts;
        void ParseData(const ssjson::Json &input);
        Hosts FindRPCHosts(const Topic &topic) { return FindHosts(topic, rpc_hosts_); }
        Hosts FindSubHosts(const Topic &topic) { return FindHosts(topic, sub_hosts_); }
    private:
        typedef std::unordered_map<Topic, Hosts> TopicMap;
        TopicMap sub_hosts_;
        TopicMap rpc_hosts_;
        Hosts FindHosts(const Topic &topic, const TopicMap &tmap)
        {
            auto pos = tmap.find(topic);
            if (pos != tmap.end()) {
                return pos->second;
            } else {
                return Hosts();
            }
        }
        std::string host_id_;
        std::string ip_;
    };
    NetRecords net_records_;
};
#endif // end of include guard: NODE_CENTER_KY67RJ1Q
box/tcp_connection.cpp
@@ -63,7 +63,7 @@
    if (4 > len) { return false; }
    uint32_t head_len = Get32(p);
    if (head_len > 1024 * 4) {
        throw std::runtime_error("unexpected tcp reply data.");
        throw std::runtime_error("unexpected tcp data head.");
    }
    auto before_body = 4 + head_len + 4;
    if (before_body > len) {
src/bh_api.cc
@@ -226,6 +226,10 @@
{
    return BHApi_In1_Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms);
}
int BHSubscribeNetTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
    return BHApi_In1_Out1<MsgTopicList>(&TopicNode::SubscribeNet, topics, topics_len, reply, reply_len, timeout_ms);
}
int BHPublish(const void *msgpub,
              const int msgpub_len,
src/bh_api.h
@@ -57,6 +57,11 @@
                      void **reply,
                      int *reply_len,
                      const int timeout_ms);
int BHSubscribeNetTopics(const void *topics,
                         const int topics_len,
                         void **reply,
                         int *reply_len,
                         const int timeout_ms);
typedef void (*FSubDataCallback)(const void *proc_id,
                                 int proc_id_len,
src/exported_symbols
@@ -7,6 +7,7 @@
    BHQueryTopicAddress;
    BHQueryProcs;
    BHSubscribeTopics;
    BHSubscribeNetTopics;
    BHStartWorker;
    BHHeartbeatEasy;
    BHHeartbeat;
src/topic_node.cpp
@@ -649,7 +649,7 @@
// subscribe
bool TopicNode::Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
bool TopicNode::DoSubscribe(MsgTopicList &topics, const bool net, MsgCommonReply &reply_body, const int timeout_ms)
{
    if (!IsOnline()) {
        SetLastError(eNotRegistered, kErrMsgNotRegistered);
@@ -659,6 +659,7 @@
    try {
        auto &sock = SockSub();
        MsgSubscribe sub;
        sub.set_network(net);
        sub.mutable_topics()->Swap(&topics);
        BHMsgHead head(InitMsgHead(GetType(sub), proc_id(), ssn()));
src/topic_node.h
@@ -73,7 +73,15 @@
    // subscribe
    typedef std::function<void(const std::string &proc_id, const MsgPublish &data)> SubDataCB;
    bool SubscribeStartWorker(const SubDataCB &tdcb, int nworker = 2);
    bool Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms);
    bool Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
    {
        return DoSubscribe(topics, false, reply_body, timeout_ms);
    }
    bool SubscribeNet(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
    {
        return DoSubscribe(topics, true, reply_body, timeout_ms);
    }
    bool DoSubscribe(MsgTopicList &topics, const bool net, MsgCommonReply &reply_body, const int timeout_ms);
    bool RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms);
    void Start(ServerAsyncCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker = 2);
utest/api_test.cpp
@@ -16,6 +16,7 @@
 * =====================================================================================
 */
#include "bh_api.h"
#include "json.h"
#include "robust.h"
#include "util.h"
#include <atomic>
@@ -330,17 +331,27 @@
    }
    { // Subscribe
        MsgTopicList topics;
        topics.add_topic_list("#center.node");
        for (int i = 0; i < 10; ++i) {
            topics.add_topic_list(topic_ + std::to_string(i * 2));
        }
        std::string s = topics.SerializeAsString();
        void *reply = 0;
        int reply_len = 0;
        bool r = BHSubscribeTopics(s.data(), s.size(), &reply, &reply_len, 1000);
        BHFree(reply, reply_len);
        printf("subscribe topic : %s\n", r ? "ok" : "failed");
        auto Subscribe = [&](std::string topic, bool net) {
            MsgTopicList topics;
            topics.add_topic_list(topic);
            for (int i = 0; i < 10; ++i) {
                topics.add_topic_list(topic_ + std::to_string(i * 2));
            }
            std::string s = topics.SerializeAsString();
            void *reply = 0;
            int reply_len = 0;
            bool r = false;
            if (net) {
                r = BHSubscribeNetTopics(s.data(), s.size(), &reply, &reply_len, 1000);
            } else {
                r = BHSubscribeTopics(s.data(), s.size(), &reply, &reply_len, 1000);
            }
            BHFree(reply, reply_len);
            printf("subscribe topic %s: %s\n", topic.c_str(), (r ? "ok" : "failed"));
        };
        Subscribe("#center.node", false);
        Subscribe("local0", false);
        Subscribe("net0", true);
    }
    auto ServerLoop = [&](std::atomic<bool> *run) {
@@ -368,14 +379,47 @@
        }
    };
    std::atomic<bool> run(true);
    ThreadManager threads;
#if 1
    BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc);
#else
    BHStartWorker(FServerCallback(), &SubRecvProc, &ClientProc);
    threads.Launch(ServerLoop, &run);
#endif
    auto Publish = [&](const std::string &topic, const std::string &data) {
        MsgPublish pub;
        pub.set_topic(topic);
        pub.set_data(data);
        std::string s(pub.SerializeAsString());
        BHPublish(s.data(), s.size(), 0);
    };
    {
        // publish
        Publish(topic_ + std::to_string(0), "pub_data_" + std::string(104 * 1, 'a'));
        for (int i = 0; i < 1; ++i) {
            MsgPublish pub;
            pub.set_topic(topic_ + std::to_string(i));
            pub.set_data("pub_data_" + std::string(104 * 1, 'a'));
            std::string s(pub.SerializeAsString());
            BHPublish(s.data(), s.size(), 0);
            // Sleep(1s);
            ssjson::Json net = ssjson::Json::Array();
            ssjson::Json host;
            host.put("serverId", "test_host");
            host.put("ip", "127.0.0.1");
            ssjson::Json topics = ssjson::Json::Array();
            topics.push_back("aaaaa");
            topics.push_back("bbbbb");
            host.put("pubTopics", topics);
            topics.push_back("net0");
            topics.push_back("net1");
            host.put("netSubTopics", topics);
            net.push_back(host);
            Publish("pub-allRegisterInfo-to-center", net.dump());
            Sleep(1s);
            Publish("local0", "local-abcd0");
            Publish("net0", "net-abcd0");
            Publish("local0", "local-abcd1");
            Sleep(1s);
        }
    }
@@ -428,22 +472,11 @@
        }
    };
    std::atomic<bool> run(true);
    ThreadManager threads;
#if 1
    BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc);
#else
    BHStartWorker(FServerCallback(), &SubRecvProc, &ClientProc);
    threads.Launch(ServerLoop, &run);
#endif
    boost::timer::auto_cpu_timer timer;
    threads.Launch(hb, &run);
    threads.Launch(showStatus, &run);
    int ncli = 10;
    const int64_t nreq = 1000 * 100;
    const int64_t nreq = 1000; //* 100;
    for (int i = 0; i < 10; ++i) {
        SyncRequest(topic_ + std::to_string(0), "request_data_" + std::to_string(i));