From ca319178f45ce6256aed7913565d445571f6db22 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期二, 20 四月 2021 11:04:07 +0800
Subject: [PATCH] add go api, wrap C api, not finished.

---
 api/go/bhome_node.go             |  214 +++++++++++++++++++++
 api/go/go.mod                    |    5 
 src/socket.h                     |    6 
 .gitignore                       |    3 
 box/center.cpp                   |   11 
 src/proto.h                      |   12 
 src/msg.h                        |    1 
 src/socket.cpp                   |    2 
 proto/source/bhome_msg.proto     |   40 ++-
 utest/utest.cpp                  |   16 
 proto/source/error_msg.proto     |    3 
 src/topic_node.cpp               |    2 
 src/defs.cpp                     |    8 
 proto/source/bhome_msg_api.proto |    3 
 utest/api_test.cpp               |   19 -
 box/status_main.cc               |    4 
 api/go/bh_api.h                  |    1 
 src/topic_node.h                 |    4 
 src/bh_api.h                     |  119 ++++++-----
 api/go/go.sum                    |   31 +++
 src/bh_api.cpp                   |   92 ++++----
 21 files changed, 431 insertions(+), 165 deletions(-)

diff --git a/.gitignore b/.gitignore
index 6d80560..4e2c214 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,4 +9,5 @@
 */bhshmq_center
 */help
 */bhshmqbox
