lichao
2021-04-25 58e3540930d290b315fd24d0414c8feeb7bc8bc1
query topic add dest param.
4个文件已修改
60 ■■■■ 已修改文件
src/bh_api.cpp 49 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bh_api.h 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.cpp 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bh_api.cpp
@@ -71,12 +71,10 @@
}
template <class MsgIn, class MsgOut = MsgCommonReply>
bool BHApiIn1Out1(bool (TopicNode::*mfunc)(MsgIn &, MsgOut &, const int),
                  const void *request,
                  const int request_len,
                  void **reply,
                  int *reply_len,
                  const int timeout_ms)
bool BHApi_In1_Out1(bool (TopicNode::*mfunc)(MsgIn &, MsgOut &, const int),
                    const void *request, const int request_len,
                    void **reply, int *reply_len,
                    const int timeout_ms)
{
    MsgIn input;
    if (!input.ParseFromArray(request, request_len)) {
@@ -85,6 +83,25 @@
    }
    MsgOut msg_reply;
    return (ProcNode().*mfunc)(input, msg_reply, timeout_ms) &&
           PackOutput(msg_reply, reply, reply_len);
}
template <class MsgIn0, class MsgIn1, class MsgOut = MsgCommonReply>
bool BHApi_In2_Out1(bool (TopicNode::*mfunc)(MsgIn0 &, MsgIn1 &, MsgOut &, const int),
                    const void *in0, const int in0_len,
                    const void *in1, const int in1_len,
                    void **reply, int *reply_len,
                    const int timeout_ms)
{
    MsgIn0 input0;
    MsgIn1 input1;
    if (!input0.ParseFromArray(in0, in0_len) ||
        !input1.ParseFromArray(in1, in1_len)) {
        SetLastError(eInvalidInput, "invalid input.");
        return false;
    }
    MsgOut msg_reply;
    return (ProcNode().*mfunc)(input0, input1, msg_reply, timeout_ms) &&
           PackOutput(msg_reply, reply, reply_len);
}
@@ -102,11 +119,11 @@
int BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
    return BHApiIn1Out1<ProcInfo>(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms);
    return BHApi_In1_Out1<ProcInfo>(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms);
}
int BHUnregister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
    return BHApiIn1Out1<ProcInfo>(&TopicNode::Unregister, proc_info, proc_info_len, reply, reply_len, timeout_ms);
    return BHApi_In1_Out1<ProcInfo>(&TopicNode::Unregister, proc_info, proc_info_len, reply, reply_len, timeout_ms);
}
int BHHeartbeatEasy(const int timeout_ms)
@@ -116,21 +133,27 @@
int BHHeartbeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
    return BHApiIn1Out1<ProcInfo>(&TopicNode::Heartbeat, proc_info, proc_info_len, reply, reply_len, timeout_ms);
    return BHApi_In1_Out1<ProcInfo>(&TopicNode::Heartbeat, proc_info, proc_info_len, reply, reply_len, timeout_ms);
}
int BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
    return BHApiIn1Out1<MsgTopicList>(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms);
    return BHApi_In1_Out1<MsgTopicList>(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms);
}
int BHQueryTopicAddress(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
int BHQueryTopicAddress(const void *remote, const int remote_len,
                        const void *topic, const int topic_len,
                        void **reply, int *reply_len,
                        const int timeout_ms)
{
    return BHApiIn1Out1<MsgQueryTopic, MsgQueryTopicReply>(&TopicNode::QueryTopicAddress, topics, topics_len, reply, reply_len, timeout_ms);
    return BHApi_In2_Out1<BHAddress, MsgQueryTopic, MsgQueryTopicReply>(
        &TopicNode::QueryTopicAddress,
        remote, remote_len, topic, topic_len, reply, reply_len, timeout_ms);
}
int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
    return BHApiIn1Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms);
    return BHApi_In1_Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms);
}
int BHPublish(const void *msgpub,
src/bh_api.h
@@ -36,7 +36,9 @@
                     int *reply_len,
                     const int timeout_ms);
int BHQueryTopicAddress(const void *topics,
int BHQueryTopicAddress(const void *remote,
                        const int remote_len,
                        const void *topics,
                        const int topics_len,
                        void **reply,
                        int *reply_len,
src/topic_node.cpp
@@ -187,7 +187,7 @@
    return Heartbeat(proc, reply_body, timeout_ms);
}
bool TopicNode::QueryTopicAddress(MsgQueryTopic &query, MsgQueryTopicReply &reply_body, const int timeout_ms)
bool TopicNode::QueryTopicAddress(BHAddress &dest, MsgQueryTopic &query, MsgQueryTopicReply &reply_body, const int timeout_ms)
{
    if (!IsOnline()) {
        SetLastError(eNotRegistered, "Not Registered.");
@@ -417,7 +417,8 @@
    MsgQueryTopic query;
    query.set_topic(topic);
    MsgQueryTopicReply rep;
    if (QueryTopicAddress(query, rep, timeout_ms)) {
    BHAddress dest; // empty means local.
    if (QueryTopicAddress(dest, query, rep, timeout_ms)) {
        auto &ls = rep.node_address();
        n = ls.size();
        for (auto &na : ls) {
src/topic_node.h
@@ -44,7 +44,7 @@
    bool Unregister(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms);
    bool Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms);
    bool Heartbeat(const int timeout_ms);
    bool QueryTopicAddress(MsgQueryTopic &query, MsgQueryTopicReply &reply_body, const int timeout_ms);
    bool QueryTopicAddress(BHAddress &dest, MsgQueryTopic &query, MsgQueryTopicReply &reply_body, const int timeout_ms);
    // topic rpc server
    typedef std::function<bool(const std::string &client_proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply)> ServerSyncCB;