lichao
2021-05-19 34cd75f77d0ca94dbdba4e6cc9451fe4d33e78b3
add api BHQueryProcs.
12个文件已修改
169 ■■■■■ 已修改文件
api/bhsgo/bhome_node.go 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
api/bhsgo/bhome_node_test.go 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center.cpp 43 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
proto/source/bhome_msg.proto 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bh_api.cc 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bh_api.h 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/exported_symbols 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg.h 16 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/proto.h 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.cpp 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/api_test.cpp 39 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
api/bhsgo/bhome_node.go
@@ -75,6 +75,20 @@
}
func QueryProcs(dest_addr *bh.BHAddress, topic *bh.MsgQueryProc, reply *bh.MsgQueryProcReply, timeout_ms int) bool {
    dest, _ := dest_addr.Marshal()
    data, _ := topic.Marshal()
    creply := unsafe.Pointer(nil)
    creply_len := C.int(0)
    defer C.BHFree(creply, creply_len)
    r := C.BHQueryProcs(getPtr(&dest), C.int(len(dest)), getPtr(&data), C.int(len(data)), &creply, &creply_len, C.int(timeout_ms)) > 0
    if r {
        reply.Unmarshal(C.GoBytes(creply, creply_len))
    }
    return r
}
func Publish(pub *bh.MsgPublish, timeout_ms int) bool {
    data, _ := pub.Marshal()
    return C.BHPublish(getPtr(&data), C.int(len(data)), C.int(timeout_ms)) > 0
api/bhsgo/bhome_node_test.go
@@ -26,6 +26,7 @@
    proc_id := "test_proc"
    proc := bh.ProcInfo{}
    proc.ProcId = []byte(proc_id)
    fmt.Println("proc id: ", proc.ProcId)
    reply := bh.MsgCommonReply{}
    defer Cleanup()
@@ -69,6 +70,7 @@
    } else {
        fmt.Println("reg topics failed")
    }
    req := bh.MsgRequestTopic{}
    time.Sleep(time.Second * 1)
    req.Topic = []byte("topic0")
@@ -81,7 +83,14 @@
    pid := ""
    rr := bh.MsgRequestTopicReply{}
    dest := bh.BHAddress{}
    for i := 0; i < 100; i++ {
    queryProc := bh.MsgQueryProc{}
    queryReply := bh.MsgQueryProcReply{}
    QueryProcs(&dest, &queryProc, &queryReply, 3000)
    fmt.Println("query result:", string(queryReply.ProcList[0].Proc.ProcId))
    for i := 0; i < 10; i++ {
        if Request(&dest, &req, &pid, &rr, 3000) {
            fmt.Println("server:" + pid + ", reply:" + string(rr.Data))
        } else {
box/center.cpp
@@ -479,12 +479,52 @@
            return MakeReply(eSuccess);
        });
    }
    MsgQueryProcReply QueryProc(const BHMsgHead &head, const MsgQueryProc &req)
    {
        typedef MsgQueryProcReply Reply;
        auto query = [&](Node self) -> Reply {
            auto Add1 = [](Reply &reply, Node node) {
                auto info = reply.add_proc_list();
                *info->mutable_proc() = node->proc_;
                info->set_online(node->state_.flag_ == kStateNormal);
                for (auto &addr_topics : node->services_) {
                    for (auto &topic : addr_topics.second) {
                        info->mutable_topics()->add_topic_list(topic);
                    }
                }
            };
            if (!req.proc_id().empty()) {
                auto pos = online_node_addr_map_.find(req.proc_id());
                if (pos == online_node_addr_map_.end()) {
                    return MakeReply<Reply>(eNotFound, "proc not found.");
                } else {
                    auto node_pos = nodes_.find(pos->second);
                    if (node_pos == nodes_.end()) {
                        return MakeReply<Reply>(eNotFound, "proc node not found.");
                    } else {
                        auto reply = MakeReply<Reply>(eSuccess);
                        Add1(reply, node_pos->second);
                        return reply;
                    }
                }
            } else {
                Reply reply(MakeReply<Reply>(eSuccess));
                for (auto &kv : nodes_) {
                    Add1(reply, kv.second);
                }
                return reply;
            }
        };
        return HandleMsg<Reply>(head, query);
    }
    MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req)
    {
        typedef MsgQueryTopicReply Reply;
        auto query = [&](Node self) -> MsgQueryTopicReply {
        auto query = [&](Node self) -> Reply {
            auto pos = service_map_.find(req.topic());
            if (pos != service_map_.end() && !pos->second.empty()) {
                auto &clients = pos->second;
@@ -747,6 +787,7 @@
            CASE_ON_MSG_TYPE(RegisterRPC);
            CASE_ON_MSG_TYPE(QueryTopic);
            CASE_ON_MSG_TYPE(QueryProc);
        default: return false;
        }
    };
proto/source/bhome_msg.proto
@@ -49,6 +49,8 @@
    // kMsgTypeUnsubscribeReply = 25;
    kMsgTypeUnregister = 26;
    // kMsgTypeUnregisterReply = 27;
    kMsgTypeQueryProc = 28;
    kMsgTypeQueryProcReply = 29;
}
src/bh_api.cc
@@ -194,6 +194,19 @@
        remote, remote_len, topic, topic_len, reply, reply_len, timeout_ms);
}
int BHQueryProcs(const void *remote,
                 const int remote_len,
                 const void *query,
                 const int query_len,
                 void **reply,
                 int *reply_len,
                 const int timeout_ms)
{
    return BHApi_In2_Out1<BHAddress, MsgQueryProc, MsgQueryProcReply>(
        &TopicNode::QueryProcs,
        remote, remote_len, query, query_len, reply, reply_len, timeout_ms);
}
int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
    return BHApi_In1_Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms);
