From 58e3540930d290b315fd24d0414c8feeb7bc8bc1 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期日, 25 四月 2021 10:15:43 +0800 Subject: [PATCH] query topic add dest param. --- src/topic_node.h | 2 +- src/bh_api.h | 4 +++- src/bh_api.cpp | 49 ++++++++++++++++++++++++++++++++++++------------- src/topic_node.cpp | 5 +++-- 4 files changed, 43 insertions(+), 17 deletions(-) diff --git a/src/bh_api.cpp b/src/bh_api.cpp index 5c424e0..c4ac9c9 100644 --- a/src/bh_api.cpp +++ b/src/bh_api.cpp @@ -71,12 +71,10 @@ } template <class MsgIn, class MsgOut = MsgCommonReply> -bool BHApiIn1Out1(bool (TopicNode::*mfunc)(MsgIn &, MsgOut &, const int), - const void *request, - const int request_len, - void **reply, - int *reply_len, - const int timeout_ms) +bool BHApi_In1_Out1(bool (TopicNode::*mfunc)(MsgIn &, MsgOut &, const int), + const void *request, const int request_len, + void **reply, int *reply_len, + const int timeout_ms) { MsgIn input; if (!input.ParseFromArray(request, request_len)) { @@ -85,6 +83,25 @@ } MsgOut msg_reply; return (ProcNode().*mfunc)(input, msg_reply, timeout_ms) && + PackOutput(msg_reply, reply, reply_len); +} + +template <class MsgIn0, class MsgIn1, class MsgOut = MsgCommonReply> +bool BHApi_In2_Out1(bool (TopicNode::*mfunc)(MsgIn0 &, MsgIn1 &, MsgOut &, const int), + const void *in0, const int in0_len, + const void *in1, const int in1_len, + void **reply, int *reply_len, + const int timeout_ms) +{ + MsgIn0 input0; + MsgIn1 input1; + if (!input0.ParseFromArray(in0, in0_len) || + !input1.ParseFromArray(in1, in1_len)) { + SetLastError(eInvalidInput, "invalid input."); + return false; + } + MsgOut msg_reply; + return (ProcNode().*mfunc)(input0, input1, msg_reply, timeout_ms) && PackOutput(msg_reply, reply, reply_len); } @@ -102,11 +119,11 @@ int BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) { - return BHApiIn1Out1<ProcInfo>(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms); + return BHApi_In1_Out1<ProcInfo>(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms); } int BHUnregister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) { - return BHApiIn1Out1<ProcInfo>(&TopicNode::Unregister, proc_info, proc_info_len, reply, reply_len, timeout_ms); + return BHApi_In1_Out1<ProcInfo>(&TopicNode::Unregister, proc_info, proc_info_len, reply, reply_len, timeout_ms); } int BHHeartbeatEasy(const int timeout_ms) @@ -116,21 +133,27 @@ int BHHeartbeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) { - return BHApiIn1Out1<ProcInfo>(&TopicNode::Heartbeat, proc_info, proc_info_len, reply, reply_len, timeout_ms); + return BHApi_In1_Out1<ProcInfo>(&TopicNode::Heartbeat, proc_info, proc_info_len, reply, reply_len, timeout_ms); } int BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) { - return BHApiIn1Out1<MsgTopicList>(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms); + return BHApi_In1_Out1<MsgTopicList>(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms); } -int BHQueryTopicAddress(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) +int BHQueryTopicAddress(const void *remote, const int remote_len, + const void *topic, const int topic_len, + void **reply, int *reply_len, + const int timeout_ms) { - return BHApiIn1Out1<MsgQueryTopic, MsgQueryTopicReply>(&TopicNode::QueryTopicAddress, topics, topics_len, reply, reply_len, timeout_ms); + return BHApi_In2_Out1<BHAddress, MsgQueryTopic, MsgQueryTopicReply>( + &TopicNode::QueryTopicAddress, + remote, remote_len, topic, topic_len, reply, reply_len, timeout_ms); } + int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) { - return BHApiIn1Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms); + return BHApi_In1_Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms); } int BHPublish(const void *msgpub, diff --git a/src/bh_api.h b/src/bh_api.h index 4d6846e..5c82049 100644 --- a/src/bh_api.h +++ b/src/bh_api.h @@ -36,7 +36,9 @@ int *reply_len, const int timeout_ms); -int BHQueryTopicAddress(const void *topics, +int BHQueryTopicAddress(const void *remote, + const int remote_len, + const void *topics, const int topics_len, void **reply, int *reply_len, diff --git a/src/topic_node.cpp b/src/topic_node.cpp index 24bc4bb..00db773 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -187,7 +187,7 @@ return Heartbeat(proc, reply_body, timeout_ms); } -bool TopicNode::QueryTopicAddress(MsgQueryTopic &query, MsgQueryTopicReply &reply_body, const int timeout_ms) +bool TopicNode::QueryTopicAddress(BHAddress &dest, MsgQueryTopic &query, MsgQueryTopicReply &reply_body, const int timeout_ms) { if (!IsOnline()) { SetLastError(eNotRegistered, "Not Registered."); @@ -417,7 +417,8 @@ MsgQueryTopic query; query.set_topic(topic); MsgQueryTopicReply rep; - if (QueryTopicAddress(query, rep, timeout_ms)) { + BHAddress dest; // empty means local. + if (QueryTopicAddress(dest, query, rep, timeout_ms)) { auto &ls = rep.node_address(); n = ls.size(); for (auto &na : ls) { diff --git a/src/topic_node.h b/src/topic_node.h index 20b27d2..76bd608 100644 --- a/src/topic_node.h +++ b/src/topic_node.h @@ -44,7 +44,7 @@ bool Unregister(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms); bool Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms); bool Heartbeat(const int timeout_ms); - bool QueryTopicAddress(MsgQueryTopic &query, MsgQueryTopicReply &reply_body, const int timeout_ms); + bool QueryTopicAddress(BHAddress &dest, MsgQueryTopic &query, MsgQueryTopicReply &reply_body, const int timeout_ms); // topic rpc server typedef std::function<bool(const std::string &client_proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply)> ServerSyncCB; -- Gitblit v1.8.0