From f6535ea2ae09b3cdca9104fa19dfff39a47271ea Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期三, 21 四月 2021 18:40:47 +0800
Subject: [PATCH] change api, add request remote address.
---
utest/api_test.cpp | 3 +
api/bhsgo/bhome_node_test.go | 3 +
api/bhsgo/bhome_node.go | 25 ++++++++----
src/topic_node.h | 4 +-
src/bh_api.h | 8 +++-
utest/utest.cpp | 2
src/bh_api.cpp | 20 +++++++---
src/topic_node.cpp | 4 +-
8 files changed, 46 insertions(+), 23 deletions(-)
diff --git a/api/bhsgo/bhome_node.go b/api/bhsgo/bhome_node.go
index c5d4019..7e7bf17 100644
--- a/api/bhsgo/bhome_node.go
+++ b/api/bhsgo/bhome_node.go
@@ -12,16 +12,23 @@
"unsafe"
)
+func getPtr(n *[]byte) unsafe.Pointer {
+ if len(*n) > 0 {
+ return unsafe.Pointer(&((*n)[0]))
+ } else {
+ return unsafe.Pointer(nil)
+ }
+}
+
func bhApiIn1Out1(bhfunc C.FBHApiIn1Out1, data []byte, reply *bh.MsgCommonReply, timeout_ms int) bool {
creply := unsafe.Pointer(nil)
creply_len := C.int(0)
defer C.BHFree(creply, creply_len)
- r := C.BHApiIn1Out1Proxy(bhfunc, unsafe.Pointer(&data[0]), C.int(len(data)), &creply, &creply_len, C.int(timeout_ms)) > 0
+ r := C.BHApiIn1Out1Proxy(bhfunc, getPtr(&data), C.int(len(data)), &creply, &creply_len, C.int(timeout_ms)) > 0
if r {
reply.Unmarshal(C.GoBytes(creply, creply_len))
}
return r
-
}
func Register(proc *bh.ProcInfo, reply *bh.MsgCommonReply, timeout_ms int) bool {
@@ -50,7 +57,7 @@
func Publish(pub *bh.MsgPublish, timeout_ms int) bool {
data, _ := pub.Marshal()
- return C.BHPublish(unsafe.Pointer(&data[0]), C.int(len(data)), C.int(timeout_ms)) > 0
+ return C.BHPublish(getPtr(&data), C.int(len(data)), C.int(timeout_ms)) > 0
}
func ReadSub(proc_id *string, pub *bh.MsgPublish, timeout_ms int) bool {
@@ -67,19 +74,21 @@
return r
}
-func AsyncRequest(req *bh.MsgRequestTopic, msg_id *[]byte) bool {
+func AsyncRequest(dest_addr *bh.BHAddress, req *bh.MsgRequestTopic, msg_id *[]byte) bool {
+ dest, _ := dest_addr.Marshal()
data, _ := req.Marshal()
creply := unsafe.Pointer(nil)
creply_len := C.int(0)
defer C.BHFree(creply, creply_len)
- r := C.BHAsyncRequest(unsafe.Pointer(&data[0]), C.int(len(data)), &creply, &creply_len) > 0
+ r := C.BHAsyncRequest(getPtr(&dest), C.int(len(dest)), getPtr(&data), C.int(len(data)), &creply, &creply_len) > 0
if r {
*msg_id = C.GoBytes(creply, creply_len)
}
return r
}
-func Request(req *bh.MsgRequestTopic, proc_id *string, reply *bh.MsgRequestTopicReply, timeout_ms int) bool {
+func Request(dest_addr *bh.BHAddress, req *bh.MsgRequestTopic, proc_id *string, reply *bh.MsgRequestTopicReply, timeout_ms int) bool {
+ dest, _ := dest_addr.Marshal()
data, _ := req.Marshal()
cpid := unsafe.Pointer(nil)
cpid_len := C.int(0)
@@ -87,7 +96,7 @@
creply := unsafe.Pointer(nil)
creply_len := C.int(0)
defer C.BHFree(creply, creply_len)
- r := C.BHRequest(unsafe.Pointer(&data[0]), C.int(len(data)), &cpid, &cpid_len, &creply, &creply_len, C.int(timeout_ms)) > 0
+ r := C.BHRequest(getPtr(&dest), C.int(len(dest)), getPtr(&data), C.int(len(data)), &cpid, &cpid_len, &creply, &creply_len, C.int(timeout_ms)) > 0
if r {
*proc_id = string(C.GoBytes(cpid, cpid_len))
reply.Unmarshal(C.GoBytes(creply, creply_len))
@@ -112,7 +121,7 @@
func SendReply(src unsafe.Pointer, rep *bh.MsgRequestTopicReply) bool {
data, _ := rep.Marshal()
- return C.BHSendReply(src, unsafe.Pointer(&data[0]), C.int(len(data))) > 0
+ return C.BHSendReply(src, getPtr(&data), C.int(len(data))) > 0
}
func GetLastError() (int, string) {
diff --git a/api/bhsgo/bhome_node_test.go b/api/bhsgo/bhome_node_test.go
index 3629d89..9f2b324 100644
--- a/api/bhsgo/bhome_node_test.go
+++ b/api/bhsgo/bhome_node_test.go
@@ -64,8 +64,9 @@
pid := ""
rr := bh.MsgRequestTopicReply{}
+ dest := bh.BHAddress{}
for i := 0; i < 10000; i++ {
- if Request(&req, &pid, &rr, 3000) {
+ if Request(&dest, &req, &pid, &rr, 3000) {
fmt.Println("server:" + pid + ", reply:" + string(rr.Data))
} else {
e, s := GetLastError()
diff --git a/src/bh_api.cpp b/src/bh_api.cpp
index cdf2e96..8a4b947 100644
--- a/src/bh_api.cpp
+++ b/src/bh_api.cpp
@@ -157,19 +157,23 @@
return false;
}
-int BHAsyncRequest(const void *request,
+int BHAsyncRequest(const void *remote,
+ const int remote_len,
+ const void *request,
const int request_len,
void **msg_id,
int *msg_id_len)
{
+ BHAddress dest;
MsgRequestTopic req;
- if (!req.ParseFromArray(request, request_len)) {
+ if (!dest.ParseFromArray(remote, remote_len) ||
+ !req.ParseFromArray(request, request_len)) {
SetLastError(eInvalidInput, "invalid input.");
return false;
}
std::string str_msg_id;
MsgRequestTopicReply out_msg;
- if (ProcNode().ClientAsyncRequest(req, str_msg_id)) {
+ if (ProcNode().ClientAsyncRequest(dest, req, str_msg_id)) {
if (!msg_id || !msg_id_len) {
return true;
}
@@ -184,7 +188,9 @@
return false;
}
-int BHRequest(const void *request,
+int BHRequest(const void *remote,
+ const int remote_len,
+ const void *request,
const int request_len,
void **proc_id,
int *proc_id_len,
@@ -192,14 +198,16 @@
int *reply_len,
const int timeout_ms)
{
+ BHAddress dest;
MsgRequestTopic req;
- if (!req.ParseFromArray(request, request_len)) {
+ if (!dest.ParseFromArray(remote, remote_len) ||
+ !req.ParseFromArray(request, request_len)) {
SetLastError(eInvalidInput, "invalid input.");
return false;
}
std::string proc;
MsgRequestTopicReply out_msg;
- if (ProcNode().ClientSyncRequest(req, proc, out_msg, timeout_ms)) {
+ if (ProcNode().ClientSyncRequest(dest, req, proc, out_msg, timeout_ms)) {
TmpPtr pproc(proc);
if (pproc && PackOutput(out_msg, reply, reply_len)) {
pproc.ReleaseTo(proc_id, proc_id_len);
diff --git a/src/bh_api.h b/src/bh_api.h
index 5c64ae7..33a70cb 100644
--- a/src/bh_api.h
+++ b/src/bh_api.h
@@ -73,12 +73,16 @@
int *msgpub_len,
const int timeout_ms);
-int BHAsyncRequest(const void *request,
+int BHAsyncRequest(const void *remote,
+ const int remote_len,
+ const void *request,
const int request_len,
void **msg_id,
int *msg_id_len);
-int BHRequest(const void *request,
+int BHRequest(const void *remote,
+ const int remote_len,
+ const void *request,
const int request_len,
void **proc_id,
int *proc_id_len,
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index 8bf9cf8..4f0c96f 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -283,7 +283,7 @@
return SockClient().Start(onData, nworker);
}
-bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, std::string &out_msg_id, const RequestResultCB &cb)
+bool TopicNode::ClientAsyncRequest(const BHAddress &remote_addr, const MsgRequestTopic &req, std::string &out_msg_id, const RequestResultCB &cb)
{
if (!IsOnline()) {
SetLastError(eNotRegistered, "Not Registered.");
@@ -349,7 +349,7 @@
}
}
-bool TopicNode::ClientSyncRequest(const MsgRequestTopic &request, std::string &out_proc_id, MsgRequestTopicReply &out_reply, const int timeout_ms)
+bool TopicNode::ClientSyncRequest(const BHAddress &remote_addr, const MsgRequestTopic &request, std::string &out_proc_id, MsgRequestTopicReply &out_reply, const int timeout_ms)
{
if (!IsOnline()) {
SetLastError(eNotRegistered, "Not Registered.");
diff --git a/src/topic_node.h b/src/topic_node.h
index b2fae5b..3371c35 100644
--- a/src/topic_node.h
+++ b/src/topic_node.h
@@ -56,8 +56,8 @@
// topic client
typedef std::function<void(const BHMsgHead &head, const MsgRequestTopicReply &reply)> RequestResultCB;
bool ClientStartWorker(RequestResultCB const &cb, const int nworker = 2);
- bool ClientAsyncRequest(const MsgRequestTopic &request, std::string &msg_id, const RequestResultCB &rrcb = RequestResultCB());
- bool ClientSyncRequest(const MsgRequestTopic &request, std::string &proc_id, MsgRequestTopicReply &reply, const int timeout_ms);
+ bool ClientAsyncRequest(const BHAddress &remote_addr, const MsgRequestTopic &request, std::string &msg_id, const RequestResultCB &rrcb = RequestResultCB());
+ bool ClientSyncRequest(const BHAddress &remote_addr, const MsgRequestTopic &request, std::string &proc_id, MsgRequestTopicReply &reply, const int timeout_ms);
// publish
bool Publish(const MsgPublish &pub, const int timeout_ms);
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index 766c0f8..7981a2c 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -223,7 +223,8 @@
void *msg_id = 0;
int len = 0;
// Sleep(10ms, false);
- bool r = BHAsyncRequest(s.data(), s.size(), 0, 0);
+ std::string dest(BHAddress().SerializeAsString());
+ bool r = BHAsyncRequest(dest.data(), dest.size(), s.data(), s.size(), 0, 0);
DEFER1(BHFree(msg_id, len););
if (r) {
++Status().nrequest_;
diff --git a/utest/utest.cpp b/utest/utest.cpp
index b2de97f..fae22b1 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -233,7 +233,7 @@
boost::timer::auto_cpu_timer timer;
for (int i = 0; i < nreq; ++i) {
std::string msg_id;
- if (!client.ClientAsyncRequest(req, msg_id)) {
+ if (!client.ClientAsyncRequest(BHAddress(), req, msg_id)) {
printf("client request failed\n");
++count;
}
--
Gitblit v1.8.0