src/bh_api.h
@@ -44,6 +44,14 @@
                        int *reply_len,
                        const int timeout_ms);
int BHQueryProcs(const void *remote,
                 const int remote_len,
                 const void *query,
                 const int query_len,
                 void **reply,
                 int *reply_len,
                 const int timeout_ms);
int BHSubscribeTopics(const void *topics,
                      const int topics_len,
                      void **reply,
src/exported_symbols
@@ -5,6 +5,7 @@
    BHUnregister;
    BHRegisterTopics;
    BHQueryTopicAddress;
    BHQueryProcs;
    BHSubscribeTopics;
    BHStartWorker;
    BHHeartbeatEasy;
src/msg.h
@@ -74,12 +74,13 @@
        RefCount count_;
        const uint32_t tag_ = kMsgTag;
        const uint32_t size_ = 0;
        const uint32_t capacity_ = 0;
        const int64_t id_ = 0;
        std::atomic<int64_t> timestamp_;
        bool managed_ = false;
        uint32_t size_ = 0;
        Meta(uint32_t size) :
            size_(size), id_(NewId()), timestamp_(NowSec()) {}
            capacity_(size), id_(NewId()), timestamp_(NowSec()) {}
    };
    OffsetType offset_;
    static void *Alloc(const size_t size)
@@ -111,6 +112,7 @@
            };
            Pack1(head_len, [&](void *p, int len) { head.SerializeToArray(p, len); });
            Pack1(body_len, [&](void *p, int len) { body.SerializeToArray(p, len); });
            meta()->size_ = 4 + head_len + 4 + body_len;
        }
        return addr;
    }
@@ -120,6 +122,7 @@
        void *addr = get();
        if (addr) {
            memcpy(addr, content.data(), content.size());
            meta()->size_ = content.size();
        }
        return addr;
    }
@@ -174,11 +177,11 @@
        uint32_t head_len = head.ByteSizeLong();
        uint32_t body_len = body.ByteSizeLong();
        uint32_t size = sizeof(head_len) + head_len + sizeof(body_len) + body_len;
        return valid() && (meta()->size_ >= size) && Pack(head, head_len, body, body_len);
        return valid() && (meta()->capacity_ >= size) && Pack(head, head_len, body, body_len);
    }
    inline bool Make(const std::string &content) { return Make(content.size()) && Pack(content); }
    inline bool Fill(const std::string &content) { return valid() && (meta()->size_ >= content.size()) && Pack(content); }
    inline bool Fill(const std::string &content) { return valid() && (meta()->capacity_ >= content.size()) && Pack(content); }
    inline bool Make(const size_t size) { return Make(Alloc(size)); }
