add go api, wrap C api, not finished.
| | |
| | | */bhshmq_center |
| | | */help |
| | | */bhshmqbox |
| | | */bhshmq_status |
| | | */bhshmq_status |
| | | api/go/bhome_msg/*.pb.go |
New file |
| | |
| | | ../../src/bh_api.h |
New file |
| | |
| | | package main |
| | | |
| | | /* |
| | | #include <stdio.h> |
| | | #include <stdlib.h> |
| | | #include <string.h> |
| | | #include <stdint.h> |
| | | #include "bh_api.h" |
| | | |
| | | void print(int v) |
| | | { |
| | | printf("print %d\n", v); |
| | | } |
| | | |
| | | static void ReadData(void **p, int32_t *n) |
| | | { |
| | | *n = 4; |
| | | *p = malloc(4); |
| | | memcpy(*p, "abc", 4); |
| | | *n = 4; |
| | | } |
| | | static void PrintData(void *p, int32_t n) |
| | | { |
| | | printf("data :%s\n", (char*)p); |
| | | free(p); |
| | | } |
| | | |
| | | */ |
| | | // #cgo LDFLAGS: -L/home/lichao/code/shmsg/build/lib -L/usr/local/lib -lbhome_shmq -lbhome_msg -lprotobuf-lite -lstdc++ -lpthread -lrt |
| | | import "C" |
| | | |
| | | import ( |
| | | bh "bhome/bhome_msg" |
| | | "fmt" |
| | | "unsafe" |
| | | ) |
| | | |
| | | func BHApiIn1Out1(bhfunc C.FBHApiIn1Out1, data []byte, reply *bh.MsgCommonReply, timeout_ms int) bool { |
| | | creply := unsafe.Pointer(nil) |
| | | creply_len := C.int(0) |
| | | defer C.BHFree(creply, creply_len) |
| | | r := C.BHApiIn1Out1Proxy(bhfunc, unsafe.Pointer(&data[0]), C.int(len(data)), &creply, &creply_len, C.int(timeout_ms)) > 0 |
| | | if r { |
| | | reply.Unmarshal(C.GoBytes(creply, creply_len)) |
| | | } |
| | | return r |
| | | |
| | | } |
| | | |
| | | func Register(proc *bh.ProcInfo, reply *bh.MsgCommonReply, timeout_ms int) bool { |
| | | data, _ := proc.Marshal() |
| | | return BHApiIn1Out1(C.FBHApiIn1Out1(C.BHRegister), data, reply, timeout_ms) |
| | | } |
| | | |
| | | func RegisterTopics(topics *bh.MsgTopicList, reply *bh.MsgCommonReply, timeout_ms int) bool { |
| | | data, _ := topics.Marshal() |
| | | return BHApiIn1Out1(C.FBHApiIn1Out1(C.BHRegisterTopics), data, reply, timeout_ms) |
| | | } |
| | | |
| | | func Subscribe(topics *bh.MsgTopicList, reply *bh.MsgCommonReply, timeout_ms int) bool { |
| | | data, _ := topics.Marshal() |
| | | return BHApiIn1Out1(C.FBHApiIn1Out1(C.BHSubscribeTopics), data, reply, timeout_ms) |
| | | } |
| | | |
| | | func Heartbeat(topics *bh.MsgTopicList, reply *bh.MsgCommonReply, timeout_ms int) bool { |
| | | data, _ := topics.Marshal() |
| | | return BHApiIn1Out1(C.FBHApiIn1Out1(C.BHHeartbeat), data, reply, timeout_ms) |
| | | } |
| | | |
| | | func HeartbeatEasy(timeout_ms int) bool { |
| | | return C.BHHeartbeatEasy(C.int(timeout_ms)) > 0 |
| | | } |
| | | |
| | | func Publish(pub *bh.MsgPublish, timeout_ms int) bool { |
| | | data, _ := pub.Marshal() |
| | | return C.BHPublish(unsafe.Pointer(&data[0]), C.int(len(data)), C.int(timeout_ms)) > 0 |
| | | } |
| | | |
| | | func ReadSub(proc_id *string, pub *bh.MsgPublish, timeout_ms int) bool { |
| | | cpid := unsafe.Pointer(nil) |
| | | cpid_len := C.int(0) |
| | | defer C.BHFree(cpid, cpid_len) |
| | | creply := unsafe.Pointer(nil) |
| | | creply_len := C.int(0) |
| | | defer C.BHFree(creply, creply_len) |
| | | |
| | | r := C.BHReadSub(&cpid, &cpid_len, &creply, &creply_len, C.int(timeout_ms)) > 0 |
| | | *proc_id = string(C.GoBytes(cpid, cpid_len)) |
| | | pub.Unmarshal(C.GoBytes(creply, creply_len)) |
| | | return r |
| | | } |
| | | |
| | | func AsyncRequest(req *bh.MsgRequestTopic, msg_id *[]byte) bool { |
| | | data, _ := req.Marshal() |
| | | creply := unsafe.Pointer(nil) |
| | | creply_len := C.int(0) |
| | | defer C.BHFree(creply, creply_len) |
| | | r := C.BHAsyncRequest(unsafe.Pointer(&data[0]), C.int(len(data)), &creply, &creply_len) > 0 |
| | | if r { |
| | | *msg_id = C.GoBytes(creply, creply_len) |
| | | } |
| | | return r |
| | | } |
| | | |
| | | func Request(req *bh.MsgRequestTopic, proc_id *string, reply *bh.MsgRequestTopicReply, timeout_ms int) bool { |
| | | data, _ := req.Marshal() |
| | | cpid := unsafe.Pointer(nil) |
| | | cpid_len := C.int(0) |
| | | defer C.BHFree(cpid, cpid_len) |
| | | creply := unsafe.Pointer(nil) |
| | | creply_len := C.int(0) |
| | | defer C.BHFree(creply, creply_len) |
| | | r := C.BHRequest(unsafe.Pointer(&data[0]), C.int(len(data)), &cpid, &cpid_len, &creply, &creply_len, C.int(timeout_ms)) > 0 |
| | | if r { |
| | | *proc_id = string(C.GoBytes(cpid, cpid_len)) |
| | | reply.Unmarshal(C.GoBytes(creply, creply_len)) |
| | | } |
| | | return r |
| | | } |
| | | |
| | | func ReadRequest(proc_id *string, req *bh.MsgRequestTopic, psrc *unsafe.Pointer, timeout_ms int) bool { |
| | | cpid := unsafe.Pointer(nil) |
| | | cpid_len := C.int(0) |
| | | defer C.BHFree(cpid, cpid_len) |
| | | creply := unsafe.Pointer(nil) |
| | | creply_len := C.int(0) |
| | | defer C.BHFree(creply, creply_len) |
| | | r := C.BHReadRequest(&cpid, &cpid_len, &creply, &creply_len, psrc, C.int(timeout_ms)) > 0 |
| | | if r { |
| | | *proc_id = string(C.GoBytes(cpid, cpid_len)) |
| | | req.Unmarshal(C.GoBytes(creply, creply_len)) |
| | | } |
| | | return r |
| | | } |
| | | |
| | | func SendReply(src unsafe.Pointer, rep *bh.MsgRequestTopicReply) bool { |
| | | data, _ := rep.Marshal() |
| | | return C.BHSendReply(src, unsafe.Pointer(&data[0]), C.int(len(data))) > 0 |
| | | } |
| | | |
| | | func GetLastError() (int, string) { |
| | | creply := unsafe.Pointer(nil) |
| | | creply_len := C.int(0) |
| | | defer C.BHFree(creply, creply_len) |
| | | r := C.BHGetLastError(&creply, &creply_len) |
| | | return int(r), string(C.GoBytes(creply, creply_len)) |
| | | |
| | | } |
| | | |
| | | func ServerCallbackReply(tag unsafe.Pointer, rep *bh.MsgRequestTopicReply) bool { |
| | | data, _ := rep.Marshal() |
| | | return C.BHServerCallbackReply(tag, unsafe.Pointer(&data[0]), C.int(len(data))) > 0 |
| | | } |
| | | |
| | | type ServecCB func(proc_id *string, req *bh.MsgRequestTopic, reply *bh.MsgRequestTopicReply) bool |
| | | type SubDataCB func(proc_id *string, pub *bh.MsgPublish) |
| | | type ClientCB func(proc_id *string, msg_id *[]byte, reply *bh.MsgRequestTopicReply) |
| | | |
| | | func cserver_callback(cpid *unsafe.Pointer, cpid_len unsafe.Pointer) { |
| | | |
| | | } |
| | | func StartWorker(server_cb ServecCB, sub_cb SubDataCB, client_cb ClientCB) { |
| | | |
| | | } |
| | | |
| | | ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////// |
| | | // user code: |
| | | |
| | | func ServerCallback(proc_id *string, req *bh.MsgRequestTopic, reply *bh.MsgRequestTopicReply) bool { |
| | | // xxxx |
| | | return true |
| | | } |
| | | |
| | | func SubDataCallback(proc_id *string, pub *bh.MsgPublish) { |
| | | |
| | | } |
| | | func ClientCallback(proc_id *string, msg_id *[]byte, reply *bh.MsgRequestTopicReply) { |
| | | |
| | | } |
| | | |
| | | func main() { |
| | | proc := bh.ProcInfo{} |
| | | proc.ProcId = []byte("test_proc") |
| | | reply := bh.MsgCommonReply{} |
| | | |
| | | r := Register(&proc, &reply, 1000) |
| | | if r { |
| | | fmt.Println("register ok") |
| | | } else { |
| | | fmt.Println("register failed") |
| | | return |
| | | } |
| | | |
| | | r = HeartbeatEasy(1000) |
| | | if r { |
| | | fmt.Println("heartbeat ok") |
| | | } else { |
| | | fmt.Println("heartbeat failed") |
| | | } |
| | | |
| | | topics := bh.MsgTopicList{} |
| | | topics.TopicList = append(topics.TopicList, []byte("topic0"), []byte("topic1")) |
| | | RegisterTopics(&topics, &reply, 0) |
| | | if r { |
| | | fmt.Println("reg topics ok") |
| | | } else { |
| | | fmt.Println("reg topics failed") |
| | | } |
| | | |
| | | p := unsafe.Pointer(nil) |
| | | n := C.int32_t(0) |
| | | C.ReadData(&p, &n) |
| | | C.PrintData(p, n) |
| | | } |
New file |
| | |
| | | module bhome |
| | | |
| | | go 1.16 |
| | | |
| | | require github.com/gogo/protobuf v1.3.2 // indirect |
New file |
| | |
| | | github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= |
| | | github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= |
| | | github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= |
| | | github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= |
| | | github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= |
| | | github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= |
| | | golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= |
| | | golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= |
| | | golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= |
| | | golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= |
| | | golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= |
| | | golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= |
| | | golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= |
| | | golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= |
| | | golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= |
| | | golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= |
| | | golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= |
| | | golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= |
| | | golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= |
| | | golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= |
| | | golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= |
| | | golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= |
| | | golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= |
| | | golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= |
| | | golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= |
| | | golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= |
| | | golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= |
| | | golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= |
| | | golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= |
| | | golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= |
| | | golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= |
| | |
| | | |
| | | using namespace bhome_shm; |
| | | using namespace bhome_msg; |
| | | using namespace bhome::msg; |
| | | typedef BHCenter::MsgHandler Handler; |
| | | |
| | | namespace |
| | |
| | | public: |
| | | typedef std::string ProcId; |
| | | typedef std::string Address; |
| | | typedef bhome::msg::ProcInfo ProcInfo; |
| | | typedef bhome_msg::ProcInfo ProcInfo; |
| | | typedef std::function<void(Address const &)> Cleaner; |
| | | |
| | | private: |
| | |
| | | |
| | | Handler Combine(const Handler &h1, const Handler &h2) |
| | | { |
| | | return [h1, h2](ShmSocket &socket, bhome_msg::MsgI &msg, bhome::msg::BHMsgHead &head) { |
| | | return [h1, h2](ShmSocket &socket, bhome_msg::MsgI &msg, bhome_msg::BHMsgHead &head) { |
| | | return h1(socket, msg, head) || h2(socket, msg, head); |
| | | }; |
| | | } |
| | |
| | | #undef CASE_ON_MSG_TYPE |
| | | |
| | | } // namespace |
| | | |
| | | SharedMemory &BHomeShm() |
| | | { |
| | | static SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 512); |
| | | return shm; |
| | | } |
| | | |
| | | BHCenter::CenterRecords &BHCenter::Centers() |
| | | { |
| | |
| | | auto showStatus = [&]() { |
| | | auto next = Now(); |
| | | const uint64_t start = ToMs(next); |
| | | auto last = 0; |
| | | auto last = 0ul; |
| | | while (run) { |
| | | std::this_thread::sleep_until(next); |
| | | auto passed = ToMs(next) - start; |
| | |
| | | |
| | | char buf[200] = "\n"; |
| | | auto Print = [&](bool new_line) { |
| | | int n = sprintf(buf, "\r%6ds avail : %12ld = %s %6ds", sec, cur, Pretty(cur).c_str() + 1, sec); |
| | | int n = sprintf(buf, "\r%6lds avail : %12ld = %s %6lds", sec, cur, Pretty(cur).c_str() + 1, sec); |
| | | printf("%s", buf); |
| | | if (new_line) { |
| | | auto diff = cur - last; |
| | |
| | | syntax = "proto3"; |
| | | option optimize_for = LITE_RUNTIME; |
| | | option go_package="./bhome_msg"; |
| | | |
| | | // import "google/protobuf/descriptor.proto"; |
| | | import "bhome_msg_api.proto"; |
| | | import "error_msg.proto"; |
| | | |
| | | package bhome.msg; |
| | | package bhome_msg; |
| | | |
| | | |
| | | // message format : head_len(4) + head(BHMsgHead) + body_len(4) + body(variable types) |
| | |
| | | bytes topic = 6; // for request route |
| | | } |
| | | |
| | | message MsgRequest { |
| | | MsgType type = 1; |
| | | // oneof body; |
| | | } |
| | | |
| | | message MsgReply { |
| | | ErrorMsg err_msg = 1; |
| | | // oneof reply |
| | | } |
| | | |
| | | message BHMsgBody { |
| | | oneof reqrep { |
| | | MsgRequest request = 1; |
| | | MsgReply reply = 2; |
| | | } |
| | | } |
| | | |
| | | enum MsgType { |
| | | kMsgTypeInvalid = 0; |
| | |
| | | rpc Query (MsgQueryTopic) returns (MsgQueryTopicReply); |
| | | rpc Request (MsgRequestTopic) returns (MsgQueryTopicReply); |
| | | } |
| | | |
| | | message MsgRequest { |
| | | // oneof body; |
| | | oneof request { |
| | | MsgRegister register = 1; |
| | | MsgRequestTopic topic_request = 2; |
| | | MsgQueryTopic topic_query = 3; |
| | | } |
| | | } |
| | | |
| | | message MsgReply { |
| | | ErrorMsg err_msg = 1; |
| | | // oneof reply |
| | | } |
| | | |
| | | message BHMsgBody { |
| | | oneof reqrep { |
| | | MsgRequest request = 1; |
| | | MsgReply reply = 2; |
| | | } |
| | | } |
| | |
| | | syntax = "proto3"; |
| | | option optimize_for = LITE_RUNTIME; |
| | | option go_package="./bhome_msg"; |
| | | |
| | | // public messages |
| | | import "error_msg.proto"; |
| | | |
| | | package bhome.msg; |
| | | package bhome_msg; |
| | | |
| | | message BHAddress { |
| | | bytes mq_id = 1; // mqid, uuid |
| | |
| | | syntax = "proto3"; |
| | | |
| | | option optimize_for = LITE_RUNTIME; |
| | | option go_package="./bhome_msg"; |
| | | |
| | | package bhome.msg; |
| | | package bhome_msg; |
| | | |
| | | enum ErrorCode { |
| | | eSuccess = 0; |
| | |
| | | return false; |
| | | } |
| | | MsgOut msg_reply; |
| | | if ((ProcNode().*mfunc)(input, msg_reply, timeout_ms)) { |
| | | return PackOutput(msg_reply, reply, reply_len); |
| | | |
| | | } else { |
| | | return false; |
| | | } |
| | | return (ProcNode().*mfunc)(input, msg_reply, timeout_ms) && |
| | | PackOutput(msg_reply, reply, reply_len); |
| | | } |
| | | |
| | | } // namespace |
| | | |
| | | bool BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) |
| | | int BHApiIn1Out1Proxy(FBHApiIn1Out1 func, |
| | | const void *request, |
| | | const int request_len, |
| | | void **reply, |
| | | int *reply_len, |
| | | const int timeout_ms) |
| | | { |
| | | return (*func)(request, request_len, reply, reply_len, timeout_ms); |
| | | } |
| | | |
| | | int BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) |
| | | { |
| | | return BHApiIn1Out1<ProcInfo>(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms); |
| | | } |
| | | |
| | | bool BHHeartBeatEasy(const int timeout_ms) |
| | | int BHHeartbeatEasy(const int timeout_ms) |
| | | { |
| | | return ProcNode().Heartbeat(timeout_ms); |
| | | } |
| | | |
| | | bool BHHeartBeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) |
| | | int BHHeartbeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) |
| | | { |
| | | return BHApiIn1Out1<ProcInfo>(&TopicNode::Heartbeat, proc_info, proc_info_len, reply, reply_len, timeout_ms); |
| | | } |
| | | |
| | | bool BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) |
| | | int BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) |
| | | { |
| | | return BHApiIn1Out1<MsgTopicList>(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms); |
| | | } |
| | | |
| | | bool BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) |
| | | int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) |
| | | { |
| | | return BHApiIn1Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms); |
| | | } |
| | | |
| | | bool BHPublish(const void *msgpub, |
| | | const int msgpub_len, |
| | | const int timeout_ms) |
| | | int BHPublish(const void *msgpub, |
| | | const int msgpub_len, |
| | | const int timeout_ms) |
| | | { |
| | | MsgPublish pub; |
| | | if (!pub.ParseFromArray(msgpub, msgpub_len)) { |
| | |
| | | return ProcNode().Publish(pub, timeout_ms); |
| | | } |
| | | |
| | | bool BHReadSub(void **proc_id, |
| | | int *proc_id_len, |
| | | void **msgpub, |
| | | int *msgpub_len, |
| | | const int timeout_ms) |
| | | int BHReadSub(void **proc_id, |
| | | int *proc_id_len, |
| | | void **msgpub, |
| | | int *msgpub_len, |
| | | const int timeout_ms) |
| | | { |
| | | std::string proc; |
| | | MsgPublish pub; |
| | |
| | | return false; |
| | | } |
| | | |
| | | bool BHAsyncRequest(const void *request, |
| | | const int request_len, |
| | | void **msg_id, |
| | | int *msg_id_len) |
| | | int BHAsyncRequest(const void *request, |
| | | const int request_len, |
| | | void **msg_id, |
| | | int *msg_id_len) |
| | | { |
| | | MsgRequestTopic req; |
| | | if (!req.ParseFromArray(request, request_len)) { |
| | |
| | | return false; |
| | | } |
| | | |
| | | bool BHRequest(const void *request, |
| | | const int request_len, |
| | | void **proc_id, |
| | | int *proc_id_len, |
| | | void **reply, |
| | | int *reply_len, |
| | | const int timeout_ms) |
| | | int BHRequest(const void *request, |
| | | const int request_len, |
| | | void **proc_id, |
| | | int *proc_id_len, |
| | | void **reply, |
| | | int *reply_len, |
| | | const int timeout_ms) |
| | | { |
| | | MsgRequestTopic req; |
| | | if (!req.ParseFromArray(request, request_len)) { |
| | |
| | | return false; |
| | | } |
| | | |
| | | bool BHReadRequest(void **proc_id, |
| | | int *proc_id_len, |
| | | void **request, |
| | | int *request_len, |
| | | void **src, |
| | | const int timeout_ms) |
| | | int BHReadRequest(void **proc_id, |
| | | int *proc_id_len, |
| | | void **request, |
| | | int *request_len, |
| | | void **src, |
| | | const int timeout_ms) |
| | | { |
| | | void *src_info = 0; |
| | | std::string proc; |
| | |
| | | return false; |
| | | } |
| | | |
| | | bool BHSendReply(void *src, |
| | | const void *reply, |
| | | const int reply_len) |
| | | int BHSendReply(void *src, |
| | | const void *reply, |
| | | const int reply_len) |
| | | { |
| | | MsgRequestTopicReply rep; |
| | | if (!rep.ParseFromArray(reply, reply_len)) { |
| | |
| | | r = reply.ParseFromArray(p, len); |
| | | return r; |
| | | }; |
| | | server_cb(proc_id.data(), proc_id.size(), sreq.data(), sreq.size(), (BHServerCallbackTag *) (&sender)); |
| | | server_cb(proc_id.data(), proc_id.size(), sreq.data(), sreq.size(), &sender); |
| | | return r; |
| | | }; |
| | | } |
| | |
| | | |
| | | ProcNode().Start(on_req, on_sub, on_reply); |
| | | } |
| | | bool BHServerCallbackReply(const BHServerCallbackTag *tag, |
| | | const void *data, |
| | | const int data_len) |
| | | int BHServerCallbackReply(const void *tag, |
| | | const void *data, |
| | | const int data_len) |
| | | { |
| | | auto &sender = *(const ServerSender *) (tag); |
| | | return sender(data, data_len); |
| | |
| | | extern "C" { |
| | | #endif |
| | | |
| | | struct BHSrcInfo; |
| | | struct BHServerCallbackTag; |
| | | typedef int (*FBHApiIn1Out1)(const void *proc_info, |
| | | const int proc_info_len, |
| | | void **reply, |
| | | int *reply_len, |
| | | const int timeout_ms); |
| | | |
| | | bool BHRegister(const void *proc_info, |
| | | const int proc_info_len, |
| | | void **reply, |
| | | int *reply_len, |
| | | const int timeout_ms); |
| | | |
| | | bool BHRegisterTopics(const void *topics, |
| | | const int topics_len, |
| | | int BHApiIn1Out1Proxy(FBHApiIn1Out1 func, |
| | | const void *request, |
| | | const int request_len, |
| | | void **reply, |
| | | int *reply_len, |
| | | const int timeout_ms); |
| | | |
| | | bool BHSubscribeTopics(const void *topics, |
| | | const int topics_len, |
| | | void **reply, |
| | | int *reply_len, |
| | | const int timeout_ms); |
| | | int BHRegister(const void *proc_info, |
| | | const int proc_info_len, |
| | | void **reply, |
| | | int *reply_len, |
| | | const int timeout_ms); |
| | | |
| | | int BHRegisterTopics(const void *topics, |
| | | const int topics_len, |
| | | void **reply, |
| | | int *reply_len, |
| | | const int timeout_ms); |
| | | |
| | | int BHSubscribeTopics(const void *topics, |
| | | const int topics_len, |
| | | void **reply, |
| | | int *reply_len, |
| | | const int timeout_ms); |
| | | |
| | | typedef void (*FSubDataCallback)(const void *proc_id, |
| | | const int proc_id_len, |
| | |
| | | const int proc_id_len, |
| | | const void *data, |
| | | const int data_len, |
| | | BHServerCallbackTag *tag); |
| | | const void *tag); |
| | | |
| | | typedef void (*FClientCallback)(const void *proc_id, |
| | | const int proc_id_len, |
| | |
| | | const int data_len); |
| | | |
| | | void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb); |
| | | bool BHServerCallbackReply(const BHServerCallbackTag *tag, |
| | | const void *data, |
| | | const int data_len); |
| | | |
| | | bool BHHeartBeatEasy(const int timeout_ms); |
| | | bool BHHeartBeat(const void *proc_info, |
| | | const int proc_info_len, |
| | | void **reply, |
| | | int *reply_len, |
| | | const int timeout_ms); |
| | | int BHServerCallbackReply(const void *tag, |
| | | const void *data, |
| | | const int data_len); |
| | | |
| | | bool BHPublish(const void *msgpub, |
| | | const int msgpub_len, |
| | | const int timeout_ms); |
| | | int BHHeartbeatEasy(const int timeout_ms); |
| | | int BHHeartbeat(const void *proc_info, |
| | | const int proc_info_len, |
| | | void **reply, |
| | | int *reply_len, |
| | | const int timeout_ms); |
| | | |
| | | bool BHReadSub(const void *proc_id, |
| | | const int proc_id_len, |
| | | void **msgpub, |
| | | int *msgpub_len, |
| | | const int timeout_ms); |
| | | int BHPublish(const void *msgpub, |
| | | const int msgpub_len, |
| | | const int timeout_ms); |
| | | |
| | | bool BHAsyncRequest(const void *request, |
| | | const int request_len, |
| | | void **msg_id, |
| | | int *msg_id_len); |
| | | int BHReadSub(void **proc_id, |
| | | int *proc_id_len, |
| | | void **msgpub, |
| | | int *msgpub_len, |
| | | const int timeout_ms); |
| | | |
| | | bool BHRequest(const void *request, |
| | | const int request_len, |
| | | void **proc_id, |
| | | int *proc_id_len, |
| | | void **reply, |
| | | int *reply_len, |
| | | const int timeout_ms); |
| | | int BHAsyncRequest(const void *request, |
| | | const int request_len, |
| | | void **msg_id, |
| | | int *msg_id_len); |
| | | |
| | | bool BHReadRequest(void **proc_id, |
| | | int *proc_id_len, |
| | | void **request, |
| | | int *request_len, |
| | | BHSrcInfo **src, |
| | | const int timeout_ms); |
| | | int BHRequest(const void *request, |
| | | const int request_len, |
| | | void **proc_id, |
| | | int *proc_id_len, |
| | | void **reply, |
| | | int *reply_len, |
| | | const int timeout_ms); |
| | | |
| | | bool BHSendReply(BHSrcInfo *src, |
| | | const void *reply, |
| | | const int reply_len); |
| | | int BHReadRequest(void **proc_id, |
| | | int *proc_id_len, |
| | | void **request, |
| | | int *request_len, |
| | | void **src, |
| | | const int timeout_ms); |
| | | |
| | | int BHSendReply(void *src, |
| | | const void *reply, |
| | | const int reply_len); |
| | | |
| | | // int BHCleanUp(); |
| | | |
| | | void BHFree(void *buf, int size); |
| | | |
| | | int BHGetLastError(void **msg, int &msg_len); |
| | | int BHGetLastError(void **msg, int *msg_len); |
| | | |
| | | #ifdef __cplusplus |
| | | } |
| | |
| | | * ===================================================================================== |
| | | */ |
| | | #include "defs.h" |
| | | #include "shm.h" |
| | | |
| | | namespace |
| | | { |
| | | |
| | |
| | | const MQId &BHTopicCenterAddress() { return kBHTopicCenter; } |
| | | const MQId &BHUniCenterAddress() { return kBHUniCenter; } |
| | | |
| | | bhome_shm::SharedMemory &BHomeShm() |
| | | { |
| | | static bhome_shm::SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 512); |
| | | return shm; |
| | | } |
| | | |
| | | void SetLastError(const int ec, const std::string &msg) |
| | | { |
| | | LastErrorStore().ec_ = ec; |
| | |
| | | namespace bhome_msg |
| | | { |
| | | using namespace bhome_shm; |
| | | using namespace bhome::msg; // for serialized data in MsgI |
| | | |
| | | // MsgI is safe to be stored in shared memory, so POD data or offset_ptr is required. |
| | | // message format: header(meta) + body(data). |
| | |
| | | #include "bhome_msg_api.pb.h" |
| | | #include <chrono> |
| | | |
| | | using namespace bhome::msg; |
| | | using namespace bhome_msg; |
| | | |
| | | template <class Msg> |
| | | struct MsgToType { |
| | | }; |
| | | |
| | | #define BHOME_MAP_MSG_AND_TYPE(mSG, tYPE) \ |
| | | template <> \ |
| | | struct MsgToType<mSG> { \ |
| | | static const bhome::msg::MsgType value = tYPE; \ |
| | | #define BHOME_MAP_MSG_AND_TYPE(mSG, tYPE) \ |
| | | template <> \ |
| | | struct MsgToType<mSG> { \ |
| | | static const MsgType value = tYPE; \ |
| | | }; |
| | | |
| | | #define BHOME_SIMPLE_MAP_MSG(name) BHOME_MAP_MSG_AND_TYPE(Msg##name, kMsgType##name) |
| | |
| | | #undef BHOME_MAP_MSG_AND_TYPE |
| | | |
| | | template <class Msg> |
| | | constexpr inline bhome::msg::MsgType GetType(const Msg &) |
| | | constexpr inline MsgType GetType(const Msg &) |
| | | { |
| | | return MsgToType<Msg>::value; |
| | | } |
| | |
| | | } |
| | | |
| | | //maybe reimplment, using async cbs? |
| | | bool ShmSocket::SyncRecv(bhome_msg::MsgI &msg, bhome::msg::BHMsgHead &head, const int timeout_ms) |
| | | bool ShmSocket::SyncRecv(bhome_msg::MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms) |
| | | { |
| | | // std::lock_guard<std::mutex> lock(mutex_); // seems no need to lock mutex_. |
| | | bool got = (timeout_ms == 0) ? mq().TryRecv(msg) : mq().Recv(msg, timeout_ms); |
| | |
| | | size_t Pending() const { return mq().Pending(); } |
| | | |
| | | template <class Body> |
| | | bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const RecvCB &cb = RecvCB()) |
| | | bool Send(const void *valid_remote, BHMsgHead &head, Body &body, const RecvCB &cb = RecvCB()) |
| | | { |
| | | try { |
| | | if (!cb) { |
| | |
| | | return SendImpl(valid_remote, imsg); |
| | | } |
| | | |
| | | bool SyncRecv(MsgI &msg, bhome::msg::BHMsgHead &head, const int timeout_ms); |
| | | bool SyncRecv(MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms); |
| | | |
| | | template <class Body> |
| | | bool SendAndRecv(const void *remote, const BHMsgHead &head, const Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms) |
| | | bool SendAndRecv(const void *remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms) |
| | | { |
| | | struct State { |
| | | std::mutex mutex; |
| | |
| | | return false; |
| | | } |
| | | |
| | | bool TopicNode::ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms) |
| | | bool TopicNode::ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms) |
| | | { |
| | | if (!IsRegistered()) { |
| | | SetLastError(eNotRegistered, "Not Registered."); |
| | |
| | | void Stop(); |
| | | |
| | | private: |
| | | bool ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms); |
| | | bool ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms); |
| | | const std::string &proc_id() { return info_.proc_id(); } |
| | | |
| | | typedef bhome_msg::BHAddress Address; |
| | | typedef BHAddress Address; |
| | | class TopicQueryCache |
| | | { |
| | | class Impl |
| | |
| | | #include "util.h" |
| | | #include <atomic> |
| | | |
| | | using namespace bhome::msg; |
| | | using namespace bhome_msg; |
| | | |
| | | namespace |
| | | { |
| | |
| | | const int proc_id_len, |
| | | const void *data, |
| | | const int data_len, |
| | | BHServerCallbackTag *tag) |
| | | const void *tag) |
| | | { |
| | | // printf("ServerProc: "); |
| | | // DEFER1(printf("\n");); |
| | |
| | | } |
| | | } |
| | | |
| | | namespace |
| | | { |
| | | struct CCC { |
| | | }; |
| | | void F(CCC &&c) {} |
| | | |
| | | template <class... T> |
| | | void Pass(T &&...t) |
| | | { |
| | | F(std::forward<decltype(t)>(t)...); |
| | | } |
| | | |
| | | } // namespace |
| | | BOOST_AUTO_TEST_CASE(ApiTest) |
| | | { |
| | | auto max_time = std::chrono::steady_clock::time_point::max(); |
| | |
| | | auto hb = [](std::atomic<bool> *run) { |
| | | while (*run) { |
| | | Sleep(1s, false); |
| | | bool r = BHHeartBeatEasy(1000); |
| | | bool r = BHHeartbeatEasy(1000); |
| | | printf("heartbeat: %s\n", r ? "ok" : "failed"); |
| | | } |
| | | }; |
| | |
| | | { |
| | | const std::string shm_name("ShmReqRep"); |
| | | ShmRemover auto_remove(shm_name); |
| | | SharedMemory shm(shm_name, 1024 * 1024 * 50); |
| | | SharedMemory shm(shm_name, 1024 * 1024 * 512); |
| | | |
| | | auto Avail = [&]() { return shm.get_free_memory(); }; |
| | | auto init_avail = Avail(); |
| | |
| | | printf("count: %d\n", count.load()); |
| | | } |
| | | }; |
| | | MsgRequestTopic req; |
| | | req.set_topic(topic); |
| | | req.set_data("data " + std::string(100, 'a')); |
| | | client.ClientStartWorker(onRecv, 2); |
| | | boost::timer::auto_cpu_timer timer; |
| | | for (int i = 0; i < nreq; ++i) { |
| | | MsgRequestTopic req; |
| | | req.set_topic(topic); |
| | | req.set_data("data " + std::to_string(i)); |
| | | std::string msg_id; |
| | | if (!client.ClientAsyncRequest(req, msg_id)) { |
| | | printf("client request failed\n"); |
| | | ++count; |
| | | } |
| | | |
| | | // if (!client.SyncRequest(topic, "data " + std::to_string(i), reply, 1000)) { |
| | | // std::string proc_id; |
| | | // MsgRequestTopicReply reply; |
| | | // if (!client.ClientSyncRequest(req, proc_id, reply, 1000)) { |
| | | // printf("client request failed\n"); |
| | | // } |
| | | // ++count; |
| | | // ++count; |
| | | } |
| | | do { |
| | | std::this_thread::yield(); |
| | |
| | | servers.Launch(Server, "server", topics); |
| | | Sleep(100ms); |
| | | for (auto &t : topics) { |
| | | clients.Launch(Client, t, 1000 * 100); |
| | | clients.Launch(Client, t, 1000 * 100 * 2); |
| | | } |
| | | clients.WaitAll(); |
| | | printf("clients done, server replyed: %ld\n", server_msg_count.load()); |