lichao
2021-04-20 ca319178f45ce6256aed7913565d445571f6db22
add go api, wrap C api, not finished.
4个文件已添加
17个文件已修改
596 ■■■■ 已修改文件
.gitignore 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
api/go/bh_api.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
api/go/bhome_node.go 214 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
api/go/go.mod 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
api/go/go.sum 31 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center.cpp 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/status_main.cc 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
proto/source/bhome_msg.proto 40 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
proto/source/bhome_msg_api.proto 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
proto/source/error_msg.proto 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bh_api.cpp 92 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bh_api.h 119 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/defs.cpp 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/proto.h 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.cpp 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.h 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.cpp 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.h 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/api_test.cpp 19 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/utest.cpp 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.gitignore
@@ -9,4 +9,5 @@
*/bhshmq_center
*/help
*/bhshmqbox
*/bhshmq_status
*/bhshmq_status
api/go/bhome_msg/*.pb.go
api/go/bh_api.h
New file
@@ -0,0 +1 @@
../../src/bh_api.h
api/go/bhome_node.go
New file
@@ -0,0 +1,214 @@
package main
/*
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include "bh_api.h"
void print(int v)
{
    printf("print %d\n", v);
}
static void ReadData(void **p, int32_t *n)
{
    *n = 4;
    *p = malloc(4);
    memcpy(*p, "abc", 4);
    *n = 4;
}
static void PrintData(void *p, int32_t n)
{
    printf("data :%s\n", (char*)p);
    free(p);
}
*/
// #cgo LDFLAGS: -L/home/lichao/code/shmsg/build/lib -L/usr/local/lib -lbhome_shmq -lbhome_msg -lprotobuf-lite -lstdc++ -lpthread -lrt
import "C"
import (
    bh "bhome/bhome_msg"
    "fmt"
    "unsafe"
)
func BHApiIn1Out1(bhfunc C.FBHApiIn1Out1, data []byte, reply *bh.MsgCommonReply, timeout_ms int) bool {
    creply := unsafe.Pointer(nil)
    creply_len := C.int(0)
    defer C.BHFree(creply, creply_len)
    r := C.BHApiIn1Out1Proxy(bhfunc, unsafe.Pointer(&data[0]), C.int(len(data)), &creply, &creply_len, C.int(timeout_ms)) > 0
    if r {
        reply.Unmarshal(C.GoBytes(creply, creply_len))
    }
    return r
}
func Register(proc *bh.ProcInfo, reply *bh.MsgCommonReply, timeout_ms int) bool {
    data, _ := proc.Marshal()
    return BHApiIn1Out1(C.FBHApiIn1Out1(C.BHRegister), data, reply, timeout_ms)
}
func RegisterTopics(topics *bh.MsgTopicList, reply *bh.MsgCommonReply, timeout_ms int) bool {
    data, _ := topics.Marshal()
    return BHApiIn1Out1(C.FBHApiIn1Out1(C.BHRegisterTopics), data, reply, timeout_ms)
}
func Subscribe(topics *bh.MsgTopicList, reply *bh.MsgCommonReply, timeout_ms int) bool {
    data, _ := topics.Marshal()
    return BHApiIn1Out1(C.FBHApiIn1Out1(C.BHSubscribeTopics), data, reply, timeout_ms)
}
func Heartbeat(topics *bh.MsgTopicList, reply *bh.MsgCommonReply, timeout_ms int) bool {
    data, _ := topics.Marshal()
    return BHApiIn1Out1(C.FBHApiIn1Out1(C.BHHeartbeat), data, reply, timeout_ms)
}
func HeartbeatEasy(timeout_ms int) bool {
    return C.BHHeartbeatEasy(C.int(timeout_ms)) > 0
}
func Publish(pub *bh.MsgPublish, timeout_ms int) bool {
    data, _ := pub.Marshal()
    return C.BHPublish(unsafe.Pointer(&data[0]), C.int(len(data)), C.int(timeout_ms)) > 0
}
func ReadSub(proc_id *string, pub *bh.MsgPublish, timeout_ms int) bool {
    cpid := unsafe.Pointer(nil)
    cpid_len := C.int(0)
    defer C.BHFree(cpid, cpid_len)
    creply := unsafe.Pointer(nil)
    creply_len := C.int(0)
    defer C.BHFree(creply, creply_len)
    r := C.BHReadSub(&cpid, &cpid_len, &creply, &creply_len, C.int(timeout_ms)) > 0
    *proc_id = string(C.GoBytes(cpid, cpid_len))
    pub.Unmarshal(C.GoBytes(creply, creply_len))
    return r
}
func AsyncRequest(req *bh.MsgRequestTopic, msg_id *[]byte) bool {
    data, _ := req.Marshal()
    creply := unsafe.Pointer(nil)
    creply_len := C.int(0)
    defer C.BHFree(creply, creply_len)
    r := C.BHAsyncRequest(unsafe.Pointer(&data[0]), C.int(len(data)), &creply, &creply_len) > 0
    if r {
        *msg_id = C.GoBytes(creply, creply_len)
    }
    return r
}
func Request(req *bh.MsgRequestTopic, proc_id *string, reply *bh.MsgRequestTopicReply, timeout_ms int) bool {
    data, _ := req.Marshal()
    cpid := unsafe.Pointer(nil)
    cpid_len := C.int(0)
    defer C.BHFree(cpid, cpid_len)
    creply := unsafe.Pointer(nil)
    creply_len := C.int(0)
    defer C.BHFree(creply, creply_len)
    r := C.BHRequest(unsafe.Pointer(&data[0]), C.int(len(data)), &cpid, &cpid_len, &creply, &creply_len, C.int(timeout_ms)) > 0
    if r {
        *proc_id = string(C.GoBytes(cpid, cpid_len))
        reply.Unmarshal(C.GoBytes(creply, creply_len))
    }
    return r
}
func ReadRequest(proc_id *string, req *bh.MsgRequestTopic, psrc *unsafe.Pointer, timeout_ms int) bool {
    cpid := unsafe.Pointer(nil)
    cpid_len := C.int(0)
    defer C.BHFree(cpid, cpid_len)
    creply := unsafe.Pointer(nil)
    creply_len := C.int(0)
    defer C.BHFree(creply, creply_len)
    r := C.BHReadRequest(&cpid, &cpid_len, &creply, &creply_len, psrc, C.int(timeout_ms)) > 0
    if r {
        *proc_id = string(C.GoBytes(cpid, cpid_len))
        req.Unmarshal(C.GoBytes(creply, creply_len))
    }
    return r
}
func SendReply(src unsafe.Pointer, rep *bh.MsgRequestTopicReply) bool {
    data, _ := rep.Marshal()
    return C.BHSendReply(src, unsafe.Pointer(&data[0]), C.int(len(data))) > 0
}
func GetLastError() (int, string) {
    creply := unsafe.Pointer(nil)
    creply_len := C.int(0)
    defer C.BHFree(creply, creply_len)
    r := C.BHGetLastError(&creply, &creply_len)
    return int(r), string(C.GoBytes(creply, creply_len))
}
func ServerCallbackReply(tag unsafe.Pointer, rep *bh.MsgRequestTopicReply) bool {
    data, _ := rep.Marshal()
    return C.BHServerCallbackReply(tag, unsafe.Pointer(&data[0]), C.int(len(data))) > 0
}
type ServecCB func(proc_id *string, req *bh.MsgRequestTopic, reply *bh.MsgRequestTopicReply) bool
type SubDataCB func(proc_id *string, pub *bh.MsgPublish)
type ClientCB func(proc_id *string, msg_id *[]byte, reply *bh.MsgRequestTopicReply)
func cserver_callback(cpid *unsafe.Pointer, cpid_len unsafe.Pointer) {
}
func StartWorker(server_cb ServecCB, sub_cb SubDataCB, client_cb ClientCB) {
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// user code:
func ServerCallback(proc_id *string, req *bh.MsgRequestTopic, reply *bh.MsgRequestTopicReply) bool {
    // xxxx
    return true
}
func SubDataCallback(proc_id *string, pub *bh.MsgPublish) {
}
func ClientCallback(proc_id *string, msg_id *[]byte, reply *bh.MsgRequestTopicReply) {
}
func main() {
    proc := bh.ProcInfo{}
    proc.ProcId = []byte("test_proc")
    reply := bh.MsgCommonReply{}
    r := Register(&proc, &reply, 1000)
    if r {
        fmt.Println("register ok")
    } else {
        fmt.Println("register failed")
        return
    }
    r = HeartbeatEasy(1000)
    if r {
        fmt.Println("heartbeat ok")
    } else {
        fmt.Println("heartbeat failed")
    }
    topics := bh.MsgTopicList{}
    topics.TopicList = append(topics.TopicList, []byte("topic0"), []byte("topic1"))
    RegisterTopics(&topics, &reply, 0)
    if r {
        fmt.Println("reg topics ok")
    } else {
        fmt.Println("reg topics failed")
    }
    p := unsafe.Pointer(nil)
    n := C.int32_t(0)
    C.ReadData(&p, &n)
    C.PrintData(p, n)
}
api/go/go.mod
New file
@@ -0,0 +1,5 @@
module bhome
go 1.16
require github.com/gogo/protobuf v1.3.2 // indirect
api/go/go.sum
New file
@@ -0,0 +1,31 @@
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
box/center.cpp
@@ -27,7 +27,6 @@
using namespace bhome_shm;
using namespace bhome_msg;
using namespace bhome::msg;
typedef BHCenter::MsgHandler Handler;
namespace
@@ -39,7 +38,7 @@
public:
    typedef std::string ProcId;
    typedef std::string Address;
    typedef bhome::msg::ProcInfo ProcInfo;
    typedef bhome_msg::ProcInfo ProcInfo;
    typedef std::function<void(Address const &)> Cleaner;
private:
@@ -396,7 +395,7 @@
Handler Combine(const Handler &h1, const Handler &h2)
{
    return [h1, h2](ShmSocket &socket, bhome_msg::MsgI &msg, bhome::msg::BHMsgHead &head) {
    return [h1, h2](ShmSocket &socket, bhome_msg::MsgI &msg, bhome_msg::BHMsgHead &head) {
        return h1(socket, msg, head) || h2(socket, msg, head);
    };
}
@@ -490,12 +489,6 @@
#undef CASE_ON_MSG_TYPE
} // namespace
SharedMemory &BHomeShm()
{
    static SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 512);
    return shm;
}
BHCenter::CenterRecords &BHCenter::Centers()
{
box/status_main.cc
@@ -41,7 +41,7 @@
    auto showStatus = [&]() {
        auto next = Now();
        const uint64_t start = ToMs(next);
        auto last = 0;
        auto last = 0ul;
        while (run) {
            std::this_thread::sleep_until(next);
            auto passed = ToMs(next) - start;
@@ -70,7 +70,7 @@
            char buf[200] = "\n";
            auto Print = [&](bool new_line) {
                int n = sprintf(buf, "\r%6ds avail : %12ld = %s %6ds", sec, cur, Pretty(cur).c_str() + 1, sec);
                int n = sprintf(buf, "\r%6lds avail : %12ld = %s %6lds", sec, cur, Pretty(cur).c_str() + 1, sec);
                printf("%s", buf);
                if (new_line) {
                    auto diff = cur - last;
proto/source/bhome_msg.proto
@@ -1,11 +1,12 @@
syntax = "proto3";
option optimize_for = LITE_RUNTIME;
option go_package="./bhome_msg";
// import "google/protobuf/descriptor.proto";
import "bhome_msg_api.proto";
import "error_msg.proto";
package bhome.msg;
package bhome_msg;
// message format : head_len(4) + head(BHMsgHead) + body_len(4) + body(variable types)
@@ -19,22 +20,6 @@
    bytes topic = 6; // for request route
}
message MsgRequest {
    MsgType type = 1;
    // oneof body;
}
message MsgReply {
    ErrorMsg err_msg = 1;
    // oneof reply
}
message BHMsgBody {
    oneof reqrep {
        MsgRequest request = 1;
        MsgReply reply = 2;
    }
}
enum MsgType {
    kMsgTypeInvalid = 0;
@@ -76,3 +61,24 @@
    rpc Query (MsgQueryTopic) returns (MsgQueryTopicReply);
    rpc Request (MsgRequestTopic) returns (MsgQueryTopicReply);
}
message MsgRequest {
    // oneof body;
    oneof request {
        MsgRegister register = 1;
        MsgRequestTopic topic_request = 2;
        MsgQueryTopic topic_query = 3;
    }
}
message MsgReply {
    ErrorMsg err_msg = 1;
    // oneof reply
}
message BHMsgBody {
    oneof reqrep {
        MsgRequest request = 1;
        MsgReply reply = 2;
    }
}
proto/source/bhome_msg_api.proto
@@ -1,10 +1,11 @@
syntax = "proto3";
option optimize_for = LITE_RUNTIME;
option go_package="./bhome_msg";
// public messages
import "error_msg.proto";
package bhome.msg;
package bhome_msg;
message BHAddress {
    bytes mq_id = 1; // mqid, uuid
proto/source/error_msg.proto
@@ -1,8 +1,9 @@
syntax = "proto3";
option optimize_for = LITE_RUNTIME;
option go_package="./bhome_msg";
package bhome.msg;
package bhome_msg;
enum ErrorCode {
    eSuccess = 0;
src/bh_api.cpp
@@ -83,44 +83,50 @@
        return false;
    }
    MsgOut msg_reply;
    if ((ProcNode().*mfunc)(input, msg_reply, timeout_ms)) {
        return PackOutput(msg_reply, reply, reply_len);
    } else {
        return false;
    }
    return (ProcNode().*mfunc)(input, msg_reply, timeout_ms) &&
           PackOutput(msg_reply, reply, reply_len);
}
} // namespace
bool BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
int BHApiIn1Out1Proxy(FBHApiIn1Out1 func,
                      const void *request,
                      const int request_len,
                      void **reply,
                      int *reply_len,
                      const int timeout_ms)
{
    return (*func)(request, request_len, reply, reply_len, timeout_ms);
}
int BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
    return BHApiIn1Out1<ProcInfo>(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms);
}
bool BHHeartBeatEasy(const int timeout_ms)
int BHHeartbeatEasy(const int timeout_ms)
{
    return ProcNode().Heartbeat(timeout_ms);
}
bool BHHeartBeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
int BHHeartbeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
    return BHApiIn1Out1<ProcInfo>(&TopicNode::Heartbeat, proc_info, proc_info_len, reply, reply_len, timeout_ms);
}
bool BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
int BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
    return BHApiIn1Out1<MsgTopicList>(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms);
}
bool BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
    return BHApiIn1Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms);
}
bool BHPublish(const void *msgpub,
               const int msgpub_len,
               const int timeout_ms)
