From ca319178f45ce6256aed7913565d445571f6db22 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期二, 20 四月 2021 11:04:07 +0800 Subject: [PATCH] add go api, wrap C api, not finished. --- api/go/bhome_node.go | 214 +++++++++++++++++++++ api/go/go.mod | 5 src/socket.h | 6 .gitignore | 3 box/center.cpp | 11 src/proto.h | 12 src/msg.h | 1 src/socket.cpp | 2 proto/source/bhome_msg.proto | 40 ++- utest/utest.cpp | 16 proto/source/error_msg.proto | 3 src/topic_node.cpp | 2 src/defs.cpp | 8 proto/source/bhome_msg_api.proto | 3 utest/api_test.cpp | 19 - box/status_main.cc | 4 api/go/bh_api.h | 1 src/topic_node.h | 4 src/bh_api.h | 119 ++++++----- api/go/go.sum | 31 +++ src/bh_api.cpp | 92 ++++---- 21 files changed, 431 insertions(+), 165 deletions(-) diff --git a/.gitignore b/.gitignore index 6d80560..4e2c214 100644 --- a/.gitignore +++ b/.gitignore @@ -9,4 +9,5 @@ */bhshmq_center */help */bhshmqbox -*/bhshmq_status \ No newline at end of file +*/bhshmq_status +api/go/bhome_msg/*.pb.go diff --git a/api/go/bh_api.h b/api/go/bh_api.h new file mode 120000 index 0000000..627e3b4 --- /dev/null +++ b/api/go/bh_api.h @@ -0,0 +1 @@ +../../src/bh_api.h \ No newline at end of file diff --git a/api/go/bhome_node.go b/api/go/bhome_node.go new file mode 100644 index 0000000..c950750 --- /dev/null +++ b/api/go/bhome_node.go @@ -0,0 +1,214 @@ +package main + +/* +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <stdint.h> +#include "bh_api.h" + +void print(int v) +{ + printf("print %d\n", v); +} + +static void ReadData(void **p, int32_t *n) +{ + *n = 4; + *p = malloc(4); + memcpy(*p, "abc", 4); + *n = 4; +} +static void PrintData(void *p, int32_t n) +{ + printf("data :%s\n", (char*)p); + free(p); +} + +*/ +// #cgo LDFLAGS: -L/home/lichao/code/shmsg/build/lib -L/usr/local/lib -lbhome_shmq -lbhome_msg -lprotobuf-lite -lstdc++ -lpthread -lrt +import "C" + +import ( + bh "bhome/bhome_msg" + "fmt" + "unsafe" +) + +func BHApiIn1Out1(bhfunc C.FBHApiIn1Out1, data []byte, reply *bh.MsgCommonReply, timeout_ms int) bool { + creply := unsafe.Pointer(nil) + creply_len := C.int(0) + defer C.BHFree(creply, creply_len) + r := C.BHApiIn1Out1Proxy(bhfunc, unsafe.Pointer(&data[0]), C.int(len(data)), &creply, &creply_len, C.int(timeout_ms)) > 0 + if r { + reply.Unmarshal(C.GoBytes(creply, creply_len)) + } + return r + +} + +func Register(proc *bh.ProcInfo, reply *bh.MsgCommonReply, timeout_ms int) bool { + data, _ := proc.Marshal() + return BHApiIn1Out1(C.FBHApiIn1Out1(C.BHRegister), data, reply, timeout_ms) +} + +func RegisterTopics(topics *bh.MsgTopicList, reply *bh.MsgCommonReply, timeout_ms int) bool { + data, _ := topics.Marshal() + return BHApiIn1Out1(C.FBHApiIn1Out1(C.BHRegisterTopics), data, reply, timeout_ms) +} + +func Subscribe(topics *bh.MsgTopicList, reply *bh.MsgCommonReply, timeout_ms int) bool { + data, _ := topics.Marshal() + return BHApiIn1Out1(C.FBHApiIn1Out1(C.BHSubscribeTopics), data, reply, timeout_ms) +} + +func Heartbeat(topics *bh.MsgTopicList, reply *bh.MsgCommonReply, timeout_ms int) bool { + data, _ := topics.Marshal() + return BHApiIn1Out1(C.FBHApiIn1Out1(C.BHHeartbeat), data, reply, timeout_ms) +} + +func HeartbeatEasy(timeout_ms int) bool { + return C.BHHeartbeatEasy(C.int(timeout_ms)) > 0 +} + +func Publish(pub *bh.MsgPublish, timeout_ms int) bool { + data, _ := pub.Marshal() + return C.BHPublish(unsafe.Pointer(&data[0]), C.int(len(data)), C.int(timeout_ms)) > 0 +} + +func ReadSub(proc_id *string, pub *bh.MsgPublish, timeout_ms int) bool { + cpid := unsafe.Pointer(nil) + cpid_len := C.int(0) + defer C.BHFree(cpid, cpid_len) + creply := unsafe.Pointer(nil) + creply_len := C.int(0) + defer C.BHFree(creply, creply_len) + + r := C.BHReadSub(&cpid, &cpid_len, &creply, &creply_len, C.int(timeout_ms)) > 0 + *proc_id = string(C.GoBytes(cpid, cpid_len)) + pub.Unmarshal(C.GoBytes(creply, creply_len)) + return r +} + +func AsyncRequest(req *bh.MsgRequestTopic, msg_id *[]byte) bool { + data, _ := req.Marshal() + creply := unsafe.Pointer(nil) + creply_len := C.int(0) + defer C.BHFree(creply, creply_len) + r := C.BHAsyncRequest(unsafe.Pointer(&data[0]), C.int(len(data)), &creply, &creply_len) > 0 + if r { + *msg_id = C.GoBytes(creply, creply_len) + } + return r +} + +func Request(req *bh.MsgRequestTopic, proc_id *string, reply *bh.MsgRequestTopicReply, timeout_ms int) bool { + data, _ := req.Marshal() + cpid := unsafe.Pointer(nil) + cpid_len := C.int(0) + defer C.BHFree(cpid, cpid_len) + creply := unsafe.Pointer(nil) + creply_len := C.int(0) + defer C.BHFree(creply, creply_len) + r := C.BHRequest(unsafe.Pointer(&data[0]), C.int(len(data)), &cpid, &cpid_len, &creply, &creply_len, C.int(timeout_ms)) > 0 + if r { + *proc_id = string(C.GoBytes(cpid, cpid_len)) + reply.Unmarshal(C.GoBytes(creply, creply_len)) + } + return r +} + +func ReadRequest(proc_id *string, req *bh.MsgRequestTopic, psrc *unsafe.Pointer, timeout_ms int) bool { + cpid := unsafe.Pointer(nil) + cpid_len := C.int(0) + defer C.BHFree(cpid, cpid_len) + creply := unsafe.Pointer(nil) + creply_len := C.int(0) + defer C.BHFree(creply, creply_len) + r := C.BHReadRequest(&cpid, &cpid_len, &creply, &creply_len, psrc, C.int(timeout_ms)) > 0 + if r { + *proc_id = string(C.GoBytes(cpid, cpid_len)) + req.Unmarshal(C.GoBytes(creply, creply_len)) + } + return r +} + +func SendReply(src unsafe.Pointer, rep *bh.MsgRequestTopicReply) bool { + data, _ := rep.Marshal() + return C.BHSendReply(src, unsafe.Pointer(&data[0]), C.int(len(data))) > 0 +} + +func GetLastError() (int, string) { + creply := unsafe.Pointer(nil) + creply_len := C.int(0) + defer C.BHFree(creply, creply_len) + r := C.BHGetLastError(&creply, &creply_len) + return int(r), string(C.GoBytes(creply, creply_len)) + +} + +func ServerCallbackReply(tag unsafe.Pointer, rep *bh.MsgRequestTopicReply) bool { + data, _ := rep.Marshal() + return C.BHServerCallbackReply(tag, unsafe.Pointer(&data[0]), C.int(len(data))) > 0 +} + +type ServecCB func(proc_id *string, req *bh.MsgRequestTopic, reply *bh.MsgRequestTopicReply) bool +type SubDataCB func(proc_id *string, pub *bh.MsgPublish) +type ClientCB func(proc_id *string, msg_id *[]byte, reply *bh.MsgRequestTopicReply) + +func cserver_callback(cpid *unsafe.Pointer, cpid_len unsafe.Pointer) { + +} +func StartWorker(server_cb ServecCB, sub_cb SubDataCB, client_cb ClientCB) { + +} + +///////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// user code: + +func ServerCallback(proc_id *string, req *bh.MsgRequestTopic, reply *bh.MsgRequestTopicReply) bool { + // xxxx + return true +} + +func SubDataCallback(proc_id *string, pub *bh.MsgPublish) { + +} +func ClientCallback(proc_id *string, msg_id *[]byte, reply *bh.MsgRequestTopicReply) { + +} + +func main() { + proc := bh.ProcInfo{} + proc.ProcId = []byte("test_proc") + reply := bh.MsgCommonReply{} + + r := Register(&proc, &reply, 1000) + if r { + fmt.Println("register ok") + } else { + fmt.Println("register failed") + return + } + + r = HeartbeatEasy(1000) + if r { + fmt.Println("heartbeat ok") + } else { + fmt.Println("heartbeat failed") + } + + topics := bh.MsgTopicList{} + topics.TopicList = append(topics.TopicList, []byte("topic0"), []byte("topic1")) + RegisterTopics(&topics, &reply, 0) + if r { + fmt.Println("reg topics ok") + } else { + fmt.Println("reg topics failed") + } + + p := unsafe.Pointer(nil) + n := C.int32_t(0) + C.ReadData(&p, &n) + C.PrintData(p, n) +} diff --git a/api/go/go.mod b/api/go/go.mod new file mode 100644 index 0000000..7a12c30 --- /dev/null +++ b/api/go/go.mod @@ -0,0 +1,5 @@ +module bhome + +go 1.16 + +require github.com/gogo/protobuf v1.3.2 // indirect diff --git a/api/go/go.sum b/api/go/go.sum new file mode 100644 index 0000000..faf43b7 --- /dev/null +++ b/api/go/go.sum @@ -0,0 +1,31 @@ +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/box/center.cpp b/box/center.cpp index e8d014e..3059e90 100644 --- a/box/center.cpp +++ b/box/center.cpp @@ -27,7 +27,6 @@ using namespace bhome_shm; using namespace bhome_msg; -using namespace bhome::msg; typedef BHCenter::MsgHandler Handler; namespace @@ -39,7 +38,7 @@ public: typedef std::string ProcId; typedef std::string Address; - typedef bhome::msg::ProcInfo ProcInfo; + typedef bhome_msg::ProcInfo ProcInfo; typedef std::function<void(Address const &)> Cleaner; private: @@ -396,7 +395,7 @@ Handler Combine(const Handler &h1, const Handler &h2) { - return [h1, h2](ShmSocket &socket, bhome_msg::MsgI &msg, bhome::msg::BHMsgHead &head) { + return [h1, h2](ShmSocket &socket, bhome_msg::MsgI &msg, bhome_msg::BHMsgHead &head) { return h1(socket, msg, head) || h2(socket, msg, head); }; } @@ -490,12 +489,6 @@ #undef CASE_ON_MSG_TYPE } // namespace - -SharedMemory &BHomeShm() -{ - static SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 512); - return shm; -} BHCenter::CenterRecords &BHCenter::Centers() { diff --git a/box/status_main.cc b/box/status_main.cc index 993fbee..3f075fb 100644 --- a/box/status_main.cc +++ b/box/status_main.cc @@ -41,7 +41,7 @@ auto showStatus = [&]() { auto next = Now(); const uint64_t start = ToMs(next); - auto last = 0; + auto last = 0ul; while (run) { std::this_thread::sleep_until(next); auto passed = ToMs(next) - start; @@ -70,7 +70,7 @@ char buf[200] = "\n"; auto Print = [&](bool new_line) { - int n = sprintf(buf, "\r%6ds avail : %12ld = %s %6ds", sec, cur, Pretty(cur).c_str() + 1, sec); + int n = sprintf(buf, "\r%6lds avail : %12ld = %s %6lds", sec, cur, Pretty(cur).c_str() + 1, sec); printf("%s", buf); if (new_line) { auto diff = cur - last; diff --git a/proto/source/bhome_msg.proto b/proto/source/bhome_msg.proto index 11ff5a2..aabe372 100644 --- a/proto/source/bhome_msg.proto +++ b/proto/source/bhome_msg.proto @@ -1,11 +1,12 @@ syntax = "proto3"; option optimize_for = LITE_RUNTIME; +option go_package="./bhome_msg"; // import "google/protobuf/descriptor.proto"; import "bhome_msg_api.proto"; import "error_msg.proto"; -package bhome.msg; +package bhome_msg; // message format : head_len(4) + head(BHMsgHead) + body_len(4) + body(variable types) @@ -19,22 +20,6 @@ bytes topic = 6; // for request route } -message MsgRequest { - MsgType type = 1; - // oneof body; -} - -message MsgReply { - ErrorMsg err_msg = 1; - // oneof reply -} - -message BHMsgBody { - oneof reqrep { - MsgRequest request = 1; - MsgReply reply = 2; - } -} enum MsgType { kMsgTypeInvalid = 0; @@ -76,3 +61,24 @@ rpc Query (MsgQueryTopic) returns (MsgQueryTopicReply); rpc Request (MsgRequestTopic) returns (MsgQueryTopicReply); } + +message MsgRequest { + // oneof body; + oneof request { + MsgRegister register = 1; + MsgRequestTopic topic_request = 2; + MsgQueryTopic topic_query = 3; + } +} + +message MsgReply { + ErrorMsg err_msg = 1; + // oneof reply +} + +message BHMsgBody { + oneof reqrep { + MsgRequest request = 1; + MsgReply reply = 2; + } +} diff --git a/proto/source/bhome_msg_api.proto b/proto/source/bhome_msg_api.proto index a8e8545..1c7cc1c 100644 --- a/proto/source/bhome_msg_api.proto +++ b/proto/source/bhome_msg_api.proto @@ -1,10 +1,11 @@ syntax = "proto3"; option optimize_for = LITE_RUNTIME; +option go_package="./bhome_msg"; // public messages import "error_msg.proto"; -package bhome.msg; +package bhome_msg; message BHAddress { bytes mq_id = 1; // mqid, uuid diff --git a/proto/source/error_msg.proto b/proto/source/error_msg.proto index b85ddb3..6496c67 100644 --- a/proto/source/error_msg.proto +++ b/proto/source/error_msg.proto @@ -1,8 +1,9 @@ syntax = "proto3"; option optimize_for = LITE_RUNTIME; +option go_package="./bhome_msg"; -package bhome.msg; +package bhome_msg; enum ErrorCode { eSuccess = 0; diff --git a/src/bh_api.cpp b/src/bh_api.cpp index 2abe66d..3844000 100644 --- a/src/bh_api.cpp +++ b/src/bh_api.cpp @@ -83,44 +83,50 @@ return false; } MsgOut msg_reply; - if ((ProcNode().*mfunc)(input, msg_reply, timeout_ms)) { - return PackOutput(msg_reply, reply, reply_len); - - } else { - return false; - } + return (ProcNode().*mfunc)(input, msg_reply, timeout_ms) && + PackOutput(msg_reply, reply, reply_len); } } // namespace -bool BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) +int BHApiIn1Out1Proxy(FBHApiIn1Out1 func, + const void *request, + const int request_len, + void **reply, + int *reply_len, + const int timeout_ms) +{ + return (*func)(request, request_len, reply, reply_len, timeout_ms); +} + +int BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) { return BHApiIn1Out1<ProcInfo>(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms); } -bool BHHeartBeatEasy(const int timeout_ms) +int BHHeartbeatEasy(const int timeout_ms) { return ProcNode().Heartbeat(timeout_ms); } -bool BHHeartBeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) +int BHHeartbeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) { return BHApiIn1Out1<ProcInfo>(&TopicNode::Heartbeat, proc_info, proc_info_len, reply, reply_len, timeout_ms); } -bool BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) +int BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) { return BHApiIn1Out1<MsgTopicList>(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms); } -bool BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) +int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) { return BHApiIn1Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms); } -bool BHPublish(const void *msgpub, - const int msgpub_len, - const int timeout_ms) +int BHPublish(const void *msgpub, + const int msgpub_len, + const int timeout_ms) { MsgPublish pub; if (!pub.ParseFromArray(msgpub, msgpub_len)) { @@ -130,11 +136,11 @@ return ProcNode().Publish(pub, timeout_ms); } -bool BHReadSub(void **proc_id, - int *proc_id_len, - void **msgpub, - int *msgpub_len, - const int timeout_ms) +int BHReadSub(void **proc_id, + int *proc_id_len, + void **msgpub, + int *msgpub_len, + const int timeout_ms) { std::string proc; MsgPublish pub; @@ -151,10 +157,10 @@ return false; } -bool BHAsyncRequest(const void *request, - const int request_len, - void **msg_id, - int *msg_id_len) +int BHAsyncRequest(const void *request, + const int request_len, + void **msg_id, + int *msg_id_len) { MsgRequestTopic req; if (!req.ParseFromArray(request, request_len)) { @@ -178,13 +184,13 @@ return false; } -bool BHRequest(const void *request, - const int request_len, - void **proc_id, - int *proc_id_len, - void **reply, - int *reply_len, - const int timeout_ms) +int BHRequest(const void *request, + const int request_len, + void **proc_id, + int *proc_id_len, + void **reply, + int *reply_len, + const int timeout_ms) { MsgRequestTopic req; if (!req.ParseFromArray(request, request_len)) { @@ -205,12 +211,12 @@ return false; } -bool BHReadRequest(void **proc_id, - int *proc_id_len, - void **request, - int *request_len, - void **src, - const int timeout_ms) +int BHReadRequest(void **proc_id, + int *proc_id_len, + void **request, + int *request_len, + void **src, + const int timeout_ms) { void *src_info = 0; std::string proc; @@ -228,9 +234,9 @@ return false; } -bool BHSendReply(void *src, - const void *reply, - const int reply_len) +int BHSendReply(void *src, + const void *reply, + const int reply_len) { MsgRequestTopicReply rep; if (!rep.ParseFromArray(reply, reply_len)) { @@ -263,7 +269,7 @@ r = reply.ParseFromArray(p, len); return r; }; - server_cb(proc_id.data(), proc_id.size(), sreq.data(), sreq.size(), (BHServerCallbackTag *) (&sender)); + server_cb(proc_id.data(), proc_id.size(), sreq.data(), sreq.size(), &sender); return r; }; } @@ -284,9 +290,9 @@ ProcNode().Start(on_req, on_sub, on_reply); } -bool BHServerCallbackReply(const BHServerCallbackTag *tag, - const void *data, - const int data_len) +int BHServerCallbackReply(const void *tag, + const void *data, + const int data_len) { auto &sender = *(const ServerSender *) (tag); return sender(data, data_len); diff --git a/src/bh_api.h b/src/bh_api.h index eeb47a5..39b4cc6 100644 --- a/src/bh_api.h +++ b/src/bh_api.h @@ -5,26 +5,36 @@ extern "C" { #endif -struct BHSrcInfo; -struct BHServerCallbackTag; +typedef int (*FBHApiIn1Out1)(const void *proc_info, + const int proc_info_len, + void **reply, + int *reply_len, + const int timeout_ms); -bool BHRegister(const void *proc_info, - const int proc_info_len, - void **reply, - int *reply_len, - const int timeout_ms); - -bool BHRegisterTopics(const void *topics, - const int topics_len, +int BHApiIn1Out1Proxy(FBHApiIn1Out1 func, + const void *request, + const int request_len, void **reply, int *reply_len, const int timeout_ms); -bool BHSubscribeTopics(const void *topics, - const int topics_len, - void **reply, - int *reply_len, - const int timeout_ms); +int BHRegister(const void *proc_info, + const int proc_info_len, + void **reply, + int *reply_len, + const int timeout_ms); + +int BHRegisterTopics(const void *topics, + const int topics_len, + void **reply, + int *reply_len, + const int timeout_ms); + +int BHSubscribeTopics(const void *topics, + const int topics_len, + void **reply, + int *reply_len, + const int timeout_ms); typedef void (*FSubDataCallback)(const void *proc_id, const int proc_id_len, @@ -35,7 +45,7 @@ const int proc_id_len, const void *data, const int data_len, - BHServerCallbackTag *tag); + const void *tag); typedef void (*FClientCallback)(const void *proc_id, const int proc_id_len, @@ -45,56 +55,57 @@ const int data_len); void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb); -bool BHServerCallbackReply(const BHServerCallbackTag *tag, - const void *data, - const int data_len); -bool BHHeartBeatEasy(const int timeout_ms); -bool BHHeartBeat(const void *proc_info, - const int proc_info_len, - void **reply, - int *reply_len, - const int timeout_ms); +int BHServerCallbackReply(const void *tag, + const void *data, + const int data_len); -bool BHPublish(const void *msgpub, - const int msgpub_len, - const int timeout_ms); +int BHHeartbeatEasy(const int timeout_ms); +int BHHeartbeat(const void *proc_info, + const int proc_info_len, + void **reply, + int *reply_len, + const int timeout_ms); -bool BHReadSub(const void *proc_id, - const int proc_id_len, - void **msgpub, - int *msgpub_len, - const int timeout_ms); +int BHPublish(const void *msgpub, + const int msgpub_len, + const int timeout_ms); -bool BHAsyncRequest(const void *request, - const int request_len, - void **msg_id, - int *msg_id_len); +int BHReadSub(void **proc_id, + int *proc_id_len, + void **msgpub, + int *msgpub_len, + const int timeout_ms); -bool BHRequest(const void *request, - const int request_len, - void **proc_id, - int *proc_id_len, - void **reply, - int *reply_len, - const int timeout_ms); +int BHAsyncRequest(const void *request, + const int request_len, + void **msg_id, + int *msg_id_len); -bool BHReadRequest(void **proc_id, - int *proc_id_len, - void **request, - int *request_len, - BHSrcInfo **src, - const int timeout_ms); +int BHRequest(const void *request, + const int request_len, + void **proc_id, + int *proc_id_len, + void **reply, + int *reply_len, + const int timeout_ms); -bool BHSendReply(BHSrcInfo *src, - const void *reply, - const int reply_len); +int BHReadRequest(void **proc_id, + int *proc_id_len, + void **request, + int *request_len, + void **src, + const int timeout_ms); + +int BHSendReply(void *src, + const void *reply, + const int reply_len); // int BHCleanUp(); void BHFree(void *buf, int size); -int BHGetLastError(void **msg, int &msg_len); +int BHGetLastError(void **msg, int *msg_len); #ifdef __cplusplus } diff --git a/src/defs.cpp b/src/defs.cpp index 77b0722..0ff671b 100644 --- a/src/defs.cpp +++ b/src/defs.cpp @@ -16,6 +16,8 @@ * ===================================================================================== */ #include "defs.h" +#include "shm.h" + namespace { @@ -40,6 +42,12 @@ const MQId &BHTopicCenterAddress() { return kBHTopicCenter; } const MQId &BHUniCenterAddress() { return kBHUniCenter; } +bhome_shm::SharedMemory &BHomeShm() +{ + static bhome_shm::SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 512); + return shm; +} + void SetLastError(const int ec, const std::string &msg) { LastErrorStore().ec_ = ec; diff --git a/src/msg.h b/src/msg.h index c239956..e6b0b34 100644 --- a/src/msg.h +++ b/src/msg.h @@ -29,7 +29,6 @@ namespace bhome_msg { using namespace bhome_shm; -using namespace bhome::msg; // for serialized data in MsgI // MsgI is safe to be stored in shared memory, so POD data or offset_ptr is required. // message format: header(meta) + body(data). diff --git a/src/proto.h b/src/proto.h index b418342..2557f8e 100644 --- a/src/proto.h +++ b/src/proto.h @@ -22,16 +22,16 @@ #include "bhome_msg_api.pb.h" #include <chrono> -using namespace bhome::msg; +using namespace bhome_msg; template <class Msg> struct MsgToType { }; -#define BHOME_MAP_MSG_AND_TYPE(mSG, tYPE) \ - template <> \ - struct MsgToType<mSG> { \ - static const bhome::msg::MsgType value = tYPE; \ +#define BHOME_MAP_MSG_AND_TYPE(mSG, tYPE) \ + template <> \ + struct MsgToType<mSG> { \ + static const MsgType value = tYPE; \ }; #define BHOME_SIMPLE_MAP_MSG(name) BHOME_MAP_MSG_AND_TYPE(Msg##name, kMsgType##name) @@ -52,7 +52,7 @@ #undef BHOME_MAP_MSG_AND_TYPE template <class Msg> -constexpr inline bhome::msg::MsgType GetType(const Msg &) +constexpr inline MsgType GetType(const Msg &) { return MsgToType<Msg>::value; } diff --git a/src/socket.cpp b/src/socket.cpp index 0ba195a..aec42b4 100644 --- a/src/socket.cpp +++ b/src/socket.cpp @@ -115,7 +115,7 @@ } //maybe reimplment, using async cbs? -bool ShmSocket::SyncRecv(bhome_msg::MsgI &msg, bhome::msg::BHMsgHead &head, const int timeout_ms) +bool ShmSocket::SyncRecv(bhome_msg::MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms) { // std::lock_guard<std::mutex> lock(mutex_); // seems no need to lock mutex_. bool got = (timeout_ms == 0) ? mq().TryRecv(msg) : mq().Recv(msg, timeout_ms); diff --git a/src/socket.h b/src/socket.h index db64b36..493aeb4 100644 --- a/src/socket.h +++ b/src/socket.h @@ -66,7 +66,7 @@ size_t Pending() const { return mq().Pending(); } template <class Body> - bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const RecvCB &cb = RecvCB()) + bool Send(const void *valid_remote, BHMsgHead &head, Body &body, const RecvCB &cb = RecvCB()) { try { if (!cb) { @@ -91,10 +91,10 @@ return SendImpl(valid_remote, imsg); } - bool SyncRecv(MsgI &msg, bhome::msg::BHMsgHead &head, const int timeout_ms); + bool SyncRecv(MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms); template <class Body> - bool SendAndRecv(const void *remote, const BHMsgHead &head, const Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms) + bool SendAndRecv(const void *remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms) { struct State { std::mutex mutex; diff --git a/src/topic_node.cpp b/src/topic_node.cpp index 2cc5483..9853f35 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -361,7 +361,7 @@ return false; } -bool TopicNode::ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms) +bool TopicNode::ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms) { if (!IsRegistered()) { SetLastError(eNotRegistered, "Not Registered."); diff --git a/src/topic_node.h b/src/topic_node.h index 5a3b86e..87ad770 100644 --- a/src/topic_node.h +++ b/src/topic_node.h @@ -69,10 +69,10 @@ void Stop(); private: - bool ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms); + bool ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms); const std::string &proc_id() { return info_.proc_id(); } - typedef bhome_msg::BHAddress Address; + typedef BHAddress Address; class TopicQueryCache { class Impl diff --git a/utest/api_test.cpp b/utest/api_test.cpp index da51044..b2c00a4 100644 --- a/utest/api_test.cpp +++ b/utest/api_test.cpp @@ -19,7 +19,7 @@ #include "util.h" #include <atomic> -using namespace bhome::msg; +using namespace bhome_msg; namespace { @@ -66,7 +66,7 @@ const int proc_id_len, const void *data, const int data_len, - BHServerCallbackTag *tag) + const void *tag) { // printf("ServerProc: "); // DEFER1(printf("\n");); @@ -138,19 +138,6 @@ } } -namespace -{ -struct CCC { -}; -void F(CCC &&c) {} - -template <class... T> -void Pass(T &&...t) -{ - F(std::forward<decltype(t)>(t)...); -} - -} // namespace BOOST_AUTO_TEST_CASE(ApiTest) { auto max_time = std::chrono::steady_clock::time_point::max(); @@ -266,7 +253,7 @@ auto hb = [](std::atomic<bool> *run) { while (*run) { Sleep(1s, false); - bool r = BHHeartBeatEasy(1000); + bool r = BHHeartbeatEasy(1000); printf("heartbeat: %s\n", r ? "ok" : "failed"); } }; diff --git a/utest/utest.cpp b/utest/utest.cpp index 12d4396..572d8e5 100644 --- a/utest/utest.cpp +++ b/utest/utest.cpp @@ -198,7 +198,7 @@ { const std::string shm_name("ShmReqRep"); ShmRemover auto_remove(shm_name); - SharedMemory shm(shm_name, 1024 * 1024 * 50); + SharedMemory shm(shm_name, 1024 * 1024 * 512); auto Avail = [&]() { return shm.get_free_memory(); }; auto init_avail = Avail(); @@ -224,22 +224,24 @@ printf("count: %d\n", count.load()); } }; + MsgRequestTopic req; + req.set_topic(topic); + req.set_data("data " + std::string(100, 'a')); client.ClientStartWorker(onRecv, 2); boost::timer::auto_cpu_timer timer; for (int i = 0; i < nreq; ++i) { - MsgRequestTopic req; - req.set_topic(topic); - req.set_data("data " + std::to_string(i)); std::string msg_id; if (!client.ClientAsyncRequest(req, msg_id)) { printf("client request failed\n"); ++count; } - // if (!client.SyncRequest(topic, "data " + std::to_string(i), reply, 1000)) { + // std::string proc_id; + // MsgRequestTopicReply reply; + // if (!client.ClientSyncRequest(req, proc_id, reply, 1000)) { // printf("client request failed\n"); // } - // ++count; + // ++count; } do { std::this_thread::yield(); @@ -278,7 +280,7 @@ servers.Launch(Server, "server", topics); Sleep(100ms); for (auto &t : topics) { - clients.Launch(Client, t, 1000 * 100); + clients.Launch(Client, t, 1000 * 100 * 2); } clients.WaitAll(); printf("clients done, server replyed: %ld\n", server_msg_count.load()); -- Gitblit v1.8.0