| | |
| | | #ifndef BH_API_GO_NVOG9GI5 |
| | | #define BH_API_GO_NVOG9GI5 |
| | | |
| | | #include "bh_api.h" |
| | | #include "../../src/bh_api.h" |
| | | |
| | | typedef const void *PCVoid; |
| | | extern void CGoSubDataCallback(PCVoid proc_id, |
| | | extern void cgoSubDataCallback(PCVoid proc_id, |
| | | int proc_id_len, |
| | | PCVoid data, |
| | | int data_len); |
| | | |
| | | extern void CGoServerCallback(PCVoid proc_id, |
| | | extern void cgoServerCallback(PCVoid proc_id, |
| | | int proc_id_len, |
| | | PCVoid data, |
| | | int data_len, |
| | | void *src); |
| | | |
| | | extern void CGoClientCallback(PCVoid proc_id, |
| | | extern void cgoClientCallback(PCVoid proc_id, |
| | | int proc_id_len, |
| | | PCVoid msg_id, |
| | | int msg_id_len, |
| | |
| | | |
| | | static void CGoStartWorker() |
| | | { |
| | | BHStartWorker(&CGoServerCallback, &CGoSubDataCallback, &CGoClientCallback); |
| | | BHStartWorker(&cgoServerCallback, &cgoSubDataCallback, &cgoClientCallback); |
| | | } |
| | | |
| | | #endif // end of include guard: BH_API_GO_NVOG9GI5 |
| | |
| | | |
| | | import ( |
| | | bh "bhshmq/proto/source/bhome_msg" |
| | | "fmt" |
| | | "unsafe" |
| | | ) |
| | | |
| | | func BHApiIn1Out1(bhfunc C.FBHApiIn1Out1, data []byte, reply *bh.MsgCommonReply, timeout_ms int) bool { |
| | | func bhApiIn1Out1(bhfunc C.FBHApiIn1Out1, data []byte, reply *bh.MsgCommonReply, timeout_ms int) bool { |
| | | creply := unsafe.Pointer(nil) |
| | | creply_len := C.int(0) |
| | | defer C.BHFree(creply, creply_len) |
| | |
| | | |
| | | func Register(proc *bh.ProcInfo, reply *bh.MsgCommonReply, timeout_ms int) bool { |
| | | data, _ := proc.Marshal() |
| | | return BHApiIn1Out1(C.FBHApiIn1Out1(C.BHRegister), data, reply, timeout_ms) |
| | | return bhApiIn1Out1(C.FBHApiIn1Out1(C.BHRegister), data, reply, timeout_ms) |
| | | } |
| | | |
| | | func RegisterTopics(topics *bh.MsgTopicList, reply *bh.MsgCommonReply, timeout_ms int) bool { |
| | | data, _ := topics.Marshal() |
| | | return BHApiIn1Out1(C.FBHApiIn1Out1(C.BHRegisterTopics), data, reply, timeout_ms) |
| | | return bhApiIn1Out1(C.FBHApiIn1Out1(C.BHRegisterTopics), data, reply, timeout_ms) |
| | | } |
| | | |
| | | func Subscribe(topics *bh.MsgTopicList, reply *bh.MsgCommonReply, timeout_ms int) bool { |
| | | data, _ := topics.Marshal() |
| | | return BHApiIn1Out1(C.FBHApiIn1Out1(C.BHSubscribeTopics), data, reply, timeout_ms) |
| | | return bhApiIn1Out1(C.FBHApiIn1Out1(C.BHSubscribeTopics), data, reply, timeout_ms) |
| | | } |
| | | |
| | | func Heartbeat(topics *bh.MsgTopicList, reply *bh.MsgCommonReply, timeout_ms int) bool { |
| | | func Heartbeat(topics *bh.ProcInfo, reply *bh.MsgCommonReply, timeout_ms int) bool { |
| | | data, _ := topics.Marshal() |
| | | return BHApiIn1Out1(C.FBHApiIn1Out1(C.BHHeartbeat), data, reply, timeout_ms) |
| | | return bhApiIn1Out1(C.FBHApiIn1Out1(C.BHHeartbeat), data, reply, timeout_ms) |
| | | } |
| | | |
| | | func HeartbeatEasy(timeout_ms int) bool { |
| | |
| | | var cgoClientCB ClientCB |
| | | var cgoSubDataCB SubDataCB |
| | | |
| | | //export CGoSubDataCallback |
| | | func CGoSubDataCallback(cpid C.PCVoid, pid_len C.int, data C.PCVoid, data_len C.int) { |
| | | //export cgoSubDataCallback |
| | | func cgoSubDataCallback(cpid C.PCVoid, pid_len C.int, data C.PCVoid, data_len C.int) { |
| | | proc_id := string(C.GoBytes(unsafe.Pointer(cpid), pid_len)) |
| | | msg := bh.MsgPublish{} |
| | | msg.Unmarshal(C.GoBytes(unsafe.Pointer(data), data_len)) |
| | | cgoSubDataCB(&proc_id, &msg) |
| | | } |
| | | |
| | | //export CGoServerCallback |
| | | func CGoServerCallback(cpid C.PCVoid, pid_len C.int, data C.PCVoid, data_len C.int, src unsafe.Pointer) { |
| | | //export cgoServerCallback |
| | | func cgoServerCallback(cpid C.PCVoid, pid_len C.int, data C.PCVoid, data_len C.int, src unsafe.Pointer) { |
| | | proc_id := string(C.GoBytes(unsafe.Pointer(cpid), pid_len)) |
| | | msg := bh.MsgRequestTopic{} |
| | | msg.Unmarshal(C.GoBytes(unsafe.Pointer(data), data_len)) |
| | | cgoServerCB(src, &proc_id, &msg) |
| | | } |
| | | |
| | | //export CGoClientCallback |
| | | func CGoClientCallback(cpid C.PCVoid, pid_len C.int, msgid C.PCVoid, msgid_len C.int, data C.PCVoid, data_len C.int) { |
| | | //export cgoClientCallback |
| | | func cgoClientCallback(cpid C.PCVoid, pid_len C.int, msgid C.PCVoid, msgid_len C.int, data C.PCVoid, data_len C.int) { |
| | | proc_id := string(C.GoBytes(unsafe.Pointer(cpid), pid_len)) |
| | | msg_id := C.GoBytes(unsafe.Pointer(msgid), msgid_len) |
| | | var msg bh.MsgRequestTopicReply |
| | |
| | | cgoSubDataCB = sub |
| | | C.CGoStartWorker() |
| | | } |
| | | |
| | | ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////// |
| | | // user code: |
| | | |
| | | func ServerCallback(src unsafe.Pointer, proc_id *string, req *bh.MsgRequestTopic) { |
| | | fmt.Println("user server cb called, request topic: " + string(req.Topic) + ", data:" + string(req.Data)) |
| | | reply := bh.MsgRequestTopicReply{} |
| | | reply.Data = []byte("reply 1234") |
| | | SendReply(src, &reply) |
| | | } |
| | | |
| | | func SubDataCallback(proc_id *string, pub *bh.MsgPublish) { |
| | | fmt.Println("user sub data cb called") |
| | | } |
| | | func ClientCallback(proc_id *string, msg_id *[]byte, reply *bh.MsgRequestTopicReply) { |
| | | fmt.Println("user client cb reply: " + string(reply.Data)) |
| | | } |
| | |
| | | "fmt" |
| | | "testing" |
| | | "time" |
| | | "unsafe" |
| | | ) |
| | | |
| | | func ServerCallback(src unsafe.Pointer, proc_id *string, req *bh.MsgRequestTopic) { |
| | | fmt.Println("user server cb called, request topic: " + string(req.Topic) + ", data:" + string(req.Data)) |
| | | reply := bh.MsgRequestTopicReply{} |
| | | reply.Data = []byte("reply 1234") |
| | | SendReply(src, &reply) |
| | | } |
| | | |
| | | func SubDataCallback(proc_id *string, pub *bh.MsgPublish) { |
| | | fmt.Println("user sub data cb called") |
| | | } |
| | | func ClientCallback(proc_id *string, msg_id *[]byte, reply *bh.MsgRequestTopicReply) { |
| | | fmt.Println("user client cb reply: " + string(reply.Data)) |
| | | } |
| | | func TestRegister(t *testing.T) { |
| | | proc_id := "test_proc" |
| | | proc := bh.ProcInfo{} |
| | |
| | | fmt.Println("register ok") |
| | | } else { |
| | | fmt.Println("register failed") |
| | | t.Log("register error") |
| | | return |
| | | } |
| | | |
| | |
| | | module bhshmq |
| | | |
| | | go 1.14 |
| | | |
| | | require github.com/gogo/protobuf v1.3.2 // indirect |
New file |
| | |
| | | github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= |
| | | github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= |
| | | github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= |
| | | github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= |
| | | github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= |
| | | github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= |
| | | golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= |
| | | golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= |
| | | golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= |
| | | golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= |
| | | golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= |
| | | golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= |
| | | golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= |
| | | golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= |
| | | golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= |
| | | golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= |
| | | golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= |
| | | golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= |
| | | golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= |
| | | golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= |
| | | golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= |
| | | golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= |
| | | golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= |
| | | golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= |
| | | golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= |
| | | golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= |
| | | golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= |
| | | golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= |
| | | golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= |
| | | golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= |
| | | golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= |
| | |
| | | |
| | | message BHAddress { |
| | | bytes mq_id = 1; // mqid, uuid |
| | | // bytes ip = 2; // |
| | | // int32 port = 3; |
| | | bytes ip = 2; // |
| | | int32 port = 3; |
| | | } |
| | | |
| | | message ProcInfo |
| | |
| | | } // namespace |
| | | |
| | | TopicNode::TopicNode(SharedMemory &shm) : |
| | | shm_(shm), sock_node_(shm), sock_client_(shm, kMqLen), sock_server_(shm, kMqLen), sock_sub_(shm, kMqLen), state_(eStateUnregistered) |
| | | shm_(shm), sockets_(eSockEnd), state_(eStateUnregistered) |
| | | { |
| | | for (int i = eSockStart; i < eSockEnd; ++i) { |
| | | sockets_[i].reset(new ShmSocket(shm_, kMqLen)); |
| | | } |
| | | // recv msgs to avoid memory leak. |
| | | auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; }; |
| | | SockNode().Start(default_ignore_msg); |
| | | SockClient().Start(default_ignore_msg); |
| | | SockServer().Start(default_ignore_msg); |
| | | SockSub().Start(default_ignore_msg); |
| | | for (auto &p : sockets_) { |
| | | p->Start(default_ignore_msg); |
| | | } |
| | | } |
| | | |
| | | TopicNode::~TopicNode() |
| | |
| | | Stop(); |
| | | SockNode().Stop(); |
| | | if (state() == eStateUnregistered) { |
| | | SockNode().Remove(); |
| | | SockClient().Remove(); |
| | | SockServer().Remove(); |
| | | SockSub().Remove(); |
| | | for (auto &p : sockets_) { p->Remove(); } |
| | | } |
| | | } |
| | | |
| | |
| | | } else if (nworker > 16) { |
| | | nworker = 16; |
| | | } |
| | | |
| | | SockNode().Start(); |
| | | ServerStart(server_cb, nworker); |
| | | SubscribeStartWorker(sub_cb, nworker); |
| | | ClientStartWorker(client_cb, nworker); |
| | | } |
| | | void TopicNode::Stop() |
| | | { |
| | | SockSub().Stop(); |
| | | SockServer().Stop(); |
| | | SockClient().Stop(); |
| | | for (auto &p : sockets_) { p->Stop(); } |
| | | } |
| | | |
| | | bool TopicNode::Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) |
| | |
| | | #include "socket.h" |
| | | #include <atomic> |
| | | #include <memory> |
| | | #include <vector> |
| | | |
| | | using namespace bhome_shm; |
| | | using namespace bhome_msg; |
| | |
| | | }; |
| | | |
| | | // some sockets may be the same one, using functions make it easy to change. |
| | | enum { eSockStart, |
| | | eSockNode = eSockStart, |
| | | eSockPub = eSockNode, |
| | | eSockServer, |
| | | eSockClient, |
| | | eSockSub, |
| | | eSockEnd, |
| | | }; |
| | | std::vector<std::unique_ptr<ShmSocket>> sockets_; |
| | | |
| | | ShmSocket &SockNode() { return sock_node_; } |
| | | ShmSocket &SockPub() { return SockNode(); } |
| | | ShmSocket &SockSub() { return sock_sub_; } |
| | | ShmSocket &SockClient() { return sock_client_; } |
| | | ShmSocket &SockServer() { return sock_server_; } |
| | | ShmSocket &SockNode() { return *sockets_[eSockNode]; } |
| | | ShmSocket &SockPub() { return *sockets_[eSockPub]; } |
| | | ShmSocket &SockSub() { return *sockets_[eSockSub]; } |
| | | ShmSocket &SockClient() { return *sockets_[eSockClient]; } |
| | | ShmSocket &SockServer() { return *sockets_[eSockServer]; } |
| | | |
| | | ShmSocket sock_node_; |
| | | ShmSocket sock_client_; |
| | | ShmSocket sock_server_; |
| | | ShmSocket sock_sub_; |
| | | enum State { |
| | | eStateUnregistered, |
| | | eStateOnline, |
| | |
| | | printf("maxsec: %ld\n", CountSeconds(max_time)); |
| | | |
| | | bool reg = false; |
| | | for (int i = 0; i < 10 && !reg; ++i) { |
| | | for (int i = 0; i < 3 && !reg; ++i) { |
| | | ProcInfo proc; |
| | | proc.set_proc_id("demo_client"); |
| | | proc.set_public_info("public info of demo_client. etc..."); |
| | |
| | | |
| | | BHFree(reply, reply_len); |
| | | Sleep(1s); |
| | | } |
| | | if (!reg) { |
| | | return; |
| | | } |
| | | |
| | | const std::string topic_ = "topic_"; |
| | |
| | | for (int i = 0; i < 1; ++i) { |
| | | MsgPublish pub; |
| | | pub.set_topic(topic_ + std::to_string(i)); |
| | | pub.set_data("pub_data_" + std::string(1024 * 1024, 'a')); |
| | | pub.set_data("pub_data_" + std::string(1024 * 1, 'a')); |
| | | std::string s(pub.SerializeAsString()); |
| | | BHPublish(s.data(), s.size(), 0); |
| | | // Sleep(1s); |
New file |
| | |
| | | /* |
| | | * ===================================================================================== |
| | | * |
| | | * Filename: lock_free_queue.cpp |
| | | * |
| | | * Description: |
| | | * |
| | | * Version: 1.0 |
| | | * Created: 2021年04月21日 13时57分02秒 |
| | | * Revision: none |
| | | * Compiler: gcc |
| | | * |
| | | * Author: Li Chao (), lichao@aiotlink.com |
| | | * Organization: |
| | | * |
| | | * ===================================================================================== |
| | | */ |
| | | #include "lock_free_queue.h" |
| | | #include "defs.h" |
| | | #include "util.h" |
| | | |
| | | BOOST_AUTO_TEST_CASE(LockFreeTest) |
| | | { |
| | | LockFreeQueue q(BHomeShm()); |
| | | for (int i = 0; i < 15; ++i) { |
| | | int r = q.Write(i); |
| | | printf("write %d %s\n", i, (r ? "ok" : "failed")); |
| | | } |
| | | } |
New file |
| | |
| | | /* |
| | | * ===================================================================================== |
| | | * |
| | | * Filename: lock_free_queue.h |
| | | * |
| | | * Description: |
| | | * |
| | | * Version: 1.0 |
| | | * Created: 2021年04月21日 14时03分27秒 |
| | | * Revision: none |
| | | * Compiler: gcc |
| | | * |
| | | * Author: Li Chao (), lichao@aiotlink.com |
| | | * Organization: |
| | | * |
| | | * ===================================================================================== |
| | | */ |
| | | |
| | | #ifndef LOCK_FREE_QUEUE_KQWP70HT |
| | | #define LOCK_FREE_QUEUE_KQWP70HT |
| | | |
| | | #include "shm.h" |
| | | #include <boost/interprocess/offset_ptr.hpp> |
| | | #include <boost/lockfree/queue.hpp> |
| | | |
| | | using namespace bhome_shm; |
| | | |
| | | typedef int64_t Data; |
| | | const int kQLen = 10; |
| | | class LockFreeQueue : private boost::lockfree::queue<Data, |
| | | boost::lockfree::allocator<Allocator<Data>>, |
| | | boost::lockfree::capacity<kQLen>>, |
| | | private boost::noncopyable |
| | | { |
| | | typedef boost::lockfree::queue<Data, |
| | | boost::lockfree::allocator<Allocator<Data>>, |
| | | boost::lockfree::capacity<kQLen>> |
| | | Queue; |
| | | |
| | | public: |
| | | LockFreeQueue(SharedMemory &shm) : |
| | | Queue(shm.get_segment_manager()) {} |
| | | bool Read(Data &d) { return pop(d); } |
| | | bool Write(Data const &d) { return push(d); } |
| | | template <class Func> |
| | | bool Write(Data const &d, Func onWrite) |
| | | { |
| | | if (Write(d)) { |
| | | onWrite(d); |
| | | return true; |
| | | } else { |
| | | return false; |
| | | } |
| | | } |
| | | }; |
| | | |
| | | #endif // end of include guard: LOCK_FREE_QUEUE_KQWP70HT |