zhangmeng
2021-02-04 379708307921a5bab2867c80326099383154bd9f
add recvandsend interface
1个文件已添加
4个文件已修改
248 ■■■■■ 已修改文件
bhomebus.go 102 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gopointer.go 62 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
libcbhomebus.c 25 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
libcbhomebus.h 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
libcbhomebus_func.h 44 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
bhomebus.go
@@ -7,21 +7,21 @@
#include <stdio.h>
#include "libcbhomebus.h"
void* create_net_node_t(const int size){
static void* create_net_node_t(const int size){
    net_node_t* nodes = (net_node_t*)malloc(sizeof(net_node_t) * size);
    return nodes;
}
void release_net_node_t(void* arr){
static void release_net_node_t(void* arr){
    net_node_t* nodes = (net_node_t*)arr;
    free(nodes);
}
void set_1_net_node(void* arr, const int index, char* host, const int port, const int key){
static void set_1_net_node(void* arr, const int index, char* host, const int port, const int key){
    net_node_t* nodes = (net_node_t*)arr;
    nodes[index].host = strlen(host) > 0 ? host : NULL;
    nodes[index].port = port;
    nodes[index].key = key;
}
int get_1_net_mod_recv_msg(void* arr, const int index,
static int get_1_net_mod_recv_msg(void* arr, const int index,
    char** host, int* port, int* key, void** content, int* content_length){
    if (!arr) return -1;
@@ -34,26 +34,108 @@
    *content_length = msg[index].content_length;
    return 0;
}
void* create_int_array(const int size){
static void* create_int_array(const int size){
    int* arr = (int*)malloc(sizeof(int) * size);
    return arr;
}
void release_int_array(void* arr){
static void release_int_array(void* arr){
    int* carr = (int*)arr;
    free(carr);
}
int set_1_int(void* arr, const int index, const int key){
static int set_1_int(void* arr, const int index, const int key){
    if (!arr) return -1;
    int* carr = (int*)arr;
    carr[index] = key;
    return 0;
}
typedef void(*recvandsend_callback_fn)(void*, int, int, void**, int *, void *);
extern void cCallback(void *recvbuf, int recvsize, int recvkey,
                        void **sendbuf, int *sendsize, void *user_data);
*/
import "C"
import (
    "errors"
    "unsafe"
)
// add recvandsend funcs
//export cCallback
func cCallback(rbuf unsafe.Pointer, rsize C.int, rkey C.int,
    sbuf *unsafe.Pointer, ssize *C.int, userData unsafe.Pointer) {
    v := Restore(userData).(*goCallback)
    rdata := C.GoBytes(rbuf, rsize)
    var sdata []byte
    if v.cb(rdata, int(rkey), &sdata) {
        *sbuf = unsafe.Pointer(&sdata[0])
        *ssize = C.int(len(sdata))
    } else {
        *sbuf = nil
        *ssize = 0
    }
}
type goCallback struct {
    cb func(rdata []byte, rkey int, sdata *[]byte) bool
}
// Recvandsend socket
func (s *Socket) Recvandsend(fn func(rdata []byte, rkey int, sdata *[]byte) bool) int {
    if libbhomebus == nil || s.socket == nil {
        return -1
    }
    gcb := goCallback{
        cb: fn,
    }
    p := Save(&gcb)
    defer Unref(p)
    cbC := (C.recvandsend_callback_fn)(unsafe.Pointer(C.cCallback))
    ret := C.wrap_fn_socket_recvandsend(libbhomebus, s.socket, cbC, p)
    return int(ret)
}
// RecvandsendTimeout socket
func (s *Socket) RecvandsendTimeout(milliseconds int, fn func(rdata []byte, rkey int, sdata *[]byte) bool) int {
    if libbhomebus == nil || s.socket == nil {
        return -1
    }
    gcb := goCallback{
        cb: fn,
    }
    p := Save(&gcb)
    defer Unref(p)
    cbC := (C.recvandsend_callback_fn)(unsafe.Pointer(C.cCallback))
    ret := C.wrap_fn_socket_recvandsend_timeout(libbhomebus, s.socket, cbC, 0, C.int(milliseconds*1000000), p)
    return int(ret)
}
// RecvandsendNowait socket
func (s *Socket) RecvandsendNowait(fn func(rdata []byte, rkey int, sdata *[]byte) bool) int {
    if libbhomebus == nil || s.socket == nil {
        return -1
    }
    gcb := goCallback{
        cb: fn,
    }
    p := Save(&gcb)
    defer Unref(p)
    cbC := (C.recvandsend_callback_fn)(unsafe.Pointer(C.cCallback))
    ret := C.wrap_fn_socket_recvandsend_nowait(libbhomebus, s.socket, cbC, p)
    return int(ret)
}
var libbhomebus C.hbhomebus
@@ -611,7 +693,7 @@
    return int(C.wrap_fn_server_socket_start(libbhomebus, s.socket))
}
// ServerOpen bus server
// BusServerOpen bus server
func BusServerOpen() *Socket {
    if libbhomebus == nil {
        return nil
@@ -621,7 +703,7 @@
    return &Socket{sock}
}
// Close close
// BusClose close
func (s *Socket) BusClose() {
    if libbhomebus == nil {
        return
@@ -630,7 +712,7 @@
    C.wrap_fn_bus_server_socket_close(libbhomebus, s.socket)
}
// Start start
// BusStart start
func (s *Socket) BusStart() int {
    if libbhomebus == nil {
        return -1
gopointer.go
New file
@@ -0,0 +1,62 @@
package bhomebus
// #include <stdlib.h>
import "C"
import (
    "sync"
    "unsafe"
)
// from https://github.com/mattn/go-pointer
var (
    mutex sync.RWMutex
    store = map[unsafe.Pointer]interface{}{}
)
// Save to pointer
func Save(v interface{}) unsafe.Pointer {
    if v == nil {
        return nil
    }
    // Generate real fake C pointer.
    // This pointer will not store any data, but will bi used for indexing purposes.
    // Since Go doest allow to cast dangling pointer to unsafe.Pointer, we do rally allocate one byte.
    // Why we need indexing, because Go doest allow C code to store pointers to Go data.
    var ptr unsafe.Pointer = C.malloc(C.size_t(1))
    if ptr == nil {
        panic("can't allocate 'cgo-pointer hack index pointer': ptr == nil")
    }
    mutex.Lock()
    store[ptr] = v
    mutex.Unlock()
    return ptr
}
// Restore get pointer
func Restore(ptr unsafe.Pointer) (v interface{}) {
    if ptr == nil {
        return nil
    }
    mutex.RLock()
    v = store[ptr]
    mutex.RUnlock()
    return
}
// Unref release
func Unref(ptr unsafe.Pointer) {
    if ptr == nil {
        return
    }
    mutex.Lock()
    delete(store, ptr)
    mutex.Unlock()
    C.free(ptr)
}
libcbhomebus.c
@@ -197,6 +197,31 @@
    return fn_socket_sendandrecv_nowait(_socket, (net_node_t*)node_arr, arrlen, send_buf, send_size, (net_mod_recv_msg_t**)recv_arr, recv_arr_size);
}
// add recvandsend funcs
int wrap_fn_socket_recvandsend(hbhomebus lib, void *_sockt, recvandsend_callback_fn fn, void *user_data){
    if (!fn_socket_recvandsend){
        fn_socket_recvandsend = (tfn_net_mod_socket_recvandsend)dlsym(lib, l_net_mod_socket_recvandsend);
        check_with_ret(fn_socket_recvandsend, lib, -1);
    }
    return fn_socket_recvandsend(_sockt, fn, user_data);
}
int wrap_fn_socket_recvandsend_timeout(hbhomebus lib, void *_sockt, recvandsend_callback_fn fn, int sec, int nsec, void *user_data){
    if (!fn_socket_recvandsend_timeout){
        fn_socket_recvandsend_timeout = (tfn_net_mod_socket_recvandsend_timeout)dlsym(lib, l_net_mod_socket_recvandsend_timeout);
        check_with_ret(fn_socket_recvandsend_timeout, lib, -1);
    }
    return fn_socket_recvandsend_timeout(_sockt, fn, sec, nsec, user_data);
}
int wrap_fn_socket_recvandsend_nowait (hbhomebus lib, void *_sockt, recvandsend_callback_fn fn, void *user_data){
    if (!fn_socket_recvandsend_nowait){
        fn_socket_recvandsend_nowait = (tfn_net_mod_socket_recvandsend_nowait)dlsym(lib, l_net_mod_socket_recvandsend_nowait);
        check_with_ret(fn_socket_recvandsend_nowait, lib, -1);
    }
    return fn_socket_recvandsend_nowait(_sockt, fn, user_data);
}
//int  wrap_fn_socket_start_bus(hbhomebus lib, void * _socket)
//{
//    if(!fn_socket_start_bus){
libcbhomebus.h
@@ -28,6 +28,11 @@
static tfn_net_mod_socket_sendandrecv           fn_socket_sendandrecv = NULL;
static tfn_net_mod_socket_sendandrecv_timeout   fn_socket_sendandrecv_timeout = NULL;
static tfn_net_mod_socket_sendandrecv_nowait    fn_socket_sendandrecv_nowait = NULL;
// add recvandsend funcs
static tfn_net_mod_socket_recvandsend           fn_socket_recvandsend = NULL;
static tfn_net_mod_socket_recvandsend_timeout   fn_socket_recvandsend_timeout = NULL;
static tfn_net_mod_socket_recvandsend_nowait    fn_socket_recvandsend_nowait = NULL;
//static tfn_net_mod_socket_start_bus             fn_socket_start_bus = NULL;
static tfn_net_mod_socket_pub                   fn_socket_pub = NULL;
static tfn_net_mod_socket_pub_timeout           fn_socket_pub_timeout = NULL;
@@ -75,6 +80,11 @@
const static char l_net_mod_socket_sendandrecv[] = "net_mod_socket_sendandrecv";
const static char l_net_mod_socket_sendandrecv_timeout[] = "net_mod_socket_sendandrecv_timeout";
const static char l_net_mod_socket_sendandrecv_nowait[] = "net_mod_socket_sendandrecv_nowait";
// add recvandsend funcs
const static char l_net_mod_socket_recvandsend[] = "net_mod_socket_recvandsend";
const static char l_net_mod_socket_recvandsend_timeout[] = "net_mod_socket_recvandsend_timeout";
const static char l_net_mod_socket_recvandsend_nowait[] = "net_mod_socket_recvandsend_nowait";
//const static char l_net_mod_socket_start_bus[] = "net_mod_socket_start_bus";
const static char l_net_mod_socket_pub[] = "net_mod_socket_pub";
const static char l_net_mod_socket_pub_timeout[] = "net_mod_socket_pub_timeout";
@@ -129,6 +139,11 @@
    void ** recv_arr, int *recv_arr_size, int timeout);
int wrap_fn_socket_sendandrecv_nowait(hbhomebus lib, void *_sockt, void *node_arr, int arrlen, void *send_buf, int send_size,
    void ** recv_arr, int *recv_arr_size) ;
// add recvandsend funcs
typedef void(*recv_callback_fn)(void**, int*);
int wrap_fn_socket_recvandsend(hbhomebus lib, void *_sockt, recvandsend_callback_fn fn, void *user_data);
int wrap_fn_socket_recvandsend_timeout(hbhomebus lib, void *_sockt, recvandsend_callback_fn fn, int sec, int nsec, void *user_data);
int wrap_fn_socket_recvandsend_nowait (hbhomebus lib, void *_sockt, recvandsend_callback_fn fn, void *user_data);
//int  wrap_fn_socket_start_bus(hbhomebus lib, void * _socket);
int wrap_fn_socket_pub(hbhomebus lib, void *_sockt, void *node_arr, int node_arr_len, char *topic, int topic_size, void *content, int content_size);
int wrap_fn_socket_pub_timeout(hbhomebus lib, void *_sockt, void *node_arr, int node_arr_len, char *topic, int topic_size, void *content, int content_size, int timeout);
libcbhomebus_func.h
@@ -121,6 +121,50 @@
typedef int (*tfn_net_mod_socket_sendandrecv_nowait)(void *_sockt, net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) ;
/**
 * @brief 接受消息,并把callback函数返回的数据发送回对方,一直等待完成
 *
 * @param recvbuf 接受到的消息存放的缓存地址,该buf使用完成后需要手动释放
 * @param recvsize 接受到消息的长度
 * @param key 从谁哪里收到的信息
 * @callback  void (*recvandsend_callback_fn)(void *recvbuf, int recvsize, int key, void **sendbuf, int *sendsize, void * user_data)
 *            sendbuf 和 sendsize是callbak_fn回调函数的返回值, 分别表示返回的数据,和返回数据的长度。
 *
 * @return 0是成功, 其他值是失败的错误码
 */
typedef  void(*recvandsend_callback_fn)(void *recvbuf, int recvsize, int key, void **sendbuf, int *sendsize, void * user_data);
typedef int (*tfn_net_mod_socket_recvandsend)(void *_socket, recvandsend_callback_fn callback, void *user_data);
/**
 * @brief 接受消息,并把callback函数返回的数据发送回对方,在指定的时间内即使没有完成也返回
 *
 * @param recvbuf 接受到的消息存放的缓存地址,该buf使用完成后需要手动释放
 * @param recvsize 接受到消息的长度
 * @param key 从谁哪里收到的信息
 * @callback  void (*recvandsend_callback_fn)(void *recvbuf, int recvsize, int key, void **sendbuf, int *sendsize, void * user_data)
 *            sendbuf 和 sendsize是callbak_fn回调函数的返回值, 分别表示返回的数据,和返回数据的长度。
 *
 * @param sec 秒
 * @param nsec 纳秒
 *
 * @return 0是成功, 其他值是失败的错误码
 */
typedef int (*tfn_net_mod_socket_recvandsend_timeout)(void *_socket, recvandsend_callback_fn callback,
                                      int sec, int nsec, void *user_data) ;
/**
 * @brief 接受消息,并把callback函数返回的数据发送回对方,无论成功与否立刻返回
 *
 * @param recvbuf 接受到的消息存放的缓存地址,该buf使用完成后需要手动释放
 * @param recvsize 接受到消息的长度
 * @param key 从谁哪里收到的信息
 * @callback  void (*recvandsend_callback_fn)(void *recvbuf, int recvsize, int key, void **sendbuf, int *sendsize, void * user_data)
 *            sendbuf 和 sendsize是callbak_fn回调函数的返回值, 分别表示返回的数据,和返回数据的长度。
 *
 * @return 0是成功, 其他值是失败的错误码
 */
typedef int (*tfn_net_mod_socket_recvandsend_nowait)(void *_socket, recvandsend_callback_fn callback, void *user_data) ;
/**
 * 启动bus