-*/bhshmq_status
\ No newline at end of file
+*/bhshmq_status
+api/go/bhome_msg/*.pb.go
diff --git a/api/go/bh_api.h b/api/go/bh_api.h
new file mode 120000
index 0000000..627e3b4
--- /dev/null
+++ b/api/go/bh_api.h
@@ -0,0 +1 @@
+../../src/bh_api.h
\ No newline at end of file
diff --git a/api/go/bhome_node.go b/api/go/bhome_node.go
new file mode 100644
index 0000000..c950750
--- /dev/null
+++ b/api/go/bhome_node.go
@@ -0,0 +1,214 @@
+package main
+
+/*
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdint.h>
+#include "bh_api.h"
+
+void print(int v)
+{
+	printf("print %d\n", v);
+}
+
+static void ReadData(void **p, int32_t *n)
+{
+	*n = 4;
+	*p = malloc(4);
+	memcpy(*p, "abc", 4);
+	*n = 4;
+}
+static void PrintData(void *p, int32_t n)
+{
+	printf("data :%s\n", (char*)p);
+	free(p);
+}
+
+*/
+// #cgo LDFLAGS: -L/home/lichao/code/shmsg/build/lib -L/usr/local/lib -lbhome_shmq -lbhome_msg -lprotobuf-lite -lstdc++ -lpthread -lrt
+import "C"
+
+import (
+	bh "bhome/bhome_msg"
+	"fmt"
+	"unsafe"
+)
+
+func BHApiIn1Out1(bhfunc C.FBHApiIn1Out1, data []byte, reply *bh.MsgCommonReply, timeout_ms int) bool {
+	creply := unsafe.Pointer(nil)
+	creply_len := C.int(0)
+	defer C.BHFree(creply, creply_len)
+	r := C.BHApiIn1Out1Proxy(bhfunc, unsafe.Pointer(&data[0]), C.int(len(data)), &creply, &creply_len, C.int(timeout_ms)) > 0
+	if r {
+		reply.Unmarshal(C.GoBytes(creply, creply_len))
+	}
+	return r
+
+}
+
+func Register(proc *bh.ProcInfo, reply *bh.MsgCommonReply, timeout_ms int) bool {
+	data, _ := proc.Marshal()
+	return BHApiIn1Out1(C.FBHApiIn1Out1(C.BHRegister), data, reply, timeout_ms)
+}
+
+func RegisterTopics(topics *bh.MsgTopicList, reply *bh.MsgCommonReply, timeout_ms int) bool {
+	data, _ := topics.Marshal()
+	return BHApiIn1Out1(C.FBHApiIn1Out1(C.BHRegisterTopics), data, reply, timeout_ms)
+}
+
+func Subscribe(topics *bh.MsgTopicList, reply *bh.MsgCommonReply, timeout_ms int) bool {
+	data, _ := topics.Marshal()
+	return BHApiIn1Out1(C.FBHApiIn1Out1(C.BHSubscribeTopics), data, reply, timeout_ms)
+}
+
+func Heartbeat(topics *bh.MsgTopicList, reply *bh.MsgCommonReply, timeout_ms int) bool {
+	data, _ := topics.Marshal()
+	return BHApiIn1Out1(C.FBHApiIn1Out1(C.BHHeartbeat), data, reply, timeout_ms)
+}
+
+func HeartbeatEasy(timeout_ms int) bool {
+	return C.BHHeartbeatEasy(C.int(timeout_ms)) > 0
+}
+
+func Publish(pub *bh.MsgPublish, timeout_ms int) bool {
+	data, _ := pub.Marshal()
+	return C.BHPublish(unsafe.Pointer(&data[0]), C.int(len(data)), C.int(timeout_ms)) > 0
+}
+
+func ReadSub(proc_id *string, pub *bh.MsgPublish, timeout_ms int) bool {
+	cpid := unsafe.Pointer(nil)
+	cpid_len := C.int(0)
+	defer C.BHFree(cpid, cpid_len)
+	creply := unsafe.Pointer(nil)
+	creply_len := C.int(0)
+	defer C.BHFree(creply, creply_len)
+
+	r := C.BHReadSub(&cpid, &cpid_len, &creply, &creply_len, C.int(timeout_ms)) > 0
+	*proc_id = string(C.GoBytes(cpid, cpid_len))
+	pub.Unmarshal(C.GoBytes(creply, creply_len))
+	return r
+}
+
+func AsyncRequest(req *bh.MsgRequestTopic, msg_id *[]byte) bool {
+	data, _ := req.Marshal()
+	creply := unsafe.Pointer(nil)
+	creply_len := C.int(0)
+	defer C.BHFree(creply, creply_len)
+	r := C.BHAsyncRequest(unsafe.Pointer(&data[0]), C.int(len(data)), &creply, &creply_len) > 0
+	if r {
+		*msg_id = C.GoBytes(creply, creply_len)
+	}
+	return r
+}
+
+func Request(req *bh.MsgRequestTopic, proc_id *string, reply *bh.MsgRequestTopicReply, timeout_ms int) bool {
+	data, _ := req.Marshal()
+	cpid := unsafe.Pointer(nil)
+	cpid_len := C.int(0)
+	defer C.BHFree(cpid, cpid_len)
+	creply := unsafe.Pointer(nil)
+	creply_len := C.int(0)
+	defer C.BHFree(creply, creply_len)
+	r := C.BHRequest(unsafe.Pointer(&data[0]), C.int(len(data)), &cpid, &cpid_len, &creply, &creply_len, C.int(timeout_ms)) > 0
+	if r {
+		*proc_id = string(C.GoBytes(cpid, cpid_len))
+		reply.Unmarshal(C.GoBytes(creply, creply_len))
+	}
+	return r
+}
+
+func ReadRequest(proc_id *string, req *bh.MsgRequestTopic, psrc *unsafe.Pointer, timeout_ms int) bool {
+	cpid := unsafe.Pointer(nil)
+	cpid_len := C.int(0)
+	defer C.BHFree(cpid, cpid_len)
+	creply := unsafe.Pointer(nil)
+	creply_len := C.int(0)
+	defer C.BHFree(creply, creply_len)
+	r := C.BHReadRequest(&cpid, &cpid_len, &creply, &creply_len, psrc, C.int(timeout_ms)) > 0
+	if r {
+		*proc_id = string(C.GoBytes(cpid, cpid_len))
+		req.Unmarshal(C.GoBytes(creply, creply_len))
+	}
+	return r
+}
+
+func SendReply(src unsafe.Pointer, rep *bh.MsgRequestTopicReply) bool {
+	data, _ := rep.Marshal()
+	return C.BHSendReply(src, unsafe.Pointer(&data[0]), C.int(len(data))) > 0
+}
+
+func GetLastError() (int, string) {
+	creply := unsafe.Pointer(nil)
+	creply_len := C.int(0)
+	defer C.BHFree(creply, creply_len)
+	r := C.BHGetLastError(&creply, &creply_len)
+	return int(r), string(C.GoBytes(creply, creply_len))
+
+}
+
+func ServerCallbackReply(tag unsafe.Pointer, rep *bh.MsgRequestTopicReply) bool {
+	data, _ := rep.Marshal()
+	return C.BHServerCallbackReply(tag, unsafe.Pointer(&data[0]), C.int(len(data))) > 0
+}
+
+type ServecCB func(proc_id *string, req *bh.MsgRequestTopic, reply *bh.MsgRequestTopicReply) bool
+type SubDataCB func(proc_id *string, pub *bh.MsgPublish)
+type ClientCB func(proc_id *string, msg_id *[]byte, reply *bh.MsgRequestTopicReply)
+
+func cserver_callback(cpid *unsafe.Pointer, cpid_len unsafe.Pointer) {
+
+}
+func StartWorker(server_cb ServecCB, sub_cb SubDataCB, client_cb ClientCB) {
+
+}
+
+/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+// user code:
+
+func ServerCallback(proc_id *string, req *bh.MsgRequestTopic, reply *bh.MsgRequestTopicReply) bool {
+	// xxxx
+	return true
+}
+
+func SubDataCallback(proc_id *string, pub *bh.MsgPublish) {
+
+}
+func ClientCallback(proc_id *string, msg_id *[]byte, reply *bh.MsgRequestTopicReply) {
+
+}
+
+func main() {
+	proc := bh.ProcInfo{}
+	proc.ProcId = []byte("test_proc")
+	reply := bh.MsgCommonReply{}
+
+	r := Register(&proc, &reply, 1000)
+	if r {
+		fmt.Println("register ok")
+	} else {
+		fmt.Println("register failed")
+		return
+	}
+
+	r = HeartbeatEasy(1000)
+	if r {
+		fmt.Println("heartbeat ok")
+	} else {
+		fmt.Println("heartbeat failed")
+	}
+
+	topics := bh.MsgTopicList{}
+	topics.TopicList = append(topics.TopicList, []byte("topic0"), []byte("topic1"))
+	RegisterTopics(&topics, &reply, 0)
+	if r {
+		fmt.Println("reg topics ok")
+	} else {
+		fmt.Println("reg topics failed")
+	}
+
+	p := unsafe.Pointer(nil)
+	n := C.int32_t(0)
+	C.ReadData(&p, &n)
+	C.PrintData(p, n)
+}
diff --git a/api/go/go.mod b/api/go/go.mod
new file mode 100644
index 0000000..7a12c30
--- /dev/null
+++ b/api/go/go.mod
@@ -0,0 +1,5 @@
+module bhome
+
+go 1.16
+
+require github.com/gogo/protobuf v1.3.2 // indirect
diff --git a/api/go/go.sum b/api/go/go.sum
new file mode 100644
index 0000000..faf43b7
--- /dev/null
+++ b/api/go/go.sum
@@ -0,0 +1,31 @@
+github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
+github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
+github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
+github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
+github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
+github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
+golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
+golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
diff --git a/box/center.cpp b/box/center.cpp
index e8d014e..3059e90 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -27,7 +27,6 @@
 
 using namespace bhome_shm;
 using namespace bhome_msg;
