lichao
2021-05-21 11f6c600e55ca5677f93624efe44d2605cdd908d
reserve #,@ prefix for internal proc id and topic.
5个文件已修改
64 ■■■■ 已修改文件
box/center.cpp 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center_topic_node.cpp 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.cpp 21 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.h 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/api_test.cpp 19 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center.cpp
@@ -106,7 +106,7 @@
        default: return false;
        }
    };
    BHCenter::Install("#center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(shm), 1000);
    BHCenter::Install("@center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(shm), 1000);
    auto OnBusIdle = [=](ShmSocket &socket) {};
    auto OnBusCmd = [=](ShmSocket &socket, ShmMsgQueue::RawData &val) { return false; };
@@ -142,7 +142,7 @@
        }
    };
    BHCenter::Install("#center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(shm), 1000);
    BHCenter::Install("@center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(shm), 1000);
    return true;
}
@@ -166,7 +166,7 @@
BHCenter::BHCenter(Socket::Shm &shm)
{
    auto nsec = NodeTimeoutSec();
    auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs.
    auto center_ptr = std::make_shared<Synced<NodeCenter>>("@bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs.
    AddCenter(center_ptr, shm);
    for (auto &kv : Centers()) {
box/center_topic_node.cpp
@@ -29,7 +29,7 @@
namespace
{
const std::string &kTopicQueryProc = "@center_query_procs";
const std::string &kTopicQueryProc = "#center_query_procs";
std::string ToJson(const MsgQueryProcReply &qpr)
{
@@ -77,15 +77,18 @@
    MsgCommonReply reply;
    ProcInfo info;
    info.set_proc_id("#center.node");
    info.set_proc_id("@center.node");
    info.set_name("center node");
    if (!pnode_->UniRegister(true, info, reply, timeout)) {
    Json jinfo;
    jinfo.put("description", "some center services. Other nodes may use topics to use them.");
    info.set_public_info(jinfo.dump());
    if (!pnode_->DoRegister(true, info, reply, timeout)) {
        throw std::runtime_error("center node register failed.");
    }
    MsgTopicList topics;
    topics.add_topic_list(kTopicQueryProc);
    if (!pnode_->ServerRegisterRPC(topics, reply, timeout)) {
    if (!pnode_->DoServerRegisterRPC(true, topics, reply, timeout)) {
        throw std::runtime_error("center node register topics failed.");
    }
src/topic_node.cpp
@@ -29,6 +29,11 @@
namespace
{
bool ValidUserSymbol(const std::string &s)
{
    return !s.empty() && s[0] != '#' && s[0] != '@';
}
inline void AddRoute(BHMsgHead &head, const ShmSocket &sock)
{
    auto route = head.add_route();
@@ -143,10 +148,9 @@
    for (auto &p : sockets_) { p->Stop(); }
}
bool TopicNode::UniRegister(const bool internal, ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
bool TopicNode::DoRegister(const bool internal, ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
{
    auto ValidUserProcId = [](const std::string &id) { return !id.empty() && id[0] != '#'; };
    if (!internal && !ValidUserProcId(proc.proc_id())) {
    if (!internal && !ValidUserSymbol(proc.proc_id())) {
        SetLastError(eInvalidInput, "invalid proc id :'" + proc.proc_id() + "'");
        return false;
    }
@@ -315,8 +319,17 @@
            reply.ParseBody(reply_body));
}
bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
bool TopicNode::DoServerRegisterRPC(const bool internal, MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
{
    if (!internal) {
        for (auto &&topic : topics.topic_list()) {
            if (!ValidUserSymbol(topic)) {
                SetLastError(eInvalidInput, "invalid user topic :'" + topic + "'");
                return false;
            }
        }
    }
    if (!IsOnline()) {
        SetLastError(eNotRegistered, kErrMsgNotRegistered);
        return false;
src/topic_node.h
@@ -43,8 +43,8 @@
    ~TopicNode();
    // topic node
    bool Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) { return UniRegister(false, proc, reply_body, timeout_ms); }
    bool UniRegister(const bool internal, ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms);
    bool Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) { return DoRegister(false, proc, reply_body, timeout_ms); }
    bool DoRegister(const bool internal, ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms);
    bool Unregister(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms);
    bool Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms);
    bool Heartbeat(const int timeout_ms);
@@ -56,7 +56,8 @@
    typedef std::function<void(void *src_info, std::string &client_proc_id, MsgRequestTopic &request)> ServerAsyncCB;
    bool ServerStart(ServerSyncCB const &cb, const int nworker = 2);
    bool ServerStart(ServerAsyncCB const &cb, const int nworker = 2);
    bool ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply, const int timeout_ms);
    bool ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply, const int timeout_ms) { return DoServerRegisterRPC(false, topics, reply, timeout_ms); }
    bool DoServerRegisterRPC(const bool internal, MsgTopicList &topics, MsgCommonReply &reply, const int timeout_ms);
    bool ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms);
    bool ServerSendReply(void *src_info, const MsgRequestTopicReply &reply);
utest/api_test.cpp
@@ -176,8 +176,21 @@
        int reply_len = 0;
        bool r = BHRegisterTopics(s.data(), s.size(), &reply, &reply_len, 1000);
        DEFER1(BHFree(reply, reply_len));
        // printf("register topic : %s\n", r ? "ok" : "failed");
        // Sleep(1s);
    }
    { // Server Register Topics
        MsgTopicList topics;
        topics.add_topic_list("@should_fail");
        std::string s = topics.SerializeAsString();
        void *reply = 0;
        int reply_len = 0;
        bool r = BHRegisterTopics(s.data(), s.size(), &reply, &reply_len, 1000);
        DEFER1(BHFree(reply, reply_len));
        if (!r) {
            int ec = 0;
            std::string msg;
            GetApiError(ec, msg);
            printf("register rpc failed, %d, %s\n", ec, msg.c_str());
        }
    }
    auto PrintProcs = [](MsgQueryProcReply const &result) {
        printf("query proc result: %d\n", result.proc_list().size());
@@ -214,7 +227,7 @@
    {
        // query procs with normal topic request
        MsgRequestTopic req;
        req.set_topic("@center_query_procs");
        req.set_topic("#center_query_procs");
        // req.set_data("{\"proc_id\":\"#center.node\"}");
        std::string s(req.SerializeAsString());
        // Sleep(10ms, false);