lichao
2021-04-21 f6535ea2ae09b3cdca9104fa19dfff39a47271ea
change api, add request remote address.
8个文件已修改
69 ■■■■■ 已修改文件
api/bhsgo/bhome_node.go 25 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
api/bhsgo/bhome_node_test.go 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bh_api.cpp 20 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bh_api.h 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.cpp 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.h 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/api_test.cpp 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/utest.cpp 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
api/bhsgo/bhome_node.go
@@ -12,16 +12,23 @@
    "unsafe"
)
func getPtr(n *[]byte) unsafe.Pointer {
    if len(*n) > 0 {
        return unsafe.Pointer(&((*n)[0]))
    } else {
        return unsafe.Pointer(nil)
    }
}
func bhApiIn1Out1(bhfunc C.FBHApiIn1Out1, data []byte, reply *bh.MsgCommonReply, timeout_ms int) bool {
    creply := unsafe.Pointer(nil)
    creply_len := C.int(0)
    defer C.BHFree(creply, creply_len)
    r := C.BHApiIn1Out1Proxy(bhfunc, unsafe.Pointer(&data[0]), C.int(len(data)), &creply, &creply_len, C.int(timeout_ms)) > 0
    r := C.BHApiIn1Out1Proxy(bhfunc, getPtr(&data), C.int(len(data)), &creply, &creply_len, C.int(timeout_ms)) > 0
    if r {
        reply.Unmarshal(C.GoBytes(creply, creply_len))
    }
    return r
}
func Register(proc *bh.ProcInfo, reply *bh.MsgCommonReply, timeout_ms int) bool {
@@ -50,7 +57,7 @@
func Publish(pub *bh.MsgPublish, timeout_ms int) bool {
    data, _ := pub.Marshal()
    return C.BHPublish(unsafe.Pointer(&data[0]), C.int(len(data)), C.int(timeout_ms)) > 0
    return C.BHPublish(getPtr(&data), C.int(len(data)), C.int(timeout_ms)) > 0
}
func ReadSub(proc_id *string, pub *bh.MsgPublish, timeout_ms int) bool {
@@ -67,19 +74,21 @@
    return r
}
func AsyncRequest(req *bh.MsgRequestTopic, msg_id *[]byte) bool {
func AsyncRequest(dest_addr *bh.BHAddress, req *bh.MsgRequestTopic, msg_id *[]byte) bool {
    dest, _ := dest_addr.Marshal()
    data, _ := req.Marshal()
    creply := unsafe.Pointer(nil)
    creply_len := C.int(0)
    defer C.BHFree(creply, creply_len)
    r := C.BHAsyncRequest(unsafe.Pointer(&data[0]), C.int(len(data)), &creply, &creply_len) > 0
    r := C.BHAsyncRequest(getPtr(&dest), C.int(len(dest)), getPtr(&data), C.int(len(data)), &creply, &creply_len) > 0
    if r {
        *msg_id = C.GoBytes(creply, creply_len)
    }
    return r
}
func Request(req *bh.MsgRequestTopic, proc_id *string, reply *bh.MsgRequestTopicReply, timeout_ms int) bool {
func Request(dest_addr *bh.BHAddress, req *bh.MsgRequestTopic, proc_id *string, reply *bh.MsgRequestTopicReply, timeout_ms int) bool {
    dest, _ := dest_addr.Marshal()
    data, _ := req.Marshal()
    cpid := unsafe.Pointer(nil)
    cpid_len := C.int(0)
@@ -87,7 +96,7 @@
    creply := unsafe.Pointer(nil)
    creply_len := C.int(0)
    defer C.BHFree(creply, creply_len)
    r := C.BHRequest(unsafe.Pointer(&data[0]), C.int(len(data)), &cpid, &cpid_len, &creply, &creply_len, C.int(timeout_ms)) > 0
    r := C.BHRequest(getPtr(&dest), C.int(len(dest)), getPtr(&data), C.int(len(data)), &cpid, &cpid_len, &creply, &creply_len, C.int(timeout_ms)) > 0
    if r {
        *proc_id = string(C.GoBytes(cpid, cpid_len))
        reply.Unmarshal(C.GoBytes(creply, creply_len))
@@ -112,7 +121,7 @@
func SendReply(src unsafe.Pointer, rep *bh.MsgRequestTopicReply) bool {
    data, _ := rep.Marshal()
    return C.BHSendReply(src, unsafe.Pointer(&data[0]), C.int(len(data))) > 0
    return C.BHSendReply(src, getPtr(&data), C.int(len(data))) > 0
}
func GetLastError() (int, string) {
api/bhsgo/bhome_node_test.go
@@ -64,8 +64,9 @@
    pid := ""
    rr := bh.MsgRequestTopicReply{}
    dest := bh.BHAddress{}
    for i := 0; i < 10000; i++ {
        if Request(&req, &pid, &rr, 3000) {
        if Request(&dest, &req, &pid, &rr, 3000) {
            fmt.Println("server:" + pid + ", reply:" + string(rr.Data))
        } else {
            e, s := GetLastError()
src/bh_api.cpp
@@ -157,19 +157,23 @@
    return false;
}
int BHAsyncRequest(const void *request,
int BHAsyncRequest(const void *remote,
                   const int remote_len,
                   const void *request,
                   const int request_len,
                   void **msg_id,
                   int *msg_id_len)
{
    BHAddress dest;
    MsgRequestTopic req;
    if (!req.ParseFromArray(request, request_len)) {
    if (!dest.ParseFromArray(remote, remote_len) ||
        !req.ParseFromArray(request, request_len)) {
        SetLastError(eInvalidInput, "invalid input.");
        return false;
    }
    std::string str_msg_id;
    MsgRequestTopicReply out_msg;
    if (ProcNode().ClientAsyncRequest(req, str_msg_id)) {
    if (ProcNode().ClientAsyncRequest(dest, req, str_msg_id)) {
        if (!msg_id || !msg_id_len) {
            return true;
        }
@@ -184,7 +188,9 @@
    return false;
}
int BHRequest(const void *request,
int BHRequest(const void *remote,
              const int remote_len,
              const void *request,
              const int request_len,
              void **proc_id,
              int *proc_id_len,
@@ -192,14 +198,16 @@
              int *reply_len,
              const int timeout_ms)
{
    BHAddress dest;
    MsgRequestTopic req;
    if (!req.ParseFromArray(request, request_len)) {
    if (!dest.ParseFromArray(remote, remote_len) ||
        !req.ParseFromArray(request, request_len)) {
        SetLastError(eInvalidInput, "invalid input.");
        return false;
    }
    std::string proc;
    MsgRequestTopicReply out_msg;
    if (ProcNode().ClientSyncRequest(req, proc, out_msg, timeout_ms)) {
    if (ProcNode().ClientSyncRequest(dest, req, proc, out_msg, timeout_ms)) {
        TmpPtr pproc(proc);
        if (pproc && PackOutput(out_msg, reply, reply_len)) {
            pproc.ReleaseTo(proc_id, proc_id_len);
src/bh_api.h
@@ -73,12 +73,16 @@
              int *msgpub_len,
              const int timeout_ms);
int BHAsyncRequest(const void *request,
int BHAsyncRequest(const void *remote,
                   const int remote_len,
                   const void *request,
                   const int request_len,
                   void **msg_id,
                   int *msg_id_len);
int BHRequest(const void *request,
int BHRequest(const void *remote,
              const int remote_len,
              const void *request,
              const int request_len,
              void **proc_id,
              int *proc_id_len,
src/topic_node.cpp
@@ -283,7 +283,7 @@
    return SockClient().Start(onData, nworker);
}
bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, std::string &out_msg_id, const RequestResultCB &cb)
bool TopicNode::ClientAsyncRequest(const BHAddress &remote_addr, const MsgRequestTopic &req, std::string &out_msg_id, const RequestResultCB &cb)
{
    if (!IsOnline()) {
        SetLastError(eNotRegistered, "Not Registered.");
@@ -349,7 +349,7 @@
    }
}
bool TopicNode::ClientSyncRequest(const MsgRequestTopic &request, std::string &out_proc_id, MsgRequestTopicReply &out_reply, const int timeout_ms)
bool TopicNode::ClientSyncRequest(const BHAddress &remote_addr, const MsgRequestTopic &request, std::string &out_proc_id, MsgRequestTopicReply &out_reply, const int timeout_ms)
{
    if (!IsOnline()) {
        SetLastError(eNotRegistered, "Not Registered.");
src/topic_node.h
@@ -56,8 +56,8 @@
    // topic client
    typedef std::function<void(const BHMsgHead &head, const MsgRequestTopicReply &reply)> RequestResultCB;
    bool ClientStartWorker(RequestResultCB const &cb, const int nworker = 2);
    bool ClientAsyncRequest(const MsgRequestTopic &request, std::string &msg_id, const RequestResultCB &rrcb = RequestResultCB());
    bool ClientSyncRequest(const MsgRequestTopic &request, std::string &proc_id, MsgRequestTopicReply &reply, const int timeout_ms);
    bool ClientAsyncRequest(const BHAddress &remote_addr, const MsgRequestTopic &request, std::string &msg_id, const RequestResultCB &rrcb = RequestResultCB());
    bool ClientSyncRequest(const BHAddress &remote_addr, const MsgRequestTopic &request, std::string &proc_id, MsgRequestTopicReply &reply, const int timeout_ms);
    // publish
    bool Publish(const MsgPublish &pub, const int timeout_ms);
utest/api_test.cpp
@@ -223,7 +223,8 @@
            void *msg_id = 0;
            int len = 0;
            // Sleep(10ms, false);
            bool r = BHAsyncRequest(s.data(), s.size(), 0, 0);
            std::string dest(BHAddress().SerializeAsString());
            bool r = BHAsyncRequest(dest.data(), dest.size(), s.data(), s.size(), 0, 0);
            DEFER1(BHFree(msg_id, len););
            if (r) {
                ++Status().nrequest_;
utest/utest.cpp
@@ -233,7 +233,7 @@
        boost::timer::auto_cpu_timer timer;
        for (int i = 0; i < nreq; ++i) {
            std::string msg_id;
            if (!client.ClientAsyncRequest(req, msg_id)) {
            if (!client.ClientAsyncRequest(BHAddress(), req, msg_id)) {
                printf("client request failed\n");
                ++count;
            }