From 58e3540930d290b315fd24d0414c8feeb7bc8bc1 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期日, 25 四月 2021 10:15:43 +0800
Subject: [PATCH] query topic add dest param.

---
 src/topic_node.h   |    2 +-
 src/bh_api.h       |    4 +++-
 src/bh_api.cpp     |   49 ++++++++++++++++++++++++++++++++++++-------------
 src/topic_node.cpp |    5 +++--
 4 files changed, 43 insertions(+), 17 deletions(-)

diff --git a/src/bh_api.cpp b/src/bh_api.cpp
index 5c424e0..c4ac9c9 100644
--- a/src/bh_api.cpp
+++ b/src/bh_api.cpp
@@ -71,12 +71,10 @@
 }
 
 template <class MsgIn, class MsgOut = MsgCommonReply>
-bool BHApiIn1Out1(bool (TopicNode::*mfunc)(MsgIn &, MsgOut &, const int),
-                  const void *request,
-                  const int request_len,
-                  void **reply,
-                  int *reply_len,
-                  const int timeout_ms)
+bool BHApi_In1_Out1(bool (TopicNode::*mfunc)(MsgIn &, MsgOut &, const int),
+                    const void *request, const int request_len,
+                    void **reply, int *reply_len,
+                    const int timeout_ms)
 {
 	MsgIn input;
 	if (!input.ParseFromArray(request, request_len)) {
@@ -85,6 +83,25 @@
 	}
 	MsgOut msg_reply;
 	return (ProcNode().*mfunc)(input, msg_reply, timeout_ms) &&
+	       PackOutput(msg_reply, reply, reply_len);
+}
+
+template <class MsgIn0, class MsgIn1, class MsgOut = MsgCommonReply>
+bool BHApi_In2_Out1(bool (TopicNode::*mfunc)(MsgIn0 &, MsgIn1 &, MsgOut &, const int),
+                    const void *in0, const int in0_len,
+                    const void *in1, const int in1_len,
+                    void **reply, int *reply_len,
+                    const int timeout_ms)
+{
+	MsgIn0 input0;
+	MsgIn1 input1;
+	if (!input0.ParseFromArray(in0, in0_len) ||
+	    !input1.ParseFromArray(in1, in1_len)) {
+		SetLastError(eInvalidInput, "invalid input.");
+		return false;
+	}
+	MsgOut msg_reply;
+	return (ProcNode().*mfunc)(input0, input1, msg_reply, timeout_ms) &&
 	       PackOutput(msg_reply, reply, reply_len);
 }
 
@@ -102,11 +119,11 @@
 
 int BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
 {
-	return BHApiIn1Out1<ProcInfo>(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms);
+	return BHApi_In1_Out1<ProcInfo>(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms);
 }
 int BHUnregister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
 {
-	return BHApiIn1Out1<ProcInfo>(&TopicNode::Unregister, proc_info, proc_info_len, reply, reply_len, timeout_ms);
+	return BHApi_In1_Out1<ProcInfo>(&TopicNode::Unregister, proc_info, proc_info_len, reply, reply_len, timeout_ms);
 }
 
 int BHHeartbeatEasy(const int timeout_ms)
@@ -116,21 +133,27 @@
 
 int BHHeartbeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
 {
-	return BHApiIn1Out1<ProcInfo>(&TopicNode::Heartbeat, proc_info, proc_info_len, reply, reply_len, timeout_ms);
+	return BHApi_In1_Out1<ProcInfo>(&TopicNode::Heartbeat, proc_info, proc_info_len, reply, reply_len, timeout_ms);
 }
 
 int BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
 {
-	return BHApiIn1Out1<MsgTopicList>(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms);
+	return BHApi_In1_Out1<MsgTopicList>(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms);
 }
 
-int BHQueryTopicAddress(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
+int BHQueryTopicAddress(const void *remote, const int remote_len,
+                        const void *topic, const int topic_len,
+                        void **reply, int *reply_len,
+                        const int timeout_ms)
 {
-	return BHApiIn1Out1<MsgQueryTopic, MsgQueryTopicReply>(&TopicNode::QueryTopicAddress, topics, topics_len, reply, reply_len, timeout_ms);
+	return BHApi_In2_Out1<BHAddress, MsgQueryTopic, MsgQueryTopicReply>(
+	    &TopicNode::QueryTopicAddress,
+	    remote, remote_len, topic, topic_len, reply, reply_len, timeout_ms);
 }
+
 int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
 {
-	return BHApiIn1Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms);
+	return BHApi_In1_Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms);
 }
 
 int BHPublish(const void *msgpub,
diff --git a/src/bh_api.h b/src/bh_api.h
index 4d6846e..5c82049 100644
--- a/src/bh_api.h
+++ b/src/bh_api.h
@@ -36,7 +36,9 @@
                      int *reply_len,
                      const int timeout_ms);
 
-int BHQueryTopicAddress(const void *topics,
+int BHQueryTopicAddress(const void *remote,
+                        const int remote_len,
+                        const void *topics,
                         const int topics_len,
                         void **reply,
                         int *reply_len,
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index 24bc4bb..00db773 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -187,7 +187,7 @@
 	return Heartbeat(proc, reply_body, timeout_ms);
 }
 
-bool TopicNode::QueryTopicAddress(MsgQueryTopic &query, MsgQueryTopicReply &reply_body, const int timeout_ms)
+bool TopicNode::QueryTopicAddress(BHAddress &dest, MsgQueryTopic &query, MsgQueryTopicReply &reply_body, const int timeout_ms)
 {
 	if (!IsOnline()) {
 		SetLastError(eNotRegistered, "Not Registered.");
@@ -417,7 +417,8 @@
 	MsgQueryTopic query;
 	query.set_topic(topic);
 	MsgQueryTopicReply rep;
-	if (QueryTopicAddress(query, rep, timeout_ms)) {
+	BHAddress dest; // empty means local.
+	if (QueryTopicAddress(dest, query, rep, timeout_ms)) {
 		auto &ls = rep.node_address();
 		n = ls.size();
 		for (auto &na : ls) {
diff --git a/src/topic_node.h b/src/topic_node.h
index 20b27d2..76bd608 100644
--- a/src/topic_node.h
+++ b/src/topic_node.h
@@ -44,7 +44,7 @@
 	bool Unregister(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms);
 	bool Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms);
 	bool Heartbeat(const int timeout_ms);
-	bool QueryTopicAddress(MsgQueryTopic &query, MsgQueryTopicReply &reply_body, const int timeout_ms);
+	bool QueryTopicAddress(BHAddress &dest, MsgQueryTopic &query, MsgQueryTopicReply &reply_body, const int timeout_ms);
 
 	// topic rpc server
 	typedef std::function<bool(const std::string &client_proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply)> ServerSyncCB;

--
Gitblit v1.8.0