int BHPublish(const void *msgpub,
              const int msgpub_len,
              const int timeout_ms)
{
    MsgPublish pub;
    if (!pub.ParseFromArray(msgpub, msgpub_len)) {
@@ -130,11 +136,11 @@
    return ProcNode().Publish(pub, timeout_ms);
}
bool BHReadSub(void **proc_id,
               int *proc_id_len,
               void **msgpub,
               int *msgpub_len,
               const int timeout_ms)
int BHReadSub(void **proc_id,
              int *proc_id_len,
              void **msgpub,
              int *msgpub_len,
              const int timeout_ms)
{
    std::string proc;
    MsgPublish pub;
@@ -151,10 +157,10 @@
    return false;
}
bool BHAsyncRequest(const void *request,
                    const int request_len,
                    void **msg_id,
                    int *msg_id_len)
int BHAsyncRequest(const void *request,
                   const int request_len,
                   void **msg_id,
                   int *msg_id_len)
{
    MsgRequestTopic req;
    if (!req.ParseFromArray(request, request_len)) {
@@ -178,13 +184,13 @@
    return false;
}
bool BHRequest(const void *request,
               const int request_len,
               void **proc_id,
               int *proc_id_len,
               void **reply,
               int *reply_len,
               const int timeout_ms)
int BHRequest(const void *request,
              const int request_len,
              void **proc_id,
              int *proc_id_len,
              void **reply,
              int *reply_len,
              const int timeout_ms)
{
    MsgRequestTopic req;
    if (!req.ParseFromArray(request, request_len)) {
@@ -205,12 +211,12 @@
    return false;
}
bool BHReadRequest(void **proc_id,
                   int *proc_id_len,
                   void **request,
                   int *request_len,
                   void **src,
                   const int timeout_ms)
int BHReadRequest(void **proc_id,
                  int *proc_id_len,
                  void **request,
                  int *request_len,
                  void **src,
                  const int timeout_ms)
{
    void *src_info = 0;
    std::string proc;
@@ -228,9 +234,9 @@
    return false;
}
bool BHSendReply(void *src,
                 const void *reply,
                 const int reply_len)
int BHSendReply(void *src,
                const void *reply,
                const int reply_len)
{
    MsgRequestTopicReply rep;
    if (!rep.ParseFromArray(reply, reply_len)) {
@@ -263,7 +269,7 @@
                r = reply.ParseFromArray(p, len);
                return r;
            };
            server_cb(proc_id.data(), proc_id.size(), sreq.data(), sreq.size(), (BHServerCallbackTag *) (&sender));
            server_cb(proc_id.data(), proc_id.size(), sreq.data(), sreq.size(), &sender);
            return r;
        };
    }
@@ -284,9 +290,9 @@
    ProcNode().Start(on_req, on_sub, on_reply);
}
bool BHServerCallbackReply(const BHServerCallbackTag *tag,
                           const void *data,
                           const int data_len)
int BHServerCallbackReply(const void *tag,
                          const void *data,
                          const int data_len)
{
    auto &sender = *(const ServerSender *) (tag);
    return sender(data, data_len);
src/bh_api.h
@@ -5,26 +5,36 @@
extern "C" {
#endif
struct BHSrcInfo;
struct BHServerCallbackTag;
typedef int (*FBHApiIn1Out1)(const void *proc_info,
                             const int proc_info_len,
                             void **reply,
                             int *reply_len,
                             const int timeout_ms);
bool BHRegister(const void *proc_info,
                const int proc_info_len,
                void **reply,
                int *reply_len,
                const int timeout_ms);
bool BHRegisterTopics(const void *topics,
                      const int topics_len,
int BHApiIn1Out1Proxy(FBHApiIn1Out1 func,
                      const void *request,
                      const int request_len,
                      void **reply,
                      int *reply_len,
                      const int timeout_ms);
bool BHSubscribeTopics(const void *topics,
                       const int topics_len,
                       void **reply,
                       int *reply_len,
                       const int timeout_ms);
int BHRegister(const void *proc_info,
               const int proc_info_len,
               void **reply,
               int *reply_len,
               const int timeout_ms);
int BHRegisterTopics(const void *topics,
                     const int topics_len,
                     void **reply,
                     int *reply_len,
                     const int timeout_ms);
int BHSubscribeTopics(const void *topics,
                      const int topics_len,
                      void **reply,
                      int *reply_len,
                      const int timeout_ms);
typedef void (*FSubDataCallback)(const void *proc_id,
                                 const int proc_id_len,
@@ -35,7 +45,7 @@
                                const int proc_id_len,
                                const void *data,
                                const int data_len,
                                BHServerCallbackTag *tag);
                                const void *tag);
typedef void (*FClientCallback)(const void *proc_id,
                                const int proc_id_len,
@@ -45,56 +55,57 @@
                                const int data_len);
void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb);
bool BHServerCallbackReply(const BHServerCallbackTag *tag,
                           const void *data,
                           const int data_len);
bool BHHeartBeatEasy(const int timeout_ms);
bool BHHeartBeat(const void *proc_info,
                 const int proc_info_len,
                 void **reply,
                 int *reply_len,
                 const int timeout_ms);
int BHServerCallbackReply(const void *tag,
                          const void *data,
                          const int data_len);
bool BHPublish(const void *msgpub,
               const int msgpub_len,
               const int timeout_ms);
int BHHeartbeatEasy(const int timeout_ms);
int BHHeartbeat(const void *proc_info,
                const int proc_info_len,
                void **reply,
                int *reply_len,
                const int timeout_ms);
bool BHReadSub(const void *proc_id,
               const int proc_id_len,
               void **msgpub,
               int *msgpub_len,
               const int timeout_ms);
int BHPublish(const void *msgpub,
              const int msgpub_len,
              const int timeout_ms);
bool BHAsyncRequest(const void *request,
                    const int request_len,
                    void **msg_id,
                    int *msg_id_len);
int BHReadSub(void **proc_id,
              int *proc_id_len,
              void **msgpub,
              int *msgpub_len,
              const int timeout_ms);
bool BHRequest(const void *request,
               const int request_len,
               void **proc_id,
               int *proc_id_len,
               void **reply,
               int *reply_len,
               const int timeout_ms);
int BHAsyncRequest(const void *request,
                   const int request_len,
                   void **msg_id,
                   int *msg_id_len);
bool BHReadRequest(void **proc_id,
                   int *proc_id_len,
                   void **request,
                   int *request_len,
                   BHSrcInfo **src,
                   const int timeout_ms);
int BHRequest(const void *request,
              const int request_len,
              void **proc_id,
              int *proc_id_len,
              void **reply,
              int *reply_len,
              const int timeout_ms);
bool BHSendReply(BHSrcInfo *src,
                 const void *reply,
                 const int reply_len);
int BHReadRequest(void **proc_id,
                  int *proc_id_len,
                  void **request,
                  int *request_len,
                  void **src,
                  const int timeout_ms);
int BHSendReply(void *src,
                const void *reply,
                const int reply_len);
// int BHCleanUp();
void BHFree(void *buf, int size);
int BHGetLastError(void **msg, int &msg_len);
int BHGetLastError(void **msg, int *msg_len);
#ifdef __cplusplus
}
src/defs.cpp
@@ -16,6 +16,8 @@
 * =====================================================================================
 */
#include "defs.h"
#include "shm.h"
namespace
{
@@ -40,6 +42,12 @@
const MQId &BHTopicCenterAddress() { return kBHTopicCenter; }
const MQId &BHUniCenterAddress() { return kBHUniCenter; }
bhome_shm::SharedMemory &BHomeShm()
{
    static bhome_shm::SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 512);
    return shm;
}
void SetLastError(const int ec, const std::string &msg)
{
    LastErrorStore().ec_ = ec;
src/msg.h
@@ -29,7 +29,6 @@
namespace bhome_msg
{
using namespace bhome_shm;
using namespace bhome::msg; // for serialized data in MsgI
// MsgI is safe to be stored in shared memory, so POD data or offset_ptr is required.
// message format: header(meta) + body(data).
src/proto.h
@@ -22,16 +22,16 @@
#include "bhome_msg_api.pb.h"
#include <chrono>
using namespace bhome::msg;
using namespace bhome_msg;
template <class Msg>
struct MsgToType {
};
#define BHOME_MAP_MSG_AND_TYPE(mSG, tYPE)              \
    template <>                                        \
    struct MsgToType<mSG> {                            \
        static const bhome::msg::MsgType value = tYPE; \
#define BHOME_MAP_MSG_AND_TYPE(mSG, tYPE)  \
    template <>                            \
    struct MsgToType<mSG> {                \
        static const MsgType value = tYPE; \
    };
#define BHOME_SIMPLE_MAP_MSG(name) BHOME_MAP_MSG_AND_TYPE(Msg##name, kMsgType##name)
@@ -52,7 +52,7 @@
#undef BHOME_MAP_MSG_AND_TYPE
template <class Msg>
constexpr inline bhome::msg::MsgType GetType(const Msg &)
constexpr inline MsgType GetType(const Msg &)
{
    return MsgToType<Msg>::value;
}
src/socket.cpp
@@ -115,7 +115,7 @@
}
//maybe reimplment, using async cbs?
bool ShmSocket::SyncRecv(bhome_msg::MsgI &msg, bhome::msg::BHMsgHead &head, const int timeout_ms)
bool ShmSocket::SyncRecv(bhome_msg::MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms)
{
    // std::lock_guard<std::mutex> lock(mutex_); // seems no need to lock mutex_.
    bool got = (timeout_ms == 0) ? mq().TryRecv(msg) : mq().Recv(msg, timeout_ms);
src/socket.h
@@ -66,7 +66,7 @@
    size_t Pending() const { return mq().Pending(); }
    template <class Body>
    bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const RecvCB &cb = RecvCB())
    bool Send(const void *valid_remote, BHMsgHead &head, Body &body, const RecvCB &cb = RecvCB())
    {
        try {
            if (!cb) {
@@ -91,10 +91,10 @@
        return SendImpl(valid_remote, imsg);
    }
    bool SyncRecv(MsgI &msg, bhome::msg::BHMsgHead &head, const int timeout_ms);
    bool SyncRecv(MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms);
    template <class Body>
    bool SendAndRecv(const void *remote, const BHMsgHead &head, const Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms)
    bool SendAndRecv(const void *remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms)
    {
        struct State {
            std::mutex mutex;
src/topic_node.cpp
@@ -361,7 +361,7 @@
    return false;
}
bool TopicNode::ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms)
bool TopicNode::ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms)
{
    if (!IsRegistered()) {
        SetLastError(eNotRegistered, "Not Registered.");
src/topic_node.h
@@ -69,10 +69,10 @@
    void Stop();
private:
    bool ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
    bool ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms);
    const std::string &proc_id() { return info_.proc_id(); }
    typedef bhome_msg::BHAddress Address;
    typedef BHAddress Address;
    class TopicQueryCache
    {
        class Impl
utest/api_test.cpp
@@ -19,7 +19,7 @@
#include "util.h"
#include <atomic>
using namespace bhome::msg;
using namespace bhome_msg;
namespace
{
@@ -66,7 +66,7 @@
                const int proc_id_len,
                const void *data,
                const int data_len,
                BHServerCallbackTag *tag)
                const void *tag)
{
    // printf("ServerProc: ");
    // DEFER1(printf("\n"););
@@ -138,19 +138,6 @@
    }
}
namespace
{
struct CCC {
};
void F(CCC &&c) {}
template <class... T>
void Pass(T &&...t)
{
    F(std::forward<decltype(t)>(t)...);
}
} // namespace
BOOST_AUTO_TEST_CASE(ApiTest)
{
    auto max_time = std::chrono::steady_clock::time_point::max();
@@ -266,7 +253,7 @@
    auto hb = [](std::atomic<bool> *run) {
        while (*run) {
            Sleep(1s, false);
            bool r = BHHeartBeatEasy(1000);
            bool r = BHHeartbeatEasy(1000);
            printf("heartbeat: %s\n", r ? "ok" : "failed");
        }
    };
utest/utest.cpp
@@ -198,7 +198,7 @@
{
    const std::string shm_name("ShmReqRep");
    ShmRemover auto_remove(shm_name);
    SharedMemory shm(shm_name, 1024 * 1024 * 50);
    SharedMemory shm(shm_name, 1024 * 1024 * 512);
    auto Avail = [&]() { return shm.get_free_memory(); };
    auto init_avail = Avail();
@@ -224,22 +224,24 @@
                printf("count: %d\n", count.load());
            }
        };
        MsgRequestTopic req;
        req.set_topic(topic);
        req.set_data("data " + std::string(100, 'a'));
        client.ClientStartWorker(onRecv, 2);
        boost::timer::auto_cpu_timer timer;
        for (int i = 0; i < nreq; ++i) {
            MsgRequestTopic req;
            req.set_topic(topic);
            req.set_data("data " + std::to_string(i));
            std::string msg_id;
            if (!client.ClientAsyncRequest(req, msg_id)) {
                printf("client request failed\n");
                ++count;
            }
            // if (!client.SyncRequest(topic, "data " + std::to_string(i), reply, 1000)) {
            // std::string proc_id;
            // MsgRequestTopicReply reply;
            // if (!client.ClientSyncRequest(req, proc_id, reply, 1000)) {
            //     printf("client request failed\n");
            // }
            //     ++count;
            // ++count;
        }
        do {
            std::this_thread::yield();
@@ -278,7 +280,7 @@
    servers.Launch(Server, "server", topics);
    Sleep(100ms);
    for (auto &t : topics) {
        clients.Launch(Client, t, 1000 * 100);
        clients.Launch(Client, t, 1000 * 100 * 2);
    }
    clients.WaitAll();
    printf("clients done, server replyed: %ld\n", server_msg_count.load());