From 58e3540930d290b315fd24d0414c8feeb7bc8bc1 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期日, 25 四月 2021 10:15:43 +0800
Subject: [PATCH] query topic add dest param.
---
src/topic_node.h | 2 +-
src/bh_api.h | 4 +++-
src/bh_api.cpp | 49 ++++++++++++++++++++++++++++++++++++-------------
src/topic_node.cpp | 5 +++--
4 files changed, 43 insertions(+), 17 deletions(-)
diff --git a/src/bh_api.cpp b/src/bh_api.cpp
index 5c424e0..c4ac9c9 100644
--- a/src/bh_api.cpp
+++ b/src/bh_api.cpp
@@ -71,12 +71,10 @@
}
template <class MsgIn, class MsgOut = MsgCommonReply>
-bool BHApiIn1Out1(bool (TopicNode::*mfunc)(MsgIn &, MsgOut &, const int),
- const void *request,
- const int request_len,
- void **reply,
- int *reply_len,
- const int timeout_ms)
+bool BHApi_In1_Out1(bool (TopicNode::*mfunc)(MsgIn &, MsgOut &, const int),
+ const void *request, const int request_len,
+ void **reply, int *reply_len,
+ const int timeout_ms)
{
MsgIn input;
if (!input.ParseFromArray(request, request_len)) {
@@ -85,6 +83,25 @@
}
MsgOut msg_reply;
return (ProcNode().*mfunc)(input, msg_reply, timeout_ms) &&
+ PackOutput(msg_reply, reply, reply_len);
+}
+
+template <class MsgIn0, class MsgIn1, class MsgOut = MsgCommonReply>
+bool BHApi_In2_Out1(bool (TopicNode::*mfunc)(MsgIn0 &, MsgIn1 &, MsgOut &, const int),
+ const void *in0, const int in0_len,
+ const void *in1, const int in1_len,
+ void **reply, int *reply_len,
+ const int timeout_ms)
+{
+ MsgIn0 input0;
+ MsgIn1 input1;
+ if (!input0.ParseFromArray(in0, in0_len) ||
+ !input1.ParseFromArray(in1, in1_len)) {
+ SetLastError(eInvalidInput, "invalid input.");
+ return false;
+ }
+ MsgOut msg_reply;
+ return (ProcNode().*mfunc)(input0, input1, msg_reply, timeout_ms) &&
PackOutput(msg_reply, reply, reply_len);
}
@@ -102,11 +119,11 @@
int BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
- return BHApiIn1Out1<ProcInfo>(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms);
+ return BHApi_In1_Out1<ProcInfo>(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms);
}
int BHUnregister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
- return BHApiIn1Out1<ProcInfo>(&TopicNode::Unregister, proc_info, proc_info_len, reply, reply_len, timeout_ms);
+ return BHApi_In1_Out1<ProcInfo>(&TopicNode::Unregister, proc_info, proc_info_len, reply, reply_len, timeout_ms);
}
int BHHeartbeatEasy(const int timeout_ms)
@@ -116,21 +133,27 @@
int BHHeartbeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
- return BHApiIn1Out1<ProcInfo>(&TopicNode::Heartbeat, proc_info, proc_info_len, reply, reply_len, timeout_ms);
+ return BHApi_In1_Out1<ProcInfo>(&TopicNode::Heartbeat, proc_info, proc_info_len, reply, reply_len, timeout_ms);
}
int BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
- return BHApiIn1Out1<MsgTopicList>(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms);
+ return BHApi_In1_Out1<MsgTopicList>(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms);
}
-int BHQueryTopicAddress(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
+int BHQueryTopicAddress(const void *remote, const int remote_len,
+ const void *topic, const int topic_len,
+ void **reply, int *reply_len,
+ const int timeout_ms)
{
- return BHApiIn1Out1<MsgQueryTopic, MsgQueryTopicReply>(&TopicNode::QueryTopicAddress, topics, topics_len, reply, reply_len, timeout_ms);
+ return BHApi_In2_Out1<BHAddress, MsgQueryTopic, MsgQueryTopicReply>(
+ &TopicNode::QueryTopicAddress,
+ remote, remote_len, topic, topic_len, reply, reply_len, timeout_ms);
}
+
int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
- return BHApiIn1Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms);
+ return BHApi_In1_Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms);
}
int BHPublish(const void *msgpub,
diff --git a/src/bh_api.h b/src/bh_api.h
index 4d6846e..5c82049 100644
--- a/src/bh_api.h
+++ b/src/bh_api.h
@@ -36,7 +36,9 @@
int *reply_len,
const int timeout_ms);
-int BHQueryTopicAddress(const void *topics,
+int BHQueryTopicAddress(const void *remote,
+ const int remote_len,
+ const void *topics,
const int topics_len,
void **reply,
int *reply_len,
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index 24bc4bb..00db773 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -187,7 +187,7 @@
return Heartbeat(proc, reply_body, timeout_ms);
}
-bool TopicNode::QueryTopicAddress(MsgQueryTopic &query, MsgQueryTopicReply &reply_body, const int timeout_ms)
+bool TopicNode::QueryTopicAddress(BHAddress &dest, MsgQueryTopic &query, MsgQueryTopicReply &reply_body, const int timeout_ms)
{
if (!IsOnline()) {
SetLastError(eNotRegistered, "Not Registered.");
@@ -417,7 +417,8 @@
MsgQueryTopic query;
query.set_topic(topic);
MsgQueryTopicReply rep;
- if (QueryTopicAddress(query, rep, timeout_ms)) {
+ BHAddress dest; // empty means local.
+ if (QueryTopicAddress(dest, query, rep, timeout_ms)) {
auto &ls = rep.node_address();
n = ls.size();
for (auto &na : ls) {
diff --git a/src/topic_node.h b/src/topic_node.h
index 20b27d2..76bd608 100644
--- a/src/topic_node.h
+++ b/src/topic_node.h
@@ -44,7 +44,7 @@
bool Unregister(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms);
bool Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms);
bool Heartbeat(const int timeout_ms);
- bool QueryTopicAddress(MsgQueryTopic &query, MsgQueryTopicReply &reply_body, const int timeout_ms);
+ bool QueryTopicAddress(BHAddress &dest, MsgQueryTopic &query, MsgQueryTopicReply &reply_body, const int timeout_ms);
// topic rpc server
typedef std::function<bool(const std::string &client_proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply)> ServerSyncCB;
--
Gitblit v1.8.0