d1f89c1eb52a08b97d5be5dd991c5991b6f0bf93..ad4f3dcedab29a690c5eedbb08ba1b393917db0b
2021-04-21 lichao
update go api.
ad4f3d 对比 | 目录
2021-04-21 lichao
Merge branch 'master' of http://192.168.5.5:10010/r/valib/bhshmq
0261ff 对比 | 目录
2021-04-21 lichao
change node socket to vector; try lock free queue.
3931f8 对比 | 目录
1个文件已删除
3个文件已添加
8个文件已修改
245 ■■■■ 已修改文件
api/bhsgo/bh_api.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
api/bhsgo/bh_api_go.h 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
api/bhsgo/bhome_node.go 42 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
api/bhsgo/bhome_node_test.go 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.mod 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.sum 31 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
proto/source/bhome_msg_api.proto 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.cpp 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.h 24 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/api_test.cpp 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/lock_free_queue.cpp 29 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/lock_free_queue.h 57 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
api/bhsgo/bh_api.h
File was deleted
api/bhsgo/bh_api_go.h
@@ -18,21 +18,21 @@
#ifndef BH_API_GO_NVOG9GI5
#define BH_API_GO_NVOG9GI5
#include "bh_api.h"
#include "../../src/bh_api.h"
typedef const void *PCVoid;
extern void CGoSubDataCallback(PCVoid proc_id,
extern void cgoSubDataCallback(PCVoid proc_id,
                               int proc_id_len,
                               PCVoid data,
                               int data_len);
extern void CGoServerCallback(PCVoid proc_id,
extern void cgoServerCallback(PCVoid proc_id,
                              int proc_id_len,
                              PCVoid data,
                              int data_len,
                              void *src);
extern void CGoClientCallback(PCVoid proc_id,
extern void cgoClientCallback(PCVoid proc_id,
                              int proc_id_len,
                              PCVoid msg_id,
                              int msg_id_len,
@@ -41,7 +41,7 @@
static void CGoStartWorker()
{
    BHStartWorker(&CGoServerCallback, &CGoSubDataCallback, &CGoClientCallback);
    BHStartWorker(&cgoServerCallback, &cgoSubDataCallback, &cgoClientCallback);
}
#endif // end of include guard: BH_API_GO_NVOG9GI5
api/bhsgo/bhome_node.go
@@ -9,11 +9,10 @@
import (
    bh "bhshmq/proto/source/bhome_msg"
    "fmt"
    "unsafe"
)
func BHApiIn1Out1(bhfunc C.FBHApiIn1Out1, data []byte, reply *bh.MsgCommonReply, timeout_ms int) bool {
func bhApiIn1Out1(bhfunc C.FBHApiIn1Out1, data []byte, reply *bh.MsgCommonReply, timeout_ms int) bool {
    creply := unsafe.Pointer(nil)
    creply_len := C.int(0)
    defer C.BHFree(creply, creply_len)
@@ -27,22 +26,22 @@
func Register(proc *bh.ProcInfo, reply *bh.MsgCommonReply, timeout_ms int) bool {
    data, _ := proc.Marshal()
    return BHApiIn1Out1(C.FBHApiIn1Out1(C.BHRegister), data, reply, timeout_ms)
    return bhApiIn1Out1(C.FBHApiIn1Out1(C.BHRegister), data, reply, timeout_ms)
}
func RegisterTopics(topics *bh.MsgTopicList, reply *bh.MsgCommonReply, timeout_ms int) bool {
    data, _ := topics.Marshal()
    return BHApiIn1Out1(C.FBHApiIn1Out1(C.BHRegisterTopics), data, reply, timeout_ms)
    return bhApiIn1Out1(C.FBHApiIn1Out1(C.BHRegisterTopics), data, reply, timeout_ms)
}
func Subscribe(topics *bh.MsgTopicList, reply *bh.MsgCommonReply, timeout_ms int) bool {
    data, _ := topics.Marshal()
    return BHApiIn1Out1(C.FBHApiIn1Out1(C.BHSubscribeTopics), data, reply, timeout_ms)
    return bhApiIn1Out1(C.FBHApiIn1Out1(C.BHSubscribeTopics), data, reply, timeout_ms)
}
func Heartbeat(topics *bh.MsgTopicList, reply *bh.MsgCommonReply, timeout_ms int) bool {
func Heartbeat(topics *bh.ProcInfo, reply *bh.MsgCommonReply, timeout_ms int) bool {
    data, _ := topics.Marshal()
    return BHApiIn1Out1(C.FBHApiIn1Out1(C.BHHeartbeat), data, reply, timeout_ms)
    return bhApiIn1Out1(C.FBHApiIn1Out1(C.BHHeartbeat), data, reply, timeout_ms)
}
func HeartbeatEasy(timeout_ms int) bool {
@@ -133,24 +132,24 @@
var cgoClientCB ClientCB
var cgoSubDataCB SubDataCB
//export CGoSubDataCallback
func CGoSubDataCallback(cpid C.PCVoid, pid_len C.int, data C.PCVoid, data_len C.int) {
//export cgoSubDataCallback
func cgoSubDataCallback(cpid C.PCVoid, pid_len C.int, data C.PCVoid, data_len C.int) {
    proc_id := string(C.GoBytes(unsafe.Pointer(cpid), pid_len))
    msg := bh.MsgPublish{}
    msg.Unmarshal(C.GoBytes(unsafe.Pointer(data), data_len))
    cgoSubDataCB(&proc_id, &msg)
}
//export CGoServerCallback
func CGoServerCallback(cpid C.PCVoid, pid_len C.int, data C.PCVoid, data_len C.int, src unsafe.Pointer) {
//export cgoServerCallback
func cgoServerCallback(cpid C.PCVoid, pid_len C.int, data C.PCVoid, data_len C.int, src unsafe.Pointer) {
    proc_id := string(C.GoBytes(unsafe.Pointer(cpid), pid_len))
    msg := bh.MsgRequestTopic{}
    msg.Unmarshal(C.GoBytes(unsafe.Pointer(data), data_len))
    cgoServerCB(src, &proc_id, &msg)
}
//export CGoClientCallback
func CGoClientCallback(cpid C.PCVoid, pid_len C.int, msgid C.PCVoid, msgid_len C.int, data C.PCVoid, data_len C.int) {
//export cgoClientCallback
func cgoClientCallback(cpid C.PCVoid, pid_len C.int, msgid C.PCVoid, msgid_len C.int, data C.PCVoid, data_len C.int) {
    proc_id := string(C.GoBytes(unsafe.Pointer(cpid), pid_len))
    msg_id := C.GoBytes(unsafe.Pointer(msgid), msgid_len)
    var msg bh.MsgRequestTopicReply
@@ -164,20 +163,3 @@
    cgoSubDataCB = sub
    C.CGoStartWorker()
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// user code:
func ServerCallback(src unsafe.Pointer, proc_id *string, req *bh.MsgRequestTopic) {
    fmt.Println("user server cb called, request topic: " + string(req.Topic) + ", data:" + string(req.Data))
    reply := bh.MsgRequestTopicReply{}
    reply.Data = []byte("reply 1234")
    SendReply(src, &reply)
}
func SubDataCallback(proc_id *string, pub *bh.MsgPublish) {
    fmt.Println("user sub data cb called")
}
func ClientCallback(proc_id *string, msg_id *[]byte, reply *bh.MsgRequestTopicReply) {
    fmt.Println("user client cb reply: " + string(reply.Data))
}
api/bhsgo/bhome_node_test.go
@@ -5,8 +5,22 @@
    "fmt"
    "testing"
    "time"
    "unsafe"
)
func ServerCallback(src unsafe.Pointer, proc_id *string, req *bh.MsgRequestTopic) {
    fmt.Println("user server cb called, request topic: " + string(req.Topic) + ", data:" + string(req.Data))
    reply := bh.MsgRequestTopicReply{}
    reply.Data = []byte("reply 1234")
    SendReply(src, &reply)
}
func SubDataCallback(proc_id *string, pub *bh.MsgPublish) {
    fmt.Println("user sub data cb called")
}
func ClientCallback(proc_id *string, msg_id *[]byte, reply *bh.MsgRequestTopicReply) {
    fmt.Println("user client cb reply: " + string(reply.Data))
}
func TestRegister(t *testing.T) {
    proc_id := "test_proc"
    proc := bh.ProcInfo{}
@@ -20,6 +34,7 @@
        fmt.Println("register ok")
    } else {
        fmt.Println("register failed")
        t.Log("register error")
        return
    }
go.mod
@@ -1,3 +1,5 @@
module bhshmq
go 1.14
require github.com/gogo/protobuf v1.3.2 // indirect
go.sum
New file
@@ -0,0 +1,31 @@
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
proto/source/bhome_msg_api.proto
@@ -9,8 +9,8 @@
message BHAddress {
    bytes mq_id = 1; // mqid, uuid
    // bytes ip = 2;   //
    // int32 port = 3;
    bytes ip = 2;   //
    int32 port = 3;
}
message ProcInfo
src/topic_node.cpp
@@ -37,14 +37,16 @@
} // namespace
TopicNode::TopicNode(SharedMemory &shm) :
    shm_(shm), sock_node_(shm), sock_client_(shm, kMqLen), sock_server_(shm, kMqLen), sock_sub_(shm, kMqLen), state_(eStateUnregistered)
    shm_(shm), sockets_(eSockEnd), state_(eStateUnregistered)
{
    for (int i = eSockStart; i < eSockEnd; ++i) {
        sockets_[i].reset(new ShmSocket(shm_, kMqLen));
    }
    // recv msgs to avoid memory leak.
    auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; };
    SockNode().Start(default_ignore_msg);
    SockClient().Start(default_ignore_msg);
    SockServer().Start(default_ignore_msg);
    SockSub().Start(default_ignore_msg);
    for (auto &p : sockets_) {
        p->Start(default_ignore_msg);
    }
}
TopicNode::~TopicNode()
@@ -52,10 +54,7 @@
    Stop();
    SockNode().Stop();
    if (state() == eStateUnregistered) {
        SockNode().Remove();
        SockClient().Remove();
        SockServer().Remove();
        SockSub().Remove();
        for (auto &p : sockets_) { p->Remove(); }
    }
}
@@ -66,16 +65,14 @@
    } else if (nworker > 16) {
        nworker = 16;
    }
    SockNode().Start();
    ServerStart(server_cb, nworker);
    SubscribeStartWorker(sub_cb, nworker);
    ClientStartWorker(client_cb, nworker);
}
void TopicNode::Stop()
{
    SockSub().Stop();
    SockServer().Stop();
    SockClient().Stop();
    for (auto &p : sockets_) { p->Stop(); }
}
bool TopicNode::Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
src/topic_node.h
@@ -22,6 +22,7 @@
#include "socket.h"
#include <atomic>
#include <memory>
#include <vector>
using namespace bhome_shm;
using namespace bhome_msg;
@@ -107,17 +108,22 @@
    };
    // some sockets may be the same one, using functions make it easy to change.
    enum { eSockStart,
           eSockNode = eSockStart,
           eSockPub = eSockNode,
           eSockServer,
           eSockClient,
           eSockSub,
           eSockEnd,
    };
    std::vector<std::unique_ptr<ShmSocket>> sockets_;
    ShmSocket &SockNode() { return sock_node_; }
    ShmSocket &SockPub() { return SockNode(); }
    ShmSocket &SockSub() { return sock_sub_; }
    ShmSocket &SockClient() { return sock_client_; }
    ShmSocket &SockServer() { return sock_server_; }
    ShmSocket &SockNode() { return *sockets_[eSockNode]; }
    ShmSocket &SockPub() { return *sockets_[eSockPub]; }
    ShmSocket &SockSub() { return *sockets_[eSockSub]; }
    ShmSocket &SockClient() { return *sockets_[eSockClient]; }
    ShmSocket &SockServer() { return *sockets_[eSockServer]; }
    ShmSocket sock_node_;
    ShmSocket sock_client_;
    ShmSocket sock_server_;
    ShmSocket sock_sub_;
    enum State {
        eStateUnregistered,
        eStateOnline,
utest/api_test.cpp
@@ -155,7 +155,7 @@
    printf("maxsec: %ld\n", CountSeconds(max_time));
    bool reg = false;
    for (int i = 0; i < 10 && !reg; ++i) {
    for (int i = 0; i < 3 && !reg; ++i) {
        ProcInfo proc;
        proc.set_proc_id("demo_client");
        proc.set_public_info("public info of demo_client. etc...");
@@ -167,6 +167,9 @@
        BHFree(reply, reply_len);
        Sleep(1s);
    }
    if (!reg) {
        return;
    }
    const std::string topic_ = "topic_";
@@ -204,7 +207,7 @@
        for (int i = 0; i < 1; ++i) {
            MsgPublish pub;
            pub.set_topic(topic_ + std::to_string(i));
            pub.set_data("pub_data_" + std::string(1024 * 1024, 'a'));
            pub.set_data("pub_data_" + std::string(1024 * 1, 'a'));
            std::string s(pub.SerializeAsString());
            BHPublish(s.data(), s.size(), 0);
            // Sleep(1s);
utest/lock_free_queue.cpp
New file
@@ -0,0 +1,29 @@
/*
 * =====================================================================================
 *
 *       Filename:  lock_free_queue.cpp
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年04月21日 13时57分02秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
#include "lock_free_queue.h"
#include "defs.h"
#include "util.h"
BOOST_AUTO_TEST_CASE(LockFreeTest)
{
    LockFreeQueue q(BHomeShm());
    for (int i = 0; i < 15; ++i) {
        int r = q.Write(i);
        printf("write %d %s\n", i, (r ? "ok" : "failed"));
    }
}
utest/lock_free_queue.h
New file
@@ -0,0 +1,57 @@
/*
 * =====================================================================================
 *
 *       Filename:  lock_free_queue.h
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年04月21日 14时03分27秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
#ifndef LOCK_FREE_QUEUE_KQWP70HT
#define LOCK_FREE_QUEUE_KQWP70HT
#include "shm.h"
#include <boost/interprocess/offset_ptr.hpp>
#include <boost/lockfree/queue.hpp>
using namespace bhome_shm;
typedef int64_t Data;
const int kQLen = 10;
class LockFreeQueue : private boost::lockfree::queue<Data,
                                                     boost::lockfree::allocator<Allocator<Data>>,
                                                     boost::lockfree::capacity<kQLen>>,
                      private boost::noncopyable
{
    typedef boost::lockfree::queue<Data,
                                   boost::lockfree::allocator<Allocator<Data>>,
                                   boost::lockfree::capacity<kQLen>>
        Queue;
public:
    LockFreeQueue(SharedMemory &shm) :
        Queue(shm.get_segment_manager()) {}
    bool Read(Data &d) { return pop(d); }
    bool Write(Data const &d) { return push(d); }
    template <class Func>
    bool Write(Data const &d, Func onWrite)
    {
        if (Write(d)) {
            onWrite(d);
            return true;
        } else {
            return false;
        }
    }
};
#endif // end of include guard: LOCK_FREE_QUEUE_KQWP70HT