From ca319178f45ce6256aed7913565d445571f6db22 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期二, 20 四月 2021 11:04:07 +0800
Subject: [PATCH] add go api, wrap C api, not finished.
---
api/go/bhome_node.go | 214 +++++++++++++++++++++
api/go/go.mod | 5
src/socket.h | 6
.gitignore | 3
box/center.cpp | 11
src/proto.h | 12
src/msg.h | 1
src/socket.cpp | 2
proto/source/bhome_msg.proto | 40 ++-
utest/utest.cpp | 16
proto/source/error_msg.proto | 3
src/topic_node.cpp | 2
src/defs.cpp | 8
proto/source/bhome_msg_api.proto | 3
utest/api_test.cpp | 19 -
box/status_main.cc | 4
api/go/bh_api.h | 1
src/topic_node.h | 4
src/bh_api.h | 119 ++++++-----
api/go/go.sum | 31 +++
src/bh_api.cpp | 92 ++++----
21 files changed, 431 insertions(+), 165 deletions(-)
diff --git a/.gitignore b/.gitignore
index 6d80560..4e2c214 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,4 +9,5 @@
*/bhshmq_center
*/help
*/bhshmqbox
-*/bhshmq_status
\ No newline at end of file
+*/bhshmq_status
+api/go/bhome_msg/*.pb.go
diff --git a/api/go/bh_api.h b/api/go/bh_api.h
new file mode 120000
index 0000000..627e3b4
--- /dev/null
+++ b/api/go/bh_api.h
@@ -0,0 +1 @@
+../../src/bh_api.h
\ No newline at end of file
diff --git a/api/go/bhome_node.go b/api/go/bhome_node.go
new file mode 100644
index 0000000..c950750
--- /dev/null
+++ b/api/go/bhome_node.go
@@ -0,0 +1,214 @@
+package main
+
+/*
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdint.h>
+#include "bh_api.h"
+
+void print(int v)
+{
+ printf("print %d\n", v);
+}
+
+static void ReadData(void **p, int32_t *n)
+{
+ *n = 4;
+ *p = malloc(4);
+ memcpy(*p, "abc", 4);
+ *n = 4;
+}
+static void PrintData(void *p, int32_t n)
+{
+ printf("data :%s\n", (char*)p);
+ free(p);
+}
+
+*/
+// #cgo LDFLAGS: -L/home/lichao/code/shmsg/build/lib -L/usr/local/lib -lbhome_shmq -lbhome_msg -lprotobuf-lite -lstdc++ -lpthread -lrt
+import "C"
+
+import (
+ bh "bhome/bhome_msg"
+ "fmt"
+ "unsafe"
+)
+
+func BHApiIn1Out1(bhfunc C.FBHApiIn1Out1, data []byte, reply *bh.MsgCommonReply, timeout_ms int) bool {
+ creply := unsafe.Pointer(nil)
+ creply_len := C.int(0)
+ defer C.BHFree(creply, creply_len)
+ r := C.BHApiIn1Out1Proxy(bhfunc, unsafe.Pointer(&data[0]), C.int(len(data)), &creply, &creply_len, C.int(timeout_ms)) > 0
+ if r {
+ reply.Unmarshal(C.GoBytes(creply, creply_len))
+ }
+ return r
+
+}
+
+func Register(proc *bh.ProcInfo, reply *bh.MsgCommonReply, timeout_ms int) bool {
+ data, _ := proc.Marshal()
+ return BHApiIn1Out1(C.FBHApiIn1Out1(C.BHRegister), data, reply, timeout_ms)
+}
+
+func RegisterTopics(topics *bh.MsgTopicList, reply *bh.MsgCommonReply, timeout_ms int) bool {
+ data, _ := topics.Marshal()
+ return BHApiIn1Out1(C.FBHApiIn1Out1(C.BHRegisterTopics), data, reply, timeout_ms)
+}
+
+func Subscribe(topics *bh.MsgTopicList, reply *bh.MsgCommonReply, timeout_ms int) bool {
+ data, _ := topics.Marshal()
+ return BHApiIn1Out1(C.FBHApiIn1Out1(C.BHSubscribeTopics), data, reply, timeout_ms)
+}
+
+func Heartbeat(topics *bh.MsgTopicList, reply *bh.MsgCommonReply, timeout_ms int) bool {
+ data, _ := topics.Marshal()
+ return BHApiIn1Out1(C.FBHApiIn1Out1(C.BHHeartbeat), data, reply, timeout_ms)
+}
+
+func HeartbeatEasy(timeout_ms int) bool {
+ return C.BHHeartbeatEasy(C.int(timeout_ms)) > 0
+}
+
+func Publish(pub *bh.MsgPublish, timeout_ms int) bool {
+ data, _ := pub.Marshal()
+ return C.BHPublish(unsafe.Pointer(&data[0]), C.int(len(data)), C.int(timeout_ms)) > 0
+}
+
+func ReadSub(proc_id *string, pub *bh.MsgPublish, timeout_ms int) bool {
+ cpid := unsafe.Pointer(nil)
+ cpid_len := C.int(0)
+ defer C.BHFree(cpid, cpid_len)
+ creply := unsafe.Pointer(nil)
+ creply_len := C.int(0)
+ defer C.BHFree(creply, creply_len)
+
+ r := C.BHReadSub(&cpid, &cpid_len, &creply, &creply_len, C.int(timeout_ms)) > 0
+ *proc_id = string(C.GoBytes(cpid, cpid_len))
+ pub.Unmarshal(C.GoBytes(creply, creply_len))
+ return r
+}
+
+func AsyncRequest(req *bh.MsgRequestTopic, msg_id *[]byte) bool {
+ data, _ := req.Marshal()
+ creply := unsafe.Pointer(nil)
+ creply_len := C.int(0)
+ defer C.BHFree(creply, creply_len)
+ r := C.BHAsyncRequest(unsafe.Pointer(&data[0]), C.int(len(data)), &creply, &creply_len) > 0
+ if r {
+ *msg_id = C.GoBytes(creply, creply_len)
+ }
+ return r
+}
+
+func Request(req *bh.MsgRequestTopic, proc_id *string, reply *bh.MsgRequestTopicReply, timeout_ms int) bool {
+ data, _ := req.Marshal()
+ cpid := unsafe.Pointer(nil)
+ cpid_len := C.int(0)
+ defer C.BHFree(cpid, cpid_len)
+ creply := unsafe.Pointer(nil)
+ creply_len := C.int(0)
+ defer C.BHFree(creply, creply_len)
+ r := C.BHRequest(unsafe.Pointer(&data[0]), C.int(len(data)), &cpid, &cpid_len, &creply, &creply_len, C.int(timeout_ms)) > 0
+ if r {
+ *proc_id = string(C.GoBytes(cpid, cpid_len))
+ reply.Unmarshal(C.GoBytes(creply, creply_len))
+ }
+ return r
+}
+
+func ReadRequest(proc_id *string, req *bh.MsgRequestTopic, psrc *unsafe.Pointer, timeout_ms int) bool {
+ cpid := unsafe.Pointer(nil)
+ cpid_len := C.int(0)
+ defer C.BHFree(cpid, cpid_len)
+ creply := unsafe.Pointer(nil)
+ creply_len := C.int(0)
+ defer C.BHFree(creply, creply_len)
+ r := C.BHReadRequest(&cpid, &cpid_len, &creply, &creply_len, psrc, C.int(timeout_ms)) > 0
+ if r {
+ *proc_id = string(C.GoBytes(cpid, cpid_len))
+ req.Unmarshal(C.GoBytes(creply, creply_len))
+ }
+ return r
+}
+
+func SendReply(src unsafe.Pointer, rep *bh.MsgRequestTopicReply) bool {
+ data, _ := rep.Marshal()
+ return C.BHSendReply(src, unsafe.Pointer(&data[0]), C.int(len(data))) > 0
+}
+
+func GetLastError() (int, string) {
+ creply := unsafe.Pointer(nil)
+ creply_len := C.int(0)
+ defer C.BHFree(creply, creply_len)
+ r := C.BHGetLastError(&creply, &creply_len)
+ return int(r), string(C.GoBytes(creply, creply_len))
+
+}
+
+func ServerCallbackReply(tag unsafe.Pointer, rep *bh.MsgRequestTopicReply) bool {
+ data, _ := rep.Marshal()
+ return C.BHServerCallbackReply(tag, unsafe.Pointer(&data[0]), C.int(len(data))) > 0
+}
+
+type ServecCB func(proc_id *string, req *bh.MsgRequestTopic, reply *bh.MsgRequestTopicReply) bool
+type SubDataCB func(proc_id *string, pub *bh.MsgPublish)
+type ClientCB func(proc_id *string, msg_id *[]byte, reply *bh.MsgRequestTopicReply)
+
+func cserver_callback(cpid *unsafe.Pointer, cpid_len unsafe.Pointer) {
+
+}
+func StartWorker(server_cb ServecCB, sub_cb SubDataCB, client_cb ClientCB) {
+
+}
+
+/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+// user code:
+
+func ServerCallback(proc_id *string, req *bh.MsgRequestTopic, reply *bh.MsgRequestTopicReply) bool {
+ // xxxx
+ return true
+}
+
+func SubDataCallback(proc_id *string, pub *bh.MsgPublish) {
+
+}
+func ClientCallback(proc_id *string, msg_id *[]byte, reply *bh.MsgRequestTopicReply) {
+
+}
+
+func main() {
+ proc := bh.ProcInfo{}
+ proc.ProcId = []byte("test_proc")
+ reply := bh.MsgCommonReply{}
+
+ r := Register(&proc, &reply, 1000)
+ if r {
+ fmt.Println("register ok")
+ } else {
+ fmt.Println("register failed")
+ return
+ }
+
+ r = HeartbeatEasy(1000)
+ if r {
+ fmt.Println("heartbeat ok")
+ } else {
+ fmt.Println("heartbeat failed")
+ }
+
+ topics := bh.MsgTopicList{}
+ topics.TopicList = append(topics.TopicList, []byte("topic0"), []byte("topic1"))
+ RegisterTopics(&topics, &reply, 0)
+ if r {
+ fmt.Println("reg topics ok")
+ } else {
+ fmt.Println("reg topics failed")
+ }
+
+ p := unsafe.Pointer(nil)
+ n := C.int32_t(0)
+ C.ReadData(&p, &n)
+ C.PrintData(p, n)
+}
diff --git a/api/go/go.mod b/api/go/go.mod
new file mode 100644
index 0000000..7a12c30
--- /dev/null
+++ b/api/go/go.mod
@@ -0,0 +1,5 @@
+module bhome
+
+go 1.16
+
+require github.com/gogo/protobuf v1.3.2 // indirect
diff --git a/api/go/go.sum b/api/go/go.sum
new file mode 100644
index 0000000..faf43b7
--- /dev/null
+++ b/api/go/go.sum
@@ -0,0 +1,31 @@
+github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
+github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
+github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
+github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
+github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
+github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
+golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
+golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
diff --git a/box/center.cpp b/box/center.cpp
index e8d014e..3059e90 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -27,7 +27,6 @@
using namespace bhome_shm;
using namespace bhome_msg;
-using namespace bhome::msg;
typedef BHCenter::MsgHandler Handler;
namespace
@@ -39,7 +38,7 @@
public:
typedef std::string ProcId;
typedef std::string Address;
- typedef bhome::msg::ProcInfo ProcInfo;
+ typedef bhome_msg::ProcInfo ProcInfo;
typedef std::function<void(Address const &)> Cleaner;
private:
@@ -396,7 +395,7 @@
Handler Combine(const Handler &h1, const Handler &h2)
{
- return [h1, h2](ShmSocket &socket, bhome_msg::MsgI &msg, bhome::msg::BHMsgHead &head) {
+ return [h1, h2](ShmSocket &socket, bhome_msg::MsgI &msg, bhome_msg::BHMsgHead &head) {
return h1(socket, msg, head) || h2(socket, msg, head);
};
}
@@ -490,12 +489,6 @@
#undef CASE_ON_MSG_TYPE
} // namespace
-
-SharedMemory &BHomeShm()
-{
- static SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 512);
- return shm;
-}
BHCenter::CenterRecords &BHCenter::Centers()
{
diff --git a/box/status_main.cc b/box/status_main.cc
index 993fbee..3f075fb 100644
--- a/box/status_main.cc
+++ b/box/status_main.cc
@@ -41,7 +41,7 @@
auto showStatus = [&]() {
auto next = Now();
const uint64_t start = ToMs(next);
- auto last = 0;
+ auto last = 0ul;
while (run) {
std::this_thread::sleep_until(next);
auto passed = ToMs(next) - start;
@@ -70,7 +70,7 @@
char buf[200] = "\n";
auto Print = [&](bool new_line) {
- int n = sprintf(buf, "\r%6ds avail : %12ld = %s %6ds", sec, cur, Pretty(cur).c_str() + 1, sec);
+ int n = sprintf(buf, "\r%6lds avail : %12ld = %s %6lds", sec, cur, Pretty(cur).c_str() + 1, sec);
printf("%s", buf);
if (new_line) {
auto diff = cur - last;
diff --git a/proto/source/bhome_msg.proto b/proto/source/bhome_msg.proto
index 11ff5a2..aabe372 100644
--- a/proto/source/bhome_msg.proto
+++ b/proto/source/bhome_msg.proto
@@ -1,11 +1,12 @@
syntax = "proto3";
option optimize_for = LITE_RUNTIME;
+option go_package="./bhome_msg";
// import "google/protobuf/descriptor.proto";
import "bhome_msg_api.proto";
import "error_msg.proto";
-package bhome.msg;
+package bhome_msg;
// message format : head_len(4) + head(BHMsgHead) + body_len(4) + body(variable types)
@@ -19,22 +20,6 @@
bytes topic = 6; // for request route
}
-message MsgRequest {
- MsgType type = 1;
- // oneof body;
-}
-
-message MsgReply {
- ErrorMsg err_msg = 1;
- // oneof reply
-}
-
-message BHMsgBody {
- oneof reqrep {
- MsgRequest request = 1;
- MsgReply reply = 2;
- }
-}
enum MsgType {
kMsgTypeInvalid = 0;
@@ -76,3 +61,24 @@
rpc Query (MsgQueryTopic) returns (MsgQueryTopicReply);
rpc Request (MsgRequestTopic) returns (MsgQueryTopicReply);
}
+
+message MsgRequest {
+ // oneof body;
+ oneof request {
+ MsgRegister register = 1;
+ MsgRequestTopic topic_request = 2;
+ MsgQueryTopic topic_query = 3;
+ }
+}
+
+message MsgReply {
+ ErrorMsg err_msg = 1;
+ // oneof reply
+}
+
+message BHMsgBody {
+ oneof reqrep {
+ MsgRequest request = 1;
+ MsgReply reply = 2;
+ }
+}
diff --git a/proto/source/bhome_msg_api.proto b/proto/source/bhome_msg_api.proto
index a8e8545..1c7cc1c 100644
--- a/proto/source/bhome_msg_api.proto
+++ b/proto/source/bhome_msg_api.proto
@@ -1,10 +1,11 @@
syntax = "proto3";
option optimize_for = LITE_RUNTIME;
+option go_package="./bhome_msg";
// public messages
import "error_msg.proto";
-package bhome.msg;
+package bhome_msg;
message BHAddress {
bytes mq_id = 1; // mqid, uuid
diff --git a/proto/source/error_msg.proto b/proto/source/error_msg.proto
index b85ddb3..6496c67 100644
--- a/proto/source/error_msg.proto
+++ b/proto/source/error_msg.proto
@@ -1,8 +1,9 @@
syntax = "proto3";
option optimize_for = LITE_RUNTIME;
+option go_package="./bhome_msg";
-package bhome.msg;
+package bhome_msg;
enum ErrorCode {
eSuccess = 0;
diff --git a/src/bh_api.cpp b/src/bh_api.cpp
index 2abe66d..3844000 100644
--- a/src/bh_api.cpp
+++ b/src/bh_api.cpp
@@ -83,44 +83,50 @@
return false;
}
MsgOut msg_reply;
- if ((ProcNode().*mfunc)(input, msg_reply, timeout_ms)) {
- return PackOutput(msg_reply, reply, reply_len);
-
- } else {
- return false;
- }
+ return (ProcNode().*mfunc)(input, msg_reply, timeout_ms) &&
+ PackOutput(msg_reply, reply, reply_len);
}
} // namespace
-bool BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
+int BHApiIn1Out1Proxy(FBHApiIn1Out1 func,
+ const void *request,
+ const int request_len,
+ void **reply,
+ int *reply_len,
+ const int timeout_ms)
+{
+ return (*func)(request, request_len, reply, reply_len, timeout_ms);
+}
+
+int BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
return BHApiIn1Out1<ProcInfo>(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms);
}
-bool BHHeartBeatEasy(const int timeout_ms)
+int BHHeartbeatEasy(const int timeout_ms)
{
return ProcNode().Heartbeat(timeout_ms);
}
-bool BHHeartBeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
+int BHHeartbeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
return BHApiIn1Out1<ProcInfo>(&TopicNode::Heartbeat, proc_info, proc_info_len, reply, reply_len, timeout_ms);
}
-bool BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
+int BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
return BHApiIn1Out1<MsgTopicList>(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms);
}
-bool BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
+int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
return BHApiIn1Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms);
}
-bool BHPublish(const void *msgpub,
- const int msgpub_len,
- const int timeout_ms)
+int BHPublish(const void *msgpub,
+ const int msgpub_len,
+ const int timeout_ms)
{
MsgPublish pub;
if (!pub.ParseFromArray(msgpub, msgpub_len)) {
@@ -130,11 +136,11 @@
return ProcNode().Publish(pub, timeout_ms);
}
-bool BHReadSub(void **proc_id,
- int *proc_id_len,
- void **msgpub,
- int *msgpub_len,
- const int timeout_ms)
+int BHReadSub(void **proc_id,
+ int *proc_id_len,
+ void **msgpub,
+ int *msgpub_len,
+ const int timeout_ms)
{
std::string proc;
MsgPublish pub;
@@ -151,10 +157,10 @@
return false;
}
-bool BHAsyncRequest(const void *request,
- const int request_len,
- void **msg_id,
- int *msg_id_len)
+int BHAsyncRequest(const void *request,
+ const int request_len,
+ void **msg_id,
+ int *msg_id_len)
{
MsgRequestTopic req;
if (!req.ParseFromArray(request, request_len)) {
@@ -178,13 +184,13 @@
return false;
}
-bool BHRequest(const void *request,
- const int request_len,
- void **proc_id,
- int *proc_id_len,
- void **reply,
- int *reply_len,
- const int timeout_ms)
+int BHRequest(const void *request,
+ const int request_len,
+ void **proc_id,
+ int *proc_id_len,
+ void **reply,
+ int *reply_len,
+ const int timeout_ms)
{
MsgRequestTopic req;
if (!req.ParseFromArray(request, request_len)) {
@@ -205,12 +211,12 @@
return false;
}
-bool BHReadRequest(void **proc_id,
- int *proc_id_len,
- void **request,
- int *request_len,
- void **src,
- const int timeout_ms)
+int BHReadRequest(void **proc_id,
+ int *proc_id_len,
+ void **request,
+ int *request_len,
+ void **src,
+ const int timeout_ms)
{
void *src_info = 0;
std::string proc;
@@ -228,9 +234,9 @@
return false;
}
-bool BHSendReply(void *src,
- const void *reply,
- const int reply_len)
+int BHSendReply(void *src,
+ const void *reply,
+ const int reply_len)
{
MsgRequestTopicReply rep;
if (!rep.ParseFromArray(reply, reply_len)) {
@@ -263,7 +269,7 @@
r = reply.ParseFromArray(p, len);
return r;
};
- server_cb(proc_id.data(), proc_id.size(), sreq.data(), sreq.size(), (BHServerCallbackTag *) (&sender));
+ server_cb(proc_id.data(), proc_id.size(), sreq.data(), sreq.size(), &sender);
return r;
};
}
@@ -284,9 +290,9 @@
ProcNode().Start(on_req, on_sub, on_reply);
}
-bool BHServerCallbackReply(const BHServerCallbackTag *tag,
- const void *data,
- const int data_len)
+int BHServerCallbackReply(const void *tag,
+ const void *data,
+ const int data_len)
{
auto &sender = *(const ServerSender *) (tag);
return sender(data, data_len);
diff --git a/src/bh_api.h b/src/bh_api.h
index eeb47a5..39b4cc6 100644
--- a/src/bh_api.h
+++ b/src/bh_api.h
@@ -5,26 +5,36 @@
extern "C" {
#endif
-struct BHSrcInfo;
-struct BHServerCallbackTag;
+typedef int (*FBHApiIn1Out1)(const void *proc_info,
+ const int proc_info_len,
+ void **reply,
+ int *reply_len,
+ const int timeout_ms);
-bool BHRegister(const void *proc_info,
- const int proc_info_len,
- void **reply,
- int *reply_len,
- const int timeout_ms);
-
-bool BHRegisterTopics(const void *topics,
- const int topics_len,
+int BHApiIn1Out1Proxy(FBHApiIn1Out1 func,
+ const void *request,
+ const int request_len,
void **reply,
int *reply_len,
const int timeout_ms);
-bool BHSubscribeTopics(const void *topics,
- const int topics_len,
- void **reply,
- int *reply_len,
- const int timeout_ms);
+int BHRegister(const void *proc_info,
+ const int proc_info_len,
+ void **reply,
+ int *reply_len,
+ const int timeout_ms);
+
+int BHRegisterTopics(const void *topics,
+ const int topics_len,
+ void **reply,
+ int *reply_len,
+ const int timeout_ms);
+
+int BHSubscribeTopics(const void *topics,
+ const int topics_len,
+ void **reply,
+ int *reply_len,
+ const int timeout_ms);
typedef void (*FSubDataCallback)(const void *proc_id,
const int proc_id_len,
@@ -35,7 +45,7 @@
const int proc_id_len,
const void *data,
const int data_len,
- BHServerCallbackTag *tag);
+ const void *tag);
typedef void (*FClientCallback)(const void *proc_id,
const int proc_id_len,
@@ -45,56 +55,57 @@
const int data_len);
void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb);
-bool BHServerCallbackReply(const BHServerCallbackTag *tag,
- const void *data,
- const int data_len);
-bool BHHeartBeatEasy(const int timeout_ms);
-bool BHHeartBeat(const void *proc_info,
- const int proc_info_len,
- void **reply,
- int *reply_len,
- const int timeout_ms);
+int BHServerCallbackReply(const void *tag,
+ const void *data,
+ const int data_len);
-bool BHPublish(const void *msgpub,
- const int msgpub_len,
- const int timeout_ms);
+int BHHeartbeatEasy(const int timeout_ms);
+int BHHeartbeat(const void *proc_info,
+ const int proc_info_len,
+ void **reply,
+ int *reply_len,
+ const int timeout_ms);
-bool BHReadSub(const void *proc_id,
- const int proc_id_len,
- void **msgpub,
- int *msgpub_len,
- const int timeout_ms);
+int BHPublish(const void *msgpub,
+ const int msgpub_len,
+ const int timeout_ms);
-bool BHAsyncRequest(const void *request,
- const int request_len,
- void **msg_id,
- int *msg_id_len);
+int BHReadSub(void **proc_id,
+ int *proc_id_len,
+ void **msgpub,
+ int *msgpub_len,
+ const int timeout_ms);
-bool BHRequest(const void *request,
- const int request_len,
- void **proc_id,
- int *proc_id_len,
- void **reply,
- int *reply_len,
- const int timeout_ms);
+int BHAsyncRequest(const void *request,
+ const int request_len,
+ void **msg_id,
+ int *msg_id_len);
-bool BHReadRequest(void **proc_id,
- int *proc_id_len,
- void **request,
- int *request_len,
- BHSrcInfo **src,
- const int timeout_ms);
+int BHRequest(const void *request,
+ const int request_len,
+ void **proc_id,
+ int *proc_id_len,
+ void **reply,
+ int *reply_len,
+ const int timeout_ms);
-bool BHSendReply(BHSrcInfo *src,
- const void *reply,
- const int reply_len);
+int BHReadRequest(void **proc_id,
+ int *proc_id_len,
+ void **request,
+ int *request_len,
+ void **src,
+ const int timeout_ms);
+
+int BHSendReply(void *src,
+ const void *reply,
+ const int reply_len);
// int BHCleanUp();
void BHFree(void *buf, int size);
-int BHGetLastError(void **msg, int &msg_len);
+int BHGetLastError(void **msg, int *msg_len);
#ifdef __cplusplus
}
diff --git a/src/defs.cpp b/src/defs.cpp
index 77b0722..0ff671b 100644
--- a/src/defs.cpp
+++ b/src/defs.cpp
@@ -16,6 +16,8 @@
* =====================================================================================
*/
#include "defs.h"
+#include "shm.h"
+
namespace
{
@@ -40,6 +42,12 @@
const MQId &BHTopicCenterAddress() { return kBHTopicCenter; }
const MQId &BHUniCenterAddress() { return kBHUniCenter; }
+bhome_shm::SharedMemory &BHomeShm()
+{
+ static bhome_shm::SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 512);
+ return shm;
+}
+
void SetLastError(const int ec, const std::string &msg)
{
LastErrorStore().ec_ = ec;
diff --git a/src/msg.h b/src/msg.h
index c239956..e6b0b34 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -29,7 +29,6 @@
namespace bhome_msg
{
using namespace bhome_shm;
-using namespace bhome::msg; // for serialized data in MsgI
// MsgI is safe to be stored in shared memory, so POD data or offset_ptr is required.
// message format: header(meta) + body(data).
diff --git a/src/proto.h b/src/proto.h
index b418342..2557f8e 100644
--- a/src/proto.h
+++ b/src/proto.h
@@ -22,16 +22,16 @@
#include "bhome_msg_api.pb.h"
#include <chrono>
-using namespace bhome::msg;
+using namespace bhome_msg;
template <class Msg>
struct MsgToType {
};
-#define BHOME_MAP_MSG_AND_TYPE(mSG, tYPE) \
- template <> \
- struct MsgToType<mSG> { \
- static const bhome::msg::MsgType value = tYPE; \
+#define BHOME_MAP_MSG_AND_TYPE(mSG, tYPE) \
+ template <> \
+ struct MsgToType<mSG> { \
+ static const MsgType value = tYPE; \
};
#define BHOME_SIMPLE_MAP_MSG(name) BHOME_MAP_MSG_AND_TYPE(Msg##name, kMsgType##name)
@@ -52,7 +52,7 @@
#undef BHOME_MAP_MSG_AND_TYPE
template <class Msg>
-constexpr inline bhome::msg::MsgType GetType(const Msg &)
+constexpr inline MsgType GetType(const Msg &)
{
return MsgToType<Msg>::value;
}
diff --git a/src/socket.cpp b/src/socket.cpp
index 0ba195a..aec42b4 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -115,7 +115,7 @@
}
//maybe reimplment, using async cbs?
-bool ShmSocket::SyncRecv(bhome_msg::MsgI &msg, bhome::msg::BHMsgHead &head, const int timeout_ms)
+bool ShmSocket::SyncRecv(bhome_msg::MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms)
{
// std::lock_guard<std::mutex> lock(mutex_); // seems no need to lock mutex_.
bool got = (timeout_ms == 0) ? mq().TryRecv(msg) : mq().Recv(msg, timeout_ms);
diff --git a/src/socket.h b/src/socket.h
index db64b36..493aeb4 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -66,7 +66,7 @@
size_t Pending() const { return mq().Pending(); }
template <class Body>
- bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const RecvCB &cb = RecvCB())
+ bool Send(const void *valid_remote, BHMsgHead &head, Body &body, const RecvCB &cb = RecvCB())
{
try {
if (!cb) {
@@ -91,10 +91,10 @@
return SendImpl(valid_remote, imsg);
}
- bool SyncRecv(MsgI &msg, bhome::msg::BHMsgHead &head, const int timeout_ms);
+ bool SyncRecv(MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms);
template <class Body>
- bool SendAndRecv(const void *remote, const BHMsgHead &head, const Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms)
+ bool SendAndRecv(const void *remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms)
{
struct State {
std::mutex mutex;
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index 2cc5483..9853f35 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -361,7 +361,7 @@
return false;
}
-bool TopicNode::ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms)
+bool TopicNode::ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms)
{
if (!IsRegistered()) {
SetLastError(eNotRegistered, "Not Registered.");
diff --git a/src/topic_node.h b/src/topic_node.h
index 5a3b86e..87ad770 100644
--- a/src/topic_node.h
+++ b/src/topic_node.h
@@ -69,10 +69,10 @@
void Stop();
private:
- bool ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
+ bool ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms);
const std::string &proc_id() { return info_.proc_id(); }
- typedef bhome_msg::BHAddress Address;
+ typedef BHAddress Address;
class TopicQueryCache
{
class Impl
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index da51044..b2c00a4 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -19,7 +19,7 @@
#include "util.h"
#include <atomic>
-using namespace bhome::msg;
+using namespace bhome_msg;
namespace
{
@@ -66,7 +66,7 @@
const int proc_id_len,
const void *data,
const int data_len,
- BHServerCallbackTag *tag)
+ const void *tag)
{
// printf("ServerProc: ");
// DEFER1(printf("\n"););
@@ -138,19 +138,6 @@
}
}
-namespace
-{
-struct CCC {
-};
-void F(CCC &&c) {}
-
-template <class... T>
-void Pass(T &&...t)
-{
- F(std::forward<decltype(t)>(t)...);
-}
-
-} // namespace
BOOST_AUTO_TEST_CASE(ApiTest)
{
auto max_time = std::chrono::steady_clock::time_point::max();
@@ -266,7 +253,7 @@
auto hb = [](std::atomic<bool> *run) {
while (*run) {
Sleep(1s, false);
- bool r = BHHeartBeatEasy(1000);
+ bool r = BHHeartbeatEasy(1000);
printf("heartbeat: %s\n", r ? "ok" : "failed");
}
};
diff --git a/utest/utest.cpp b/utest/utest.cpp
index 12d4396..572d8e5 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -198,7 +198,7 @@
{
const std::string shm_name("ShmReqRep");
ShmRemover auto_remove(shm_name);
- SharedMemory shm(shm_name, 1024 * 1024 * 50);
+ SharedMemory shm(shm_name, 1024 * 1024 * 512);
auto Avail = [&]() { return shm.get_free_memory(); };
auto init_avail = Avail();
@@ -224,22 +224,24 @@
printf("count: %d\n", count.load());
}
};
+ MsgRequestTopic req;
+ req.set_topic(topic);
+ req.set_data("data " + std::string(100, 'a'));
client.ClientStartWorker(onRecv, 2);
boost::timer::auto_cpu_timer timer;
for (int i = 0; i < nreq; ++i) {
- MsgRequestTopic req;
- req.set_topic(topic);
- req.set_data("data " + std::to_string(i));
std::string msg_id;
if (!client.ClientAsyncRequest(req, msg_id)) {
printf("client request failed\n");
++count;
}
- // if (!client.SyncRequest(topic, "data " + std::to_string(i), reply, 1000)) {
+ // std::string proc_id;
+ // MsgRequestTopicReply reply;
+ // if (!client.ClientSyncRequest(req, proc_id, reply, 1000)) {
// printf("client request failed\n");
// }
- // ++count;
+ // ++count;
}
do {
std::this_thread::yield();
@@ -278,7 +280,7 @@
servers.Launch(Server, "server", topics);
Sleep(100ms);
for (auto &t : topics) {
- clients.Launch(Client, t, 1000 * 100);
+ clients.Launch(Client, t, 1000 * 100 * 2);
}
clients.WaitAll();
printf("clients done, server replyed: %ld\n", server_msg_count.load());
--
Gitblit v1.8.0