From f6535ea2ae09b3cdca9104fa19dfff39a47271ea Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期三, 21 四月 2021 18:40:47 +0800
Subject: [PATCH] change api, add request remote address.

---
 utest/api_test.cpp           |    3 +
 api/bhsgo/bhome_node_test.go |    3 +
 api/bhsgo/bhome_node.go      |   25 ++++++++----
 src/topic_node.h             |    4 +-
 src/bh_api.h                 |    8 +++-
 utest/utest.cpp              |    2 
 src/bh_api.cpp               |   20 +++++++---
 src/topic_node.cpp           |    4 +-
 8 files changed, 46 insertions(+), 23 deletions(-)

diff --git a/api/bhsgo/bhome_node.go b/api/bhsgo/bhome_node.go
index c5d4019..7e7bf17 100644
--- a/api/bhsgo/bhome_node.go
+++ b/api/bhsgo/bhome_node.go
@@ -12,16 +12,23 @@
 	"unsafe"
 )
 
+func getPtr(n *[]byte) unsafe.Pointer {
+	if len(*n) > 0 {
+		return unsafe.Pointer(&((*n)[0]))
+	} else {
+		return unsafe.Pointer(nil)
+	}
+}
+
 func bhApiIn1Out1(bhfunc C.FBHApiIn1Out1, data []byte, reply *bh.MsgCommonReply, timeout_ms int) bool {
 	creply := unsafe.Pointer(nil)
 	creply_len := C.int(0)
 	defer C.BHFree(creply, creply_len)
-	r := C.BHApiIn1Out1Proxy(bhfunc, unsafe.Pointer(&data[0]), C.int(len(data)), &creply, &creply_len, C.int(timeout_ms)) > 0
+	r := C.BHApiIn1Out1Proxy(bhfunc, getPtr(&data), C.int(len(data)), &creply, &creply_len, C.int(timeout_ms)) > 0
 	if r {
 		reply.Unmarshal(C.GoBytes(creply, creply_len))
 	}
 	return r
-
 }
 
 func Register(proc *bh.ProcInfo, reply *bh.MsgCommonReply, timeout_ms int) bool {
@@ -50,7 +57,7 @@
 
 func Publish(pub *bh.MsgPublish, timeout_ms int) bool {
 	data, _ := pub.Marshal()
-	return C.BHPublish(unsafe.Pointer(&data[0]), C.int(len(data)), C.int(timeout_ms)) > 0
+	return C.BHPublish(getPtr(&data), C.int(len(data)), C.int(timeout_ms)) > 0
 }
 
 func ReadSub(proc_id *string, pub *bh.MsgPublish, timeout_ms int) bool {
@@ -67,19 +74,21 @@
 	return r
 }
 
-func AsyncRequest(req *bh.MsgRequestTopic, msg_id *[]byte) bool {
+func AsyncRequest(dest_addr *bh.BHAddress, req *bh.MsgRequestTopic, msg_id *[]byte) bool {
+	dest, _ := dest_addr.Marshal()
 	data, _ := req.Marshal()
 	creply := unsafe.Pointer(nil)
 	creply_len := C.int(0)
 	defer C.BHFree(creply, creply_len)
-	r := C.BHAsyncRequest(unsafe.Pointer(&data[0]), C.int(len(data)), &creply, &creply_len) > 0
+	r := C.BHAsyncRequest(getPtr(&dest), C.int(len(dest)), getPtr(&data), C.int(len(data)), &creply, &creply_len) > 0
 	if r {
 		*msg_id = C.GoBytes(creply, creply_len)
 	}
 	return r
 }
 
-func Request(req *bh.MsgRequestTopic, proc_id *string, reply *bh.MsgRequestTopicReply, timeout_ms int) bool {
+func Request(dest_addr *bh.BHAddress, req *bh.MsgRequestTopic, proc_id *string, reply *bh.MsgRequestTopicReply, timeout_ms int) bool {
+	dest, _ := dest_addr.Marshal()
 	data, _ := req.Marshal()
 	cpid := unsafe.Pointer(nil)
 	cpid_len := C.int(0)
@@ -87,7 +96,7 @@
 	creply := unsafe.Pointer(nil)
 	creply_len := C.int(0)
 	defer C.BHFree(creply, creply_len)
-	r := C.BHRequest(unsafe.Pointer(&data[0]), C.int(len(data)), &cpid, &cpid_len, &creply, &creply_len, C.int(timeout_ms)) > 0
+	r := C.BHRequest(getPtr(&dest), C.int(len(dest)), getPtr(&data), C.int(len(data)), &cpid, &cpid_len, &creply, &creply_len, C.int(timeout_ms)) > 0
 	if r {
 		*proc_id = string(C.GoBytes(cpid, cpid_len))
 		reply.Unmarshal(C.GoBytes(creply, creply_len))
@@ -112,7 +121,7 @@
 
 func SendReply(src unsafe.Pointer, rep *bh.MsgRequestTopicReply) bool {
 	data, _ := rep.Marshal()
-	return C.BHSendReply(src, unsafe.Pointer(&data[0]), C.int(len(data))) > 0
+	return C.BHSendReply(src, getPtr(&data), C.int(len(data))) > 0
 }
 
 func GetLastError() (int, string) {
diff --git a/api/bhsgo/bhome_node_test.go b/api/bhsgo/bhome_node_test.go
index 3629d89..9f2b324 100644
--- a/api/bhsgo/bhome_node_test.go
+++ b/api/bhsgo/bhome_node_test.go
@@ -64,8 +64,9 @@
 
 	pid := ""
 	rr := bh.MsgRequestTopicReply{}
+	dest := bh.BHAddress{}
 	for i := 0; i < 10000; i++ {
-		if Request(&req, &pid, &rr, 3000) {
+		if Request(&dest, &req, &pid, &rr, 3000) {
 			fmt.Println("server:" + pid + ", reply:" + string(rr.Data))
 		} else {
 			e, s := GetLastError()
diff --git a/src/bh_api.cpp b/src/bh_api.cpp
index cdf2e96..8a4b947 100644
--- a/src/bh_api.cpp
+++ b/src/bh_api.cpp
@@ -157,19 +157,23 @@
 	return false;
 }
 
-int BHAsyncRequest(const void *request,
+int BHAsyncRequest(const void *remote,
+                   const int remote_len,
+                   const void *request,
                    const int request_len,
                    void **msg_id,
                    int *msg_id_len)
 {
+	BHAddress dest;
 	MsgRequestTopic req;
-	if (!req.ParseFromArray(request, request_len)) {
+	if (!dest.ParseFromArray(remote, remote_len) ||
+	    !req.ParseFromArray(request, request_len)) {
 		SetLastError(eInvalidInput, "invalid input.");
 		return false;
 	}
 	std::string str_msg_id;
 	MsgRequestTopicReply out_msg;
-	if (ProcNode().ClientAsyncRequest(req, str_msg_id)) {
+	if (ProcNode().ClientAsyncRequest(dest, req, str_msg_id)) {
 		if (!msg_id || !msg_id_len) {
 			return true;
 		}
@@ -184,7 +188,9 @@
 	return false;
 }
 
-int BHRequest(const void *request,
+int BHRequest(const void *remote,
+              const int remote_len,
+              const void *request,
               const int request_len,
               void **proc_id,
               int *proc_id_len,
@@ -192,14 +198,16 @@
               int *reply_len,
               const int timeout_ms)
 {
+	BHAddress dest;
 	MsgRequestTopic req;
-	if (!req.ParseFromArray(request, request_len)) {
+	if (!dest.ParseFromArray(remote, remote_len) ||
+	    !req.ParseFromArray(request, request_len)) {
 		SetLastError(eInvalidInput, "invalid input.");
 		return false;
 	}
 	std::string proc;
 	MsgRequestTopicReply out_msg;
-	if (ProcNode().ClientSyncRequest(req, proc, out_msg, timeout_ms)) {
+	if (ProcNode().ClientSyncRequest(dest, req, proc, out_msg, timeout_ms)) {
 		TmpPtr pproc(proc);
 		if (pproc && PackOutput(out_msg, reply, reply_len)) {
 			pproc.ReleaseTo(proc_id, proc_id_len);
diff --git a/src/bh_api.h b/src/bh_api.h
index 5c64ae7..33a70cb 100644
--- a/src/bh_api.h
+++ b/src/bh_api.h
@@ -73,12 +73,16 @@
               int *msgpub_len,
               const int timeout_ms);
 
-int BHAsyncRequest(const void *request,
+int BHAsyncRequest(const void *remote,
+                   const int remote_len,
+                   const void *request,
                    const int request_len,
                    void **msg_id,
                    int *msg_id_len);
 
-int BHRequest(const void *request,
+int BHRequest(const void *remote,
+              const int remote_len,
+              const void *request,
               const int request_len,
               void **proc_id,
               int *proc_id_len,
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index 8bf9cf8..4f0c96f 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -283,7 +283,7 @@
 	return SockClient().Start(onData, nworker);
 }
 
-bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, std::string &out_msg_id, const RequestResultCB &cb)
+bool TopicNode::ClientAsyncRequest(const BHAddress &remote_addr, const MsgRequestTopic &req, std::string &out_msg_id, const RequestResultCB &cb)
 {
 	if (!IsOnline()) {
 		SetLastError(eNotRegistered, "Not Registered.");
@@ -349,7 +349,7 @@
 	}
 }
 
-bool TopicNode::ClientSyncRequest(const MsgRequestTopic &request, std::string &out_proc_id, MsgRequestTopicReply &out_reply, const int timeout_ms)
+bool TopicNode::ClientSyncRequest(const BHAddress &remote_addr, const MsgRequestTopic &request, std::string &out_proc_id, MsgRequestTopicReply &out_reply, const int timeout_ms)
 {
 	if (!IsOnline()) {
 		SetLastError(eNotRegistered, "Not Registered.");
diff --git a/src/topic_node.h b/src/topic_node.h
index b2fae5b..3371c35 100644
--- a/src/topic_node.h
+++ b/src/topic_node.h
@@ -56,8 +56,8 @@
 	// topic client
 	typedef std::function<void(const BHMsgHead &head, const MsgRequestTopicReply &reply)> RequestResultCB;
 	bool ClientStartWorker(RequestResultCB const &cb, const int nworker = 2);
-	bool ClientAsyncRequest(const MsgRequestTopic &request, std::string &msg_id, const RequestResultCB &rrcb = RequestResultCB());
-	bool ClientSyncRequest(const MsgRequestTopic &request, std::string &proc_id, MsgRequestTopicReply &reply, const int timeout_ms);
+	bool ClientAsyncRequest(const BHAddress &remote_addr, const MsgRequestTopic &request, std::string &msg_id, const RequestResultCB &rrcb = RequestResultCB());
+	bool ClientSyncRequest(const BHAddress &remote_addr, const MsgRequestTopic &request, std::string &proc_id, MsgRequestTopicReply &reply, const int timeout_ms);
 
 	// publish
 	bool Publish(const MsgPublish &pub, const int timeout_ms);
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index 766c0f8..7981a2c 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -223,7 +223,8 @@
 			void *msg_id = 0;
 			int len = 0;
 			// Sleep(10ms, false);
-			bool r = BHAsyncRequest(s.data(), s.size(), 0, 0);
+			std::string dest(BHAddress().SerializeAsString());
+			bool r = BHAsyncRequest(dest.data(), dest.size(), s.data(), s.size(), 0, 0);
 			DEFER1(BHFree(msg_id, len););
 			if (r) {
 				++Status().nrequest_;
diff --git a/utest/utest.cpp b/utest/utest.cpp
index b2de97f..fae22b1 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -233,7 +233,7 @@
 		boost::timer::auto_cpu_timer timer;
 		for (int i = 0; i < nreq; ++i) {
 			std::string msg_id;
-			if (!client.ClientAsyncRequest(req, msg_id)) {
+			if (!client.ClientAsyncRequest(BHAddress(), req, msg_id)) {
 				printf("client request failed\n");
 				++count;
 			}

--
Gitblit v1.8.0