From f6535ea2ae09b3cdca9104fa19dfff39a47271ea Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期三, 21 四月 2021 18:40:47 +0800 Subject: [PATCH] change api, add request remote address. --- utest/api_test.cpp | 3 + api/bhsgo/bhome_node_test.go | 3 + api/bhsgo/bhome_node.go | 25 ++++++++---- src/topic_node.h | 4 +- src/bh_api.h | 8 +++- utest/utest.cpp | 2 src/bh_api.cpp | 20 +++++++--- src/topic_node.cpp | 4 +- 8 files changed, 46 insertions(+), 23 deletions(-) diff --git a/api/bhsgo/bhome_node.go b/api/bhsgo/bhome_node.go index c5d4019..7e7bf17 100644 --- a/api/bhsgo/bhome_node.go +++ b/api/bhsgo/bhome_node.go @@ -12,16 +12,23 @@ "unsafe" ) +func getPtr(n *[]byte) unsafe.Pointer { + if len(*n) > 0 { + return unsafe.Pointer(&((*n)[0])) + } else { + return unsafe.Pointer(nil) + } +} + func bhApiIn1Out1(bhfunc C.FBHApiIn1Out1, data []byte, reply *bh.MsgCommonReply, timeout_ms int) bool { creply := unsafe.Pointer(nil) creply_len := C.int(0) defer C.BHFree(creply, creply_len) - r := C.BHApiIn1Out1Proxy(bhfunc, unsafe.Pointer(&data[0]), C.int(len(data)), &creply, &creply_len, C.int(timeout_ms)) > 0 + r := C.BHApiIn1Out1Proxy(bhfunc, getPtr(&data), C.int(len(data)), &creply, &creply_len, C.int(timeout_ms)) > 0 if r { reply.Unmarshal(C.GoBytes(creply, creply_len)) } return r - } func Register(proc *bh.ProcInfo, reply *bh.MsgCommonReply, timeout_ms int) bool { @@ -50,7 +57,7 @@ func Publish(pub *bh.MsgPublish, timeout_ms int) bool { data, _ := pub.Marshal() - return C.BHPublish(unsafe.Pointer(&data[0]), C.int(len(data)), C.int(timeout_ms)) > 0 + return C.BHPublish(getPtr(&data), C.int(len(data)), C.int(timeout_ms)) > 0 } func ReadSub(proc_id *string, pub *bh.MsgPublish, timeout_ms int) bool { @@ -67,19 +74,21 @@ return r } -func AsyncRequest(req *bh.MsgRequestTopic, msg_id *[]byte) bool { +func AsyncRequest(dest_addr *bh.BHAddress, req *bh.MsgRequestTopic, msg_id *[]byte) bool { + dest, _ := dest_addr.Marshal() data, _ := req.Marshal() creply := unsafe.Pointer(nil) creply_len := C.int(0) defer C.BHFree(creply, creply_len) - r := C.BHAsyncRequest(unsafe.Pointer(&data[0]), C.int(len(data)), &creply, &creply_len) > 0 + r := C.BHAsyncRequest(getPtr(&dest), C.int(len(dest)), getPtr(&data), C.int(len(data)), &creply, &creply_len) > 0 if r { *msg_id = C.GoBytes(creply, creply_len) } return r } -func Request(req *bh.MsgRequestTopic, proc_id *string, reply *bh.MsgRequestTopicReply, timeout_ms int) bool { +func Request(dest_addr *bh.BHAddress, req *bh.MsgRequestTopic, proc_id *string, reply *bh.MsgRequestTopicReply, timeout_ms int) bool { + dest, _ := dest_addr.Marshal() data, _ := req.Marshal() cpid := unsafe.Pointer(nil) cpid_len := C.int(0) @@ -87,7 +96,7 @@ creply := unsafe.Pointer(nil) creply_len := C.int(0) defer C.BHFree(creply, creply_len) - r := C.BHRequest(unsafe.Pointer(&data[0]), C.int(len(data)), &cpid, &cpid_len, &creply, &creply_len, C.int(timeout_ms)) > 0 + r := C.BHRequest(getPtr(&dest), C.int(len(dest)), getPtr(&data), C.int(len(data)), &cpid, &cpid_len, &creply, &creply_len, C.int(timeout_ms)) > 0 if r { *proc_id = string(C.GoBytes(cpid, cpid_len)) reply.Unmarshal(C.GoBytes(creply, creply_len)) @@ -112,7 +121,7 @@ func SendReply(src unsafe.Pointer, rep *bh.MsgRequestTopicReply) bool { data, _ := rep.Marshal() - return C.BHSendReply(src, unsafe.Pointer(&data[0]), C.int(len(data))) > 0 + return C.BHSendReply(src, getPtr(&data), C.int(len(data))) > 0 } func GetLastError() (int, string) { diff --git a/api/bhsgo/bhome_node_test.go b/api/bhsgo/bhome_node_test.go index 3629d89..9f2b324 100644 --- a/api/bhsgo/bhome_node_test.go +++ b/api/bhsgo/bhome_node_test.go @@ -64,8 +64,9 @@ pid := "" rr := bh.MsgRequestTopicReply{} + dest := bh.BHAddress{} for i := 0; i < 10000; i++ { - if Request(&req, &pid, &rr, 3000) { + if Request(&dest, &req, &pid, &rr, 3000) { fmt.Println("server:" + pid + ", reply:" + string(rr.Data)) } else { e, s := GetLastError() diff --git a/src/bh_api.cpp b/src/bh_api.cpp index cdf2e96..8a4b947 100644 --- a/src/bh_api.cpp +++ b/src/bh_api.cpp @@ -157,19 +157,23 @@ return false; } -int BHAsyncRequest(const void *request, +int BHAsyncRequest(const void *remote, + const int remote_len, + const void *request, const int request_len, void **msg_id, int *msg_id_len) { + BHAddress dest; MsgRequestTopic req; - if (!req.ParseFromArray(request, request_len)) { + if (!dest.ParseFromArray(remote, remote_len) || + !req.ParseFromArray(request, request_len)) { SetLastError(eInvalidInput, "invalid input."); return false; } std::string str_msg_id; MsgRequestTopicReply out_msg; - if (ProcNode().ClientAsyncRequest(req, str_msg_id)) { + if (ProcNode().ClientAsyncRequest(dest, req, str_msg_id)) { if (!msg_id || !msg_id_len) { return true; } @@ -184,7 +188,9 @@ return false; } -int BHRequest(const void *request, +int BHRequest(const void *remote, + const int remote_len, + const void *request, const int request_len, void **proc_id, int *proc_id_len, @@ -192,14 +198,16 @@ int *reply_len, const int timeout_ms) { + BHAddress dest; MsgRequestTopic req; - if (!req.ParseFromArray(request, request_len)) { + if (!dest.ParseFromArray(remote, remote_len) || + !req.ParseFromArray(request, request_len)) { SetLastError(eInvalidInput, "invalid input."); return false; } std::string proc; MsgRequestTopicReply out_msg; - if (ProcNode().ClientSyncRequest(req, proc, out_msg, timeout_ms)) { + if (ProcNode().ClientSyncRequest(dest, req, proc, out_msg, timeout_ms)) { TmpPtr pproc(proc); if (pproc && PackOutput(out_msg, reply, reply_len)) { pproc.ReleaseTo(proc_id, proc_id_len); diff --git a/src/bh_api.h b/src/bh_api.h index 5c64ae7..33a70cb 100644 --- a/src/bh_api.h +++ b/src/bh_api.h @@ -73,12 +73,16 @@ int *msgpub_len, const int timeout_ms); -int BHAsyncRequest(const void *request, +int BHAsyncRequest(const void *remote, + const int remote_len, + const void *request, const int request_len, void **msg_id, int *msg_id_len); -int BHRequest(const void *request, +int BHRequest(const void *remote, + const int remote_len, + const void *request, const int request_len, void **proc_id, int *proc_id_len, diff --git a/src/topic_node.cpp b/src/topic_node.cpp index 8bf9cf8..4f0c96f 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -283,7 +283,7 @@ return SockClient().Start(onData, nworker); } -bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, std::string &out_msg_id, const RequestResultCB &cb) +bool TopicNode::ClientAsyncRequest(const BHAddress &remote_addr, const MsgRequestTopic &req, std::string &out_msg_id, const RequestResultCB &cb) { if (!IsOnline()) { SetLastError(eNotRegistered, "Not Registered."); @@ -349,7 +349,7 @@ } } -bool TopicNode::ClientSyncRequest(const MsgRequestTopic &request, std::string &out_proc_id, MsgRequestTopicReply &out_reply, const int timeout_ms) +bool TopicNode::ClientSyncRequest(const BHAddress &remote_addr, const MsgRequestTopic &request, std::string &out_proc_id, MsgRequestTopicReply &out_reply, const int timeout_ms) { if (!IsOnline()) { SetLastError(eNotRegistered, "Not Registered."); diff --git a/src/topic_node.h b/src/topic_node.h index b2fae5b..3371c35 100644 --- a/src/topic_node.h +++ b/src/topic_node.h @@ -56,8 +56,8 @@ // topic client typedef std::function<void(const BHMsgHead &head, const MsgRequestTopicReply &reply)> RequestResultCB; bool ClientStartWorker(RequestResultCB const &cb, const int nworker = 2); - bool ClientAsyncRequest(const MsgRequestTopic &request, std::string &msg_id, const RequestResultCB &rrcb = RequestResultCB()); - bool ClientSyncRequest(const MsgRequestTopic &request, std::string &proc_id, MsgRequestTopicReply &reply, const int timeout_ms); + bool ClientAsyncRequest(const BHAddress &remote_addr, const MsgRequestTopic &request, std::string &msg_id, const RequestResultCB &rrcb = RequestResultCB()); + bool ClientSyncRequest(const BHAddress &remote_addr, const MsgRequestTopic &request, std::string &proc_id, MsgRequestTopicReply &reply, const int timeout_ms); // publish bool Publish(const MsgPublish &pub, const int timeout_ms); diff --git a/utest/api_test.cpp b/utest/api_test.cpp index 766c0f8..7981a2c 100644 --- a/utest/api_test.cpp +++ b/utest/api_test.cpp @@ -223,7 +223,8 @@ void *msg_id = 0; int len = 0; // Sleep(10ms, false); - bool r = BHAsyncRequest(s.data(), s.size(), 0, 0); + std::string dest(BHAddress().SerializeAsString()); + bool r = BHAsyncRequest(dest.data(), dest.size(), s.data(), s.size(), 0, 0); DEFER1(BHFree(msg_id, len);); if (r) { ++Status().nrequest_; diff --git a/utest/utest.cpp b/utest/utest.cpp index b2de97f..fae22b1 100644 --- a/utest/utest.cpp +++ b/utest/utest.cpp @@ -233,7 +233,7 @@ boost::timer::auto_cpu_timer timer; for (int i = 0; i < nreq; ++i) { std::string msg_id; - if (!client.ClientAsyncRequest(req, msg_id)) { + if (!client.ClientAsyncRequest(BHAddress(), req, msg_id)) { printf("client request failed\n"); ++count; } -- Gitblit v1.8.0