-using namespace bhome::msg;
 typedef BHCenter::MsgHandler Handler;
 
 namespace
@@ -39,7 +38,7 @@
 public:
 	typedef std::string ProcId;
 	typedef std::string Address;
-	typedef bhome::msg::ProcInfo ProcInfo;
+	typedef bhome_msg::ProcInfo ProcInfo;
 	typedef std::function<void(Address const &)> Cleaner;
 
 private:
@@ -396,7 +395,7 @@
 
 Handler Combine(const Handler &h1, const Handler &h2)
 {
-	return [h1, h2](ShmSocket &socket, bhome_msg::MsgI &msg, bhome::msg::BHMsgHead &head) {
+	return [h1, h2](ShmSocket &socket, bhome_msg::MsgI &msg, bhome_msg::BHMsgHead &head) {
 		return h1(socket, msg, head) || h2(socket, msg, head);
 	};
 }
@@ -490,12 +489,6 @@
 #undef CASE_ON_MSG_TYPE
 
 } // namespace
-
-SharedMemory &BHomeShm()
-{
-	static SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 512);
-	return shm;
-}
 
 BHCenter::CenterRecords &BHCenter::Centers()
 {
diff --git a/box/status_main.cc b/box/status_main.cc
index 993fbee..3f075fb 100644
--- a/box/status_main.cc
+++ b/box/status_main.cc
@@ -41,7 +41,7 @@
 	auto showStatus = [&]() {
 		auto next = Now();
 		const uint64_t start = ToMs(next);
-		auto last = 0;
+		auto last = 0ul;
 		while (run) {
 			std::this_thread::sleep_until(next);
 			auto passed = ToMs(next) - start;
@@ -70,7 +70,7 @@
 
 			char buf[200] = "\n";
 			auto Print = [&](bool new_line) {
-				int n = sprintf(buf, "\r%6ds avail : %12ld = %s %6ds", sec, cur, Pretty(cur).c_str() + 1, sec);
+				int n = sprintf(buf, "\r%6lds avail : %12ld = %s %6lds", sec, cur, Pretty(cur).c_str() + 1, sec);
 				printf("%s", buf);
 				if (new_line) {
 					auto diff = cur - last;
diff --git a/proto/source/bhome_msg.proto b/proto/source/bhome_msg.proto
index 11ff5a2..aabe372 100644
--- a/proto/source/bhome_msg.proto
+++ b/proto/source/bhome_msg.proto
@@ -1,11 +1,12 @@
 syntax = "proto3";
 option optimize_for = LITE_RUNTIME;
+option go_package="./bhome_msg";
 
 // import "google/protobuf/descriptor.proto";
 import "bhome_msg_api.proto";
 import "error_msg.proto";
 
-package bhome.msg;
+package bhome_msg;
 
 
 // message format : head_len(4) + head(BHMsgHead) + body_len(4) + body(variable types)
@@ -19,22 +20,6 @@
 	bytes topic = 6; // for request route
 }
 
-message MsgRequest {
-	MsgType type = 1;
-	// oneof body;
-}
-
-message MsgReply {
-	ErrorMsg err_msg = 1;
-	// oneof reply
-}
-
-message BHMsgBody {
-	oneof reqrep {
-		MsgRequest request = 1;
-		MsgReply reply = 2;
-	}
-}
 
 enum MsgType {
 	kMsgTypeInvalid = 0;
@@ -76,3 +61,24 @@
 	rpc Query (MsgQueryTopic) returns (MsgQueryTopicReply);
 	rpc Request (MsgRequestTopic) returns (MsgQueryTopicReply);
 }
+
+message MsgRequest {
+	// oneof body;
+	oneof request {
+		MsgRegister register = 1;
+		MsgRequestTopic topic_request = 2;
+		MsgQueryTopic topic_query = 3;
+	}
+}
+
+message MsgReply {
+	ErrorMsg err_msg = 1;
+	// oneof reply
+}
+
+message BHMsgBody {
+	oneof reqrep {
+		MsgRequest request = 1;
+		MsgReply reply = 2;
+	}
+}
diff --git a/proto/source/bhome_msg_api.proto b/proto/source/bhome_msg_api.proto
index a8e8545..1c7cc1c 100644
--- a/proto/source/bhome_msg_api.proto
+++ b/proto/source/bhome_msg_api.proto
@@ -1,10 +1,11 @@
 syntax = "proto3";
 option optimize_for = LITE_RUNTIME;
+option go_package="./bhome_msg";
 
 // public messages
 import "error_msg.proto";
 
-package bhome.msg;
+package bhome_msg;
 
 message BHAddress {
 	bytes mq_id = 1; // mqid, uuid
diff --git a/proto/source/error_msg.proto b/proto/source/error_msg.proto
index b85ddb3..6496c67 100644
--- a/proto/source/error_msg.proto
+++ b/proto/source/error_msg.proto
@@ -1,8 +1,9 @@
 syntax = "proto3";
 
 option optimize_for = LITE_RUNTIME;
+option go_package="./bhome_msg";
 
-package bhome.msg;
+package bhome_msg;
 
 enum ErrorCode {
     eSuccess = 0;
diff --git a/src/bh_api.cpp b/src/bh_api.cpp
index 2abe66d..3844000 100644
--- a/src/bh_api.cpp
+++ b/src/bh_api.cpp
@@ -83,44 +83,50 @@
 		return false;
 	}
 	MsgOut msg_reply;
-	if ((ProcNode().*mfunc)(input, msg_reply, timeout_ms)) {
-		return PackOutput(msg_reply, reply, reply_len);
-
-	} else {
-		return false;
-	}
+	return (ProcNode().*mfunc)(input, msg_reply, timeout_ms) &&
+	       PackOutput(msg_reply, reply, reply_len);
 }
 
 } // namespace
 
-bool BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
+int BHApiIn1Out1Proxy(FBHApiIn1Out1 func,
+                      const void *request,
+                      const int request_len,
+                      void **reply,
+                      int *reply_len,
+                      const int timeout_ms)
+{
+	return (*func)(request, request_len, reply, reply_len, timeout_ms);
+}
+
+int BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
 {
 	return BHApiIn1Out1<ProcInfo>(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms);
 }
 
-bool BHHeartBeatEasy(const int timeout_ms)
+int BHHeartbeatEasy(const int timeout_ms)
 {
 	return ProcNode().Heartbeat(timeout_ms);
 }
 
-bool BHHeartBeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
+int BHHeartbeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
 {
 	return BHApiIn1Out1<ProcInfo>(&TopicNode::Heartbeat, proc_info, proc_info_len, reply, reply_len, timeout_ms);
 }
 
-bool BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
+int BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
 {
 	return BHApiIn1Out1<MsgTopicList>(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms);
 }
 
-bool BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
+int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
 {
 	return BHApiIn1Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms);
 }
 
-bool BHPublish(const void *msgpub,
-               const int msgpub_len,
-               const int timeout_ms)
+int BHPublish(const void *msgpub,
+              const int msgpub_len,
+              const int timeout_ms)
 {
 	MsgPublish pub;
 	if (!pub.ParseFromArray(msgpub, msgpub_len)) {
@@ -130,11 +136,11 @@
 	return ProcNode().Publish(pub, timeout_ms);
 }
 
-bool BHReadSub(void **proc_id,
-               int *proc_id_len,
-               void **msgpub,
-               int *msgpub_len,
-               const int timeout_ms)
+int BHReadSub(void **proc_id,
+              int *proc_id_len,
+              void **msgpub,
+              int *msgpub_len,
+              const int timeout_ms)
 {
 	std::string proc;
 	MsgPublish pub;
@@ -151,10 +157,10 @@
 	return false;
 }
 
-bool BHAsyncRequest(const void *request,
-                    const int request_len,
-                    void **msg_id,
-                    int *msg_id_len)
+int BHAsyncRequest(const void *request,
+                   const int request_len,
+                   void **msg_id,
+                   int *msg_id_len)
 {
 	MsgRequestTopic req;
 	if (!req.ParseFromArray(request, request_len)) {
@@ -178,13 +184,13 @@
 	return false;
 }
 
-bool BHRequest(const void *request,
-               const int request_len,
-               void **proc_id,
-               int *proc_id_len,
-               void **reply,
-               int *reply_len,
-               const int timeout_ms)
+int BHRequest(const void *request,
+              const int request_len,
+              void **proc_id,
+              int *proc_id_len,
+              void **reply,
+              int *reply_len,
+              const int timeout_ms)
 {
 	MsgRequestTopic req;
 	if (!req.ParseFromArray(request, request_len)) {
@@ -205,12 +211,12 @@
 	return false;
 }
 
-bool BHReadRequest(void **proc_id,
-                   int *proc_id_len,
-                   void **request,
-                   int *request_len,
-                   void **src,
-                   const int timeout_ms)
+int BHReadRequest(void **proc_id,
+                  int *proc_id_len,
+                  void **request,
+                  int *request_len,
+                  void **src,
+                  const int timeout_ms)
 {
 	void *src_info = 0;
 	std::string proc;
@@ -228,9 +234,9 @@
 	return false;
 }
 
-bool BHSendReply(void *src,
-                 const void *reply,
-                 const int reply_len)
+int BHSendReply(void *src,
+                const void *reply,
+                const int reply_len)
 {
 	MsgRequestTopicReply rep;
 	if (!rep.ParseFromArray(reply, reply_len)) {
@@ -263,7 +269,7 @@
 				r = reply.ParseFromArray(p, len);
 				return r;
 			};
-			server_cb(proc_id.data(), proc_id.size(), sreq.data(), sreq.size(), (BHServerCallbackTag *) (&sender));
+			server_cb(proc_id.data(), proc_id.size(), sreq.data(), sreq.size(), &sender);
 			return r;
 		};
 	}
@@ -284,9 +290,9 @@
 
 	ProcNode().Start(on_req, on_sub, on_reply);
 }
-bool BHServerCallbackReply(const BHServerCallbackTag *tag,
-                           const void *data,
-                           const int data_len)
+int BHServerCallbackReply(const void *tag,
+                          const void *data,
+                          const int data_len)
 {
 	auto &sender = *(const ServerSender *) (tag);
 	return sender(data, data_len);
diff --git a/src/bh_api.h b/src/bh_api.h
index eeb47a5..39b4cc6 100644
--- a/src/bh_api.h
+++ b/src/bh_api.h
@@ -5,26 +5,36 @@
 extern "C" {
 #endif
 
-struct BHSrcInfo;
-struct BHServerCallbackTag;
+typedef int (*FBHApiIn1Out1)(const void *proc_info,
+                             const int proc_info_len,
+                             void **reply,
+                             int *reply_len,
+                             const int timeout_ms);
 
-bool BHRegister(const void *proc_info,
-                const int proc_info_len,
-                void **reply,
-                int *reply_len,
-                const int timeout_ms);
-
-bool BHRegisterTopics(const void *topics,
-                      const int topics_len,
+int BHApiIn1Out1Proxy(FBHApiIn1Out1 func,
+                      const void *request,
+                      const int request_len,
                       void **reply,
                       int *reply_len,
                       const int timeout_ms);
 
-bool BHSubscribeTopics(const void *topics,
-                       const int topics_len,
-                       void **reply,
-                       int *reply_len,
-                       const int timeout_ms);
+int BHRegister(const void *proc_info,
+               const int proc_info_len,
+               void **reply,
+               int *reply_len,
+               const int timeout_ms);
+
+int BHRegisterTopics(const void *topics,
+                     const int topics_len,
+                     void **reply,
+                     int *reply_len,
+                     const int timeout_ms);
+
+int BHSubscribeTopics(const void *topics,
+                      const int topics_len,
+                      void **reply,
+                      int *reply_len,
+                      const int timeout_ms);
 
 typedef void (*FSubDataCallback)(const void *proc_id,
                                  const int proc_id_len,
@@ -35,7 +45,7 @@
                                 const int proc_id_len,
                                 const void *data,
                                 const int data_len,
-                                BHServerCallbackTag *tag);
+                                const void *tag);
 
 typedef void (*FClientCallback)(const void *proc_id,
                                 const int proc_id_len,
@@ -45,56 +55,57 @@
                                 const int data_len);
 
 void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb);
-bool BHServerCallbackReply(const BHServerCallbackTag *tag,
-                           const void *data,
-                           const int data_len);
 
-bool BHHeartBeatEasy(const int timeout_ms);
-bool BHHeartBeat(const void *proc_info,
-                 const int proc_info_len,
-                 void **reply,
-                 int *reply_len,
-                 const int timeout_ms);
+int BHServerCallbackReply(const void *tag,
+                          const void *data,
+                          const int data_len);
 
-bool BHPublish(const void *msgpub,
-               const int msgpub_len,
-               const int timeout_ms);
+int BHHeartbeatEasy(const int timeout_ms);
+int BHHeartbeat(const void *proc_info,
+                const int proc_info_len,
+                void **reply,
+                int *reply_len,
+                const int timeout_ms);
 
-bool BHReadSub(const void *proc_id,
-               const int proc_id_len,
-               void **msgpub,
-               int *msgpub_len,
-               const int timeout_ms);
+int BHPublish(const void *msgpub,
+              const int msgpub_len,
+              const int timeout_ms);
 
-bool BHAsyncRequest(const void *request,
-                    const int request_len,
-                    void **msg_id,
-                    int *msg_id_len);
+int BHReadSub(void **proc_id,
+              int *proc_id_len,
+              void **msgpub,
+              int *msgpub_len,
+              const int timeout_ms);
 
-bool BHRequest(const void *request,
-               const int request_len,
-               void **proc_id,
-               int *proc_id_len,
-               void **reply,
-               int *reply_len,
-               const int timeout_ms);
+int BHAsyncRequest(const void *request,
+                   const int request_len,
+                   void **msg_id,
+                   int *msg_id_len);
 
-bool BHReadRequest(void **proc_id,
-                   int *proc_id_len,
-                   void **request,
-                   int *request_len,
-                   BHSrcInfo **src,
-                   const int timeout_ms);
+int BHRequest(const void *request,
+              const int request_len,
+              void **proc_id,
+              int *proc_id_len,
+              void **reply,
+              int *reply_len,
+              const int timeout_ms);
 
-bool BHSendReply(BHSrcInfo *src,
-                 const void *reply,
-                 const int reply_len);
+int BHReadRequest(void **proc_id,
+                  int *proc_id_len,
+                  void **request,
+                  int *request_len,
+                  void **src,
+                  const int timeout_ms);
+
+int BHSendReply(void *src,
+                const void *reply,
+                const int reply_len);
 
 // int BHCleanUp();
 
 void BHFree(void *buf, int size);
 
-int BHGetLastError(void **msg, int &msg_len);
+int BHGetLastError(void **msg, int *msg_len);
 
 #ifdef __cplusplus
 }
diff --git a/src/defs.cpp b/src/defs.cpp
index 77b0722..0ff671b 100644
--- a/src/defs.cpp
+++ b/src/defs.cpp
@@ -16,6 +16,8 @@
  * =====================================================================================
  */
 #include "defs.h"
+#include "shm.h"
+
 namespace
 {
 
@@ -40,6 +42,12 @@
 const MQId &BHTopicCenterAddress() { return kBHTopicCenter; }
 const MQId &BHUniCenterAddress() { return kBHUniCenter; }
 
+bhome_shm::SharedMemory &BHomeShm()
+{
+	static bhome_shm::SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 512);
+	return shm;
+}
+
 void SetLastError(const int ec, const std::string &msg)
 {
 	LastErrorStore().ec_ = ec;
diff --git a/src/msg.h b/src/msg.h
index c239956..e6b0b34 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -29,7 +29,6 @@
 namespace bhome_msg
 {
 using namespace bhome_shm;
-using namespace bhome::msg; // for serialized data in MsgI
 
 // MsgI is safe to be stored in shared memory, so POD data or offset_ptr is required.
 // message format: header(meta) + body(data).
diff --git a/src/proto.h b/src/proto.h
index b418342..2557f8e 100644
--- a/src/proto.h
+++ b/src/proto.h
@@ -22,16 +22,16 @@
 #include "bhome_msg_api.pb.h"
 #include <chrono>
 
-using namespace bhome::msg;
+using namespace bhome_msg;
 
 template <class Msg>
 struct MsgToType {
 };
 
-#define BHOME_MAP_MSG_AND_TYPE(mSG, tYPE)              \
-	template <>                                        \
-	struct MsgToType<mSG> {                            \
-		static const bhome::msg::MsgType value = tYPE; \
+#define BHOME_MAP_MSG_AND_TYPE(mSG, tYPE)  \
+	template <>                            \
+	struct MsgToType<mSG> {                \
+		static const MsgType value = tYPE; \
 	};
 
 #define BHOME_SIMPLE_MAP_MSG(name) BHOME_MAP_MSG_AND_TYPE(Msg##name, kMsgType##name)
@@ -52,7 +52,7 @@
 #undef BHOME_MAP_MSG_AND_TYPE
 
 template <class Msg>
-constexpr inline bhome::msg::MsgType GetType(const Msg &)
+constexpr inline MsgType GetType(const Msg &)
 {
 	return MsgToType<Msg>::value;
 }
diff --git a/src/socket.cpp b/src/socket.cpp
index 0ba195a..aec42b4 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -115,7 +115,7 @@
 }
 
 //maybe reimplment, using async cbs?
-bool ShmSocket::SyncRecv(bhome_msg::MsgI &msg, bhome::msg::BHMsgHead &head, const int timeout_ms)
+bool ShmSocket::SyncRecv(bhome_msg::MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms)
 {
 	// std::lock_guard<std::mutex> lock(mutex_); // seems no need to lock mutex_.
 	bool got = (timeout_ms == 0) ? mq().TryRecv(msg) : mq().Recv(msg, timeout_ms);
diff --git a/src/socket.h b/src/socket.h
index db64b36..493aeb4 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -66,7 +66,7 @@
 	size_t Pending() const { return mq().Pending(); }
 
 	template <class Body>
-	bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const RecvCB &cb = RecvCB())
+	bool Send(const void *valid_remote, BHMsgHead &head, Body &body, const RecvCB &cb = RecvCB())
 	{
 		try {
 			if (!cb) {
@@ -91,10 +91,10 @@
 		return SendImpl(valid_remote, imsg);
 	}
 
-	bool SyncRecv(MsgI &msg, bhome::msg::BHMsgHead &head, const int timeout_ms);
+	bool SyncRecv(MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms);
 
 	template <class Body>
-	bool SendAndRecv(const void *remote, const BHMsgHead &head, const Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms)
+	bool SendAndRecv(const void *remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms)
 	{
 		struct State {
 			std::mutex mutex;
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index 2cc5483..9853f35 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -361,7 +361,7 @@
 	return false;
 }
 
-bool TopicNode::ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms)
+bool TopicNode::ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms)
 {
 	if (!IsRegistered()) {
 		SetLastError(eNotRegistered, "Not Registered.");
diff --git a/src/topic_node.h b/src/topic_node.h
index 5a3b86e..87ad770 100644
--- a/src/topic_node.h
+++ b/src/topic_node.h
@@ -69,10 +69,10 @@
 	void Stop();
 
 private:
-	bool ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
+	bool ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms);
 	const std::string &proc_id() { return info_.proc_id(); }
 
-	typedef bhome_msg::BHAddress Address;
+	typedef BHAddress Address;
 	class TopicQueryCache
 	{
 		class Impl
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index da51044..b2c00a4 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -19,7 +19,7 @@
 #include "util.h"
 #include <atomic>
 
-using namespace bhome::msg;
+using namespace bhome_msg;
 
 namespace
 {
@@ -66,7 +66,7 @@
                 const int proc_id_len,
                 const void *data,
                 const int data_len,
-                BHServerCallbackTag *tag)
+                const void *tag)
 {
 	// printf("ServerProc: ");
 	// DEFER1(printf("\n"););
@@ -138,19 +138,6 @@
 	}
 }
 
-namespace
-{
-struct CCC {
-};
-void F(CCC &&c) {}
-
-template <class... T>
-void Pass(T &&...t)
-{
-	F(std::forward<decltype(t)>(t)...);
-}
-
-} // namespace
 BOOST_AUTO_TEST_CASE(ApiTest)
 {
 	auto max_time = std::chrono::steady_clock::time_point::max();
@@ -266,7 +253,7 @@
 	auto hb = [](std::atomic<bool> *run) {
 		while (*run) {
 			Sleep(1s, false);
-			bool r = BHHeartBeatEasy(1000);
+			bool r = BHHeartbeatEasy(1000);
 			printf("heartbeat: %s\n", r ? "ok" : "failed");
 		}
 	};
diff --git a/utest/utest.cpp b/utest/utest.cpp
index 12d4396..572d8e5 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -198,7 +198,7 @@
 {
 	const std::string shm_name("ShmReqRep");
 	ShmRemover auto_remove(shm_name);
-	SharedMemory shm(shm_name, 1024 * 1024 * 50);
+	SharedMemory shm(shm_name, 1024 * 1024 * 512);
 
 	auto Avail = [&]() { return shm.get_free_memory(); };
 	auto init_avail = Avail();
@@ -224,22 +224,24 @@
 				printf("count: %d\n", count.load());
 			}
 		};
+		MsgRequestTopic req;
+		req.set_topic(topic);
+		req.set_data("data " + std::string(100, 'a'));
 		client.ClientStartWorker(onRecv, 2);
 		boost::timer::auto_cpu_timer timer;
 		for (int i = 0; i < nreq; ++i) {
-			MsgRequestTopic req;
-			req.set_topic(topic);
-			req.set_data("data " + std::to_string(i));
 			std::string msg_id;
 			if (!client.ClientAsyncRequest(req, msg_id)) {
 				printf("client request failed\n");
 				++count;
 			}
 
-			// if (!client.SyncRequest(topic, "data " + std::to_string(i), reply, 1000)) {
+			// std::string proc_id;
+			// MsgRequestTopicReply reply;
+			// if (!client.ClientSyncRequest(req, proc_id, reply, 1000)) {
 			// 	printf("client request failed\n");
 			// }
-			// 	++count;
+			// ++count;
 		}
 		do {
 			std::this_thread::yield();
@@ -278,7 +280,7 @@
 	servers.Launch(Server, "server", topics);
 	Sleep(100ms);
 	for (auto &t : topics) {
-		clients.Launch(Client, t, 1000 * 100);
+		clients.Launch(Client, t, 1000 * 100 * 2);
 	}
 	clients.WaitAll();
 	printf("clients done, server replyed: %ld\n", server_msg_count.load());

--
Gitblit v1.8.0