lichao
2021-04-20 64bff0caaf665c65125cdab2b144f3594d520002
go api works. refactor.
1个文件已添加
5个文件已修改
171 ■■■■ 已修改文件
api/go/bh_api_go.h 47 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
api/go/bhome_node.go 104 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
api/go/go.mod 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bh_api.h 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.cpp 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
api/go/bh_api_go.h
New file
@@ -0,0 +1,47 @@
/*
 * =====================================================================================
 *
 *       Filename:  bh_api_go.h
 *
 *    Description:  go api callback functions.
 *
 *        Version:  1.0
 *        Created:  2021年04月20日 16时18分27秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
#ifndef BH_API_GO_NVOG9GI5
#define BH_API_GO_NVOG9GI5
#include "bh_api.h"
typedef const void *PCVoid;
extern void CGoSubDataCallback(PCVoid proc_id,
                               int proc_id_len,
                               PCVoid data,
                               int data_len);
extern void CGoServerCallback(PCVoid proc_id,
                              int proc_id_len,
                              PCVoid data,
                              int data_len,
                              void *src);
extern void CGoClientCallback(PCVoid proc_id,
                              int proc_id_len,
                              PCVoid msg_id,
                              int msg_id_len,
                              PCVoid data,
                              int data_len);
static void CGoStartWorker()
{
    BHStartWorker(&CGoServerCallback, &CGoSubDataCallback, &CGoClientCallback);
}
#endif // end of include guard: BH_API_GO_NVOG9GI5
api/go/bhome_node.go
@@ -1,37 +1,16 @@
package main
/*
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include "bh_api.h"
void print(int v)
{
    printf("print %d\n", v);
}
static void ReadData(void **p, int32_t *n)
{
    *n = 4;
    *p = malloc(4);
    memcpy(*p, "abc", 4);
    *n = 4;
}
static void PrintData(void *p, int32_t n)
{
    printf("data :%s\n", (char*)p);
    free(p);
}
#include "bh_api_go.h"
*/
// #cgo LDFLAGS: -L/home/lichao/code/shmsg/build/lib -L/usr/local/lib -lbhome_shmq -lbhome_msg -lprotobuf-lite -lstdc++ -lpthread -lrt
import "C"
import (
    bh "bhome/bhome_msg"
    bh "bhome_node/bhome_msg"
    "fmt"
    "time"
    "unsafe"
)
@@ -147,36 +126,70 @@
}
type ServecCB func(src unsafe.Pointer, proc_id *string, req *bh.MsgRequestTopic)
type SubDataCB func(proc_id *string, pub *bh.MsgPublish)
type ServerCB func(src unsafe.Pointer, proc_id *string, req *bh.MsgRequestTopic)
type ClientCB func(proc_id *string, msg_id *[]byte, reply *bh.MsgRequestTopicReply)
type SubDataCB func(proc_id *string, pub *bh.MsgPublish)
func cserver_callback(cpid unsafe.Pointer, pid_len C.int, src unsafe.Pointer) {
var cgoServerCB ServerCB
var cgoClientCB ClientCB
var cgoSubDataCB SubDataCB
//export CGoSubDataCallback
func CGoSubDataCallback(cpid C.PCVoid, pid_len C.int, data C.PCVoid, data_len C.int) {
    proc_id := string(C.GoBytes(unsafe.Pointer(cpid), pid_len))
    msg := bh.MsgPublish{}
    msg.Unmarshal(C.GoBytes(unsafe.Pointer(data), data_len))
    cgoSubDataCB(&proc_id, &msg)
}
func StartWorker(server_cb ServecCB, sub_cb SubDataCB, client_cb ClientCB) {
//export CGoServerCallback
func CGoServerCallback(cpid C.PCVoid, pid_len C.int, data C.PCVoid, data_len C.int, src unsafe.Pointer) {
    proc_id := string(C.GoBytes(unsafe.Pointer(cpid), pid_len))
    msg := bh.MsgRequestTopic{}
    msg.Unmarshal(C.GoBytes(unsafe.Pointer(data), data_len))
    cgoServerCB(src, &proc_id, &msg)
}
//export CGoClientCallback
func CGoClientCallback(cpid C.PCVoid, pid_len C.int, msgid C.PCVoid, msgid_len C.int, data C.PCVoid, data_len C.int) {
    proc_id := string(C.GoBytes(unsafe.Pointer(cpid), pid_len))
    msg_id := C.GoBytes(unsafe.Pointer(msgid), msgid_len)
    var msg bh.MsgRequestTopicReply
    msg.Unmarshal(C.GoBytes(unsafe.Pointer(data), data_len))
    cgoClientCB(&proc_id, &msg_id, &msg)
}
func StartWorker(c ClientCB, s ServerCB, sub SubDataCB) {
    cgoClientCB = c
    cgoServerCB = s
    cgoSubDataCB = sub
    C.CGoStartWorker()
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// user code:
func ServerCallback(proc_id *string, req *bh.MsgRequestTopic, reply *bh.MsgRequestTopicReply) bool {
    // xxxx
    return true
func ServerCallback(src unsafe.Pointer, proc_id *string, req *bh.MsgRequestTopic) {
    fmt.Println("user server cb called, request topic: " + string(req.Topic) + ", data:" + string(req.Data))
    reply := bh.MsgRequestTopicReply{}
    reply.Data = []byte("reply 1234")
    SendReply(src, &reply)
}
func SubDataCallback(proc_id *string, pub *bh.MsgPublish) {
    fmt.Println("user sub data cb called")
}
func ClientCallback(proc_id *string, msg_id *[]byte, reply *bh.MsgRequestTopicReply) {
    fmt.Println("user client cb reply: " + string(reply.Data))
}
func main() {
    proc_id := "test_proc"
    proc := bh.ProcInfo{}
    proc.ProcId = []byte("test_proc")
    proc.ProcId = []byte(proc_id)
    reply := bh.MsgCommonReply{}
    StartWorker(ClientCallback, ServerCallback, SubDataCallback)
    r := Register(&proc, &reply, 1000)
    if r {
@@ -201,9 +214,24 @@
    } else {
        fmt.Println("reg topics failed")
    }
    req := bh.MsgRequestTopic{}
    time.Sleep(time.Second * 1)
    req.Topic = []byte("topic0")
    req.Data = []byte("data0")
    // var msg_id []byte
    // AsyncRequest(&req, &msg_id)
    // fmt.Println(msg_id)
    // time.Sleep(time.Second * 5)
    p := unsafe.Pointer(nil)
    n := C.int32_t(0)
    C.ReadData(&p, &n)
    C.PrintData(p, n)
    pid := ""
    rr := bh.MsgRequestTopicReply{}
    for i := 0; i < 10000; i++ {
        if Request(&req, &pid, &rr, 3000) {
            fmt.Println("server:" + pid + ", reply:" + string(rr.Data))
        } else {
            e, s := GetLastError()
            fmt.Println("ec:", e, ", msg:"+s)
        }
    }
}
api/go/go.mod
@@ -1,4 +1,4 @@
module bhome
module bhome_node
go 1.16
src/bh_api.h
@@ -37,22 +37,22 @@
                      const int timeout_ms);
typedef void (*FSubDataCallback)(const void *proc_id,
                                 const int proc_id_len,
                                 int proc_id_len,
                                 const void *data,
                                 const int data_len);
                                 int data_len);
typedef void (*FServerCallback)(const void *proc_id,
                                const int proc_id_len,
                                int proc_id_len,
                                const void *data,
                                const int data_len,
                                int data_len,
                                void *src);
typedef void (*FClientCallback)(const void *proc_id,
                                const int proc_id_len,
                                int proc_id_len,
                                const void *msg_id,
                                const int msg_id_len,
                                int msg_id_len,
                                const void *data,
                                const int data_len);
                                int data_len);
void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb);
src/socket.h
@@ -125,6 +125,7 @@
                return true;
            } else {
                st->canceled = true;
                SetLastError(ETIMEDOUT, "timeout");
                return false;
            }
        } catch (...) {
src/topic_node.cpp
@@ -40,6 +40,9 @@
    // recv msgs to avoid memory leak.
    auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; };
    SockNode().Start(default_ignore_msg);
    SockClient().Start(default_ignore_msg);
    SockServer().Start(default_ignore_msg);
    SockSub().Start(default_ignore_msg);
}
TopicNode::~TopicNode()