@@ -209,6 +212,11 @@
        p += 4;
        return head.ParseFromArray(p, msg_size);
    }
    std::string content() const
    {
        auto p = get<char>();
        return p ? std::string(p, meta()->size_) : std::string();
    }
    std::string body() const
    {
        auto p = get<char>();
src/proto.h
@@ -50,6 +50,8 @@
BHOME_SIMPLE_MAP_MSG(Unsubscribe);
BHOME_SIMPLE_MAP_MSG(ProcInit);
BHOME_SIMPLE_MAP_MSG(ProcInitReply);
BHOME_SIMPLE_MAP_MSG(QueryProc);
BHOME_SIMPLE_MAP_MSG(QueryProcReply);
#undef BHOME_SIMPLE_MAP_MSG
#undef BHOME_MAP_MSG_AND_TYPE
src/topic_node.cpp
@@ -292,6 +292,25 @@
            reply.ParseBody(reply_body));
}
bool TopicNode::QueryProcs(BHAddress &dest, MsgQueryProc &query, MsgQueryProcReply &reply_body, const int timeout_ms)
{
    if (!IsOnline()) {
        SetLastError(eNotRegistered, kErrMsgNotRegistered);
        return false;
    }
    auto &sock = SockNode();
    BHMsgHead head(InitMsgHead(GetType(query), proc_id(), ssn()));
    AddRoute(head, sock);
    MsgI reply;
    DEFER1(reply.Release());
    BHMsgHead reply_head;
    return (sock.SendAndRecv(BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms) &&
            reply_head.type() == kMsgTypeQueryProcReply &&
            reply.ParseBody(reply_body));
}
bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
{
    if (!IsOnline()) {
src/topic_node.h
@@ -46,6 +46,7 @@
    bool Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms);
    bool Heartbeat(const int timeout_ms);
    bool QueryTopicAddress(BHAddress &dest, MsgQueryTopic &query, MsgQueryTopicReply &reply_body, const int timeout_ms);
    bool QueryProcs(BHAddress &dest, MsgQueryProc &query, MsgQueryProcReply &reply_body, const int timeout_ms);
    // topic rpc server
    typedef std::function<bool(const std::string &client_proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply)> ServerSyncCB;
utest/api_test.cpp
@@ -165,7 +165,34 @@
        void *reply = 0;
        int reply_len = 0;
        bool r = BHRegisterTopics(s.data(), s.size(), &reply, &reply_len, 1000);
        BHFree(reply, reply_len);
        DEFER1(BHFree(reply, reply_len));
        // printf("register topic : %s\n", r ? "ok" : "failed");
        // Sleep(1s);
    }
    {
        // query procs
        std::string dest(BHAddress().SerializeAsString());
        MsgQueryProc query;
        std::string s = query.SerializeAsString();
        void *reply = 0;
        int reply_len = 0;
        bool r = BHQueryProcs(dest.data(), dest.size(), s.data(), s.size(), &reply, &reply_len, 1000);
        DEFER1(BHFree(reply, reply_len));
        MsgQueryProcReply result;
        if (result.ParseFromArray(reply, reply_len) && IsSuccess(result.errmsg().errcode())) {
            printf("query proc result: %d\n", result.proc_list().size());
            for (int i = 0; i < result.proc_list().size(); ++i) {
                auto &info = result.proc_list(i);
                printf("proc [%d] %s, %s, %s\n\ttopics\n", i,
                       (info.online() ? "online" : "offline"),
                       info.proc().proc_id().c_str(), info.proc().name().c_str());
                for (auto &t : info.topics().topic_list()) {
                    printf("\t\t %s", t.c_str());
                }
            }
        } else {
            printf("query proc error\n");
        }
        // printf("register topic : %s\n", r ? "ok" : "failed");
        // Sleep(1s);
    }
@@ -310,15 +337,13 @@
    int ncli = 10;
    const int64_t nreq = 1000 * 100;
#if 1
    for (int i = 0; i < ncli; ++i) {
        threads.Launch(asyncRequest, nreq);
    }
#else
    for (int i = 0; i < 100; ++i) {
        SyncRequest(i);
    }
#endif
    for (int i = 0; i < ncli; ++i) {
        threads.Launch(asyncRequest, nreq);
    }
    int same = 0;
    uint64_t last = 0;