zhangmeng
2020-08-06 f87b7233a94131a5fd1b3dfb1595226e2779fffa
add remove key(s) and RequestWithTimeout
5个文件已修改
184 ■■■■■ 已修改文件
libcsoftbus.c 39 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
libcsoftbus.h 17 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
libcsoftbus_func.h 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
library.go 41 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
softbusDgram.go 68 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
libcsoftbus.c
@@ -405,6 +405,30 @@
    return fn_dgram_socket_sub_nowait(s, topic, size, port);
}
int wrap_fn_dgram_socket_desub(hcsoftbus lib, void *s, void *topic, int size, int port){
    if (!fn_dgram_socket_desub){
        fn_dgram_socket_desub = (tfn_dgram_socket_desub)dlsym(lib, l_dgram_socket_desub);
        check_with_ret(fn_dgram_socket_desub, lib, -1);
    }
    return fn_dgram_socket_desub(s, topic, size, port);
}
int wrap_fn_dgram_socket_desub_timeout(hcsoftbus lib, void *s, void *topic, int size, int port, int sec, int usec){
    if (!fn_dgram_socket_desub_timeout){
        fn_dgram_socket_desub_timeout = (tfn_dgram_socket_desub_timeout)dlsym(lib, l_dgram_socket_desub_timeout);
        check_with_ret(fn_dgram_socket_desub_timeout, lib, -1);
    }
    return fn_dgram_socket_desub_timeout(s, topic, size, port, sec, usec);
}
int wrap_fn_dgram_socket_desub_nowait(hcsoftbus lib, void *s, void *topic, int size, int port){
    if (!fn_dgram_socket_desub_nowait){
        fn_dgram_socket_desub_nowait = (tfn_dgram_socket_desub_nowait)dlsym(lib, l_dgram_socket_desub_nowait);
        check_with_ret(fn_dgram_socket_desub_nowait, lib, -1);
    }
    return fn_dgram_socket_desub_nowait(s, topic, size, port);
}
int wrap_fn_dgram_socket_pub(hcsoftbus lib, void *s, void *topic, int tsize, void *content, int csize, int port){
    if (!fn_dgram_socket_pub){
        fn_dgram_socket_pub = (tfn_dgram_socket_pub)dlsym(lib, l_dgram_socket_pub);
@@ -448,3 +472,18 @@
    }
}
int wrap_fn_dgram_remove_key(hcsoftbus lib, int key){
    if (!fn_dgram_remove_key){
        fn_dgram_remove_key = (tfn_dgram_remove_key)dlsym(lib, l_dgram_remove_key);
        check_with_ret(fn_dgram_remove_key, lib, -1);
    }
    return fn_dgram_remove_key(key);
}
int wrap_fn_dgram_remove_keys(hcsoftbus lib, void *keys, int length){
    if (!fn_dgram_remove_keys){
        fn_dgram_remove_keys = (tfn_dgram_remove_keys)dlsym(lib, l_dgram_remove_keys);
        check_with_ret(fn_dgram_remove_keys, lib, -1);
    }
    return fn_dgram_remove_keys((int*)keys, length);
}
libcsoftbus.h
@@ -64,12 +64,16 @@
static tfn_dgram_socket_sub     fn_dgram_socket_sub = NULL;
static tfn_dgram_socket_sub_timeout     fn_dgram_socket_sub_timeout = NULL;
static tfn_dgram_socket_sub_nowait     fn_dgram_socket_sub_nowait = NULL;
static tfn_dgram_socket_desub   fn_dgram_socket_desub = NULL;
static tfn_dgram_socket_desub_timeout   fn_dgram_socket_desub_timeout = NULL;
static tfn_dgram_socket_desub_nowait   fn_dgram_socket_desub_nowait = NULL;
static tfn_dgram_socket_pub     fn_dgram_socket_pub = NULL;
static tfn_dgram_socket_pub_timeout     fn_dgram_socket_pub_timeout = NULL;
static tfn_dgram_socket_pub_nowait     fn_dgram_socket_pub_nowait = NULL;
static tfn_dgram_socket_port    fn_dgram_socket_port = NULL;
static tfn_dgram_socket_free    fn_dgram_socket_free = NULL;
static tfn_dgram_remove_key     fn_dgram_remove_key = NULL;
static tfn_dgram_remove_keys     fn_dgram_remove_keys = NULL;
//////////////////////////////////////////////////////////////////////
// labels
@@ -129,12 +133,16 @@
const static char l_dgram_socket_sub[] = "dgram_mod_sub";
const static char l_dgram_socket_sub_timeout[] = "dgram_mod_sub_timeout";
const static char l_dgram_socket_sub_nowait[] = "dgram_mod_sub_nowait";
const static char l_dgram_socket_desub[] = "dgram_mod_desub";
const static char l_dgram_socket_desub_timeout[] = "dgram_mod_desub_timeout";
const static char l_dgram_socket_desub_nowait[] = "dgram_mod_desub_nowait";
const static char l_dgram_socket_pub[] = "dgram_mod_pub";
const static char l_dgram_socket_pub_timeout[] = "dgram_mod_pub_timeout";
const static char l_dgram_socket_pub_nowait[] = "dgram_mod_pub_nowait";
const static char l_dgram_socket_port[] = "dgram_mod_get_port";
const static char l_dgram_socket_free[] = "dgram_mod_free";
const static char l_dgram_remove_key[] = "dgram_mod_remove_key";
const static char l_dgram_remove_keys[] = "dgram_mod_remove_keys";
//////////////////////////////////////////////////////////////////////
// dlopen dynamic library
@@ -200,11 +208,16 @@
int wrap_fn_dgram_socket_sub(hcsoftbus lib, void *s, void *topic, int size, int port);
int wrap_fn_dgram_socket_sub_timeout(hcsoftbus lib, void *s, void *topic, int size, int port, int sec, int usec);
int wrap_fn_dgram_socket_sub_nowait(hcsoftbus lib, void *s, void *topic, int size, int port);
int wrap_fn_dgram_socket_desub(hcsoftbus lib, void *s, void *topic, int size, int port);
int wrap_fn_dgram_socket_desub_timeout(hcsoftbus lib, void *s, void *topic, int size, int port, int sec, int usec);
int wrap_fn_dgram_socket_desub_nowait(hcsoftbus lib, void *s, void *topic, int size, int port);
int wrap_fn_dgram_socket_pub(hcsoftbus lib, void *s, void *topic, int tsize, void *content, int csize, int port);
int wrap_fn_dgram_socket_pub_timeout(hcsoftbus lib, void *s, void *topic, int tsize, void *content, int csize, int port, int sec, int usec);
int wrap_fn_dgram_socket_pub_nowait(hcsoftbus lib, void *s, void *topic, int tsize, void *content, int csize, int port);
int wrap_fn_dgram_socket_port(hcsoftbus lib, void *s);
void wrap_fn_dgram_socket_free(hcsoftbus lib, void *buf);
int wrap_fn_dgram_remove_key(hcsoftbus lib, int key);
int wrap_fn_dgram_remove_keys(hcsoftbus lib, void *keys, int length);
#ifdef __cplusplus
}
libcsoftbus_func.h
@@ -211,6 +211,15 @@
typedef int(*tfn_dgram_socket_sub_timeout) (void*, void*, int, int, int, int);
typedef tfn_dgram_socket_sub tfn_dgram_socket_sub_nowait;
/**
 * 取消订阅指定主题
 * @topic 主题
 * @size 主题长度
 * @port 总线端口
 */
typedef tfn_dgram_socket_sub tfn_dgram_socket_desub;
typedef tfn_dgram_socket_sub_timeout tfn_dgram_socket_desub_timeout;
typedef tfn_dgram_socket_sub_nowait tfn_dgram_socket_desub_nowait;
/**
 * 发布主题
 * @topic 主题
 * @content 主题内容
@@ -228,6 +237,16 @@
 */
typedef void(*tfn_dgram_socket_free) (void*);
/**
 * 删除key对应的共享队列,并在bus里删除该key的订阅
 */
typedef int(*tfn_dgram_remove_key) (int);
/**
 * 批量删除key对应的共享队列,并在bus里删除该key的订阅
 */
typedef int(*tfn_dgram_remove_keys) (int*, int);
#ifdef __cplusplus
}
library.go
@@ -325,6 +325,47 @@
    return &ret
}
// RequestWithTimeout req sync
func (h *Handle) RequestWithTimeout(key int, info *MsgInfo, timeout int) *MsgInfo {
    h.mtxWorker.Lock()
    defer h.mtxWorker.Unlock()
    msg, err := proto.Marshal(info)
    if err != nil {
        return nil
    }
    until := (float32)(timeout)
    one := (float32)(timeoutSec) + ((float32)(timeoutUsec) / 1000000)
    fc := until / one
    count := (int)(fc)
    try := 0
    // 同步接口,需要等待返回值
    var ret MsgInfo
loop:
    for {
        select {
        case <-h.ctx.Done():
            return nil
        default:
            if data, err := h.sockWorker.sock.SendAndRecvTimeout(msg, key, timeoutSec, timeoutUsec); err == nil {
                if err := proto.Unmarshal(data, &ret); err == nil {
                    break loop
                } else {
                    try++
                    if try > count {
                        return nil
                    }
                }
            }
        }
    }
    return &ret
}
// Reply request
func (h *Handle) Reply(key int, info *MsgInfo) error {
    msg, err := proto.Marshal(info)
softbusDgram.go
@@ -3,6 +3,8 @@
/*
#include <stdlib.h>
#include "libcsoftbus.h"
extern void *get_array(const int size);
extern void set_array(void *arr, const int index, const int value);
*/
import "C"
import (
@@ -263,6 +265,45 @@
    return int(r)
}
// Desub remove sub
func (d *DgramSocket) Desub(topic string, port int) int {
    if libsoftbus == nil {
        return RETVAL
    }
    ct := C.CString(topic)
    defer C.free(unsafe.Pointer(ct))
    r := C.wrap_fn_dgram_socket_desub(libsoftbus, d.dgram, unsafe.Pointer(ct), C.int(len(topic)), C.int(port))
    return int(r)
}
// DesubTimeout timeout
func (d *DgramSocket) DesubTimeout(topic string, port int, sec, usec int) int {
    if libsoftbus == nil {
        return RETVAL
    }
    ct := C.CString(topic)
    defer C.free(unsafe.Pointer(ct))
    r := C.wrap_fn_dgram_socket_desub_timeout(libsoftbus, d.dgram, unsafe.Pointer(ct), C.int(len(topic)), C.int(port), C.int(sec), C.int(usec))
    return int(r)
}
// DesubNoWait remove sub
func (d *DgramSocket) DesubNoWait(topic string, port int) int {
    if libsoftbus == nil {
        return RETVAL
    }
    ct := C.CString(topic)
    defer C.free(unsafe.Pointer(ct))
    r := C.wrap_fn_dgram_socket_desub_nowait(libsoftbus, d.dgram, unsafe.Pointer(ct), C.int(len(topic)), C.int(port))
    return int(r)
}
// Pub bus
func (d *DgramSocket) Pub(topic, msg string, port int) int {
    if libsoftbus == nil {
@@ -317,3 +358,30 @@
    r := C.wrap_fn_dgram_socket_port(libsoftbus, d.dgram)
    return int(r)
}
// RemoveKey rm key
func (d *DgramSocket) RemoveKey(key int) int {
    if libsoftbus == nil {
        return RETVAL
    }
    r := C.wrap_fn_dgram_remove_key(libsoftbus, C.int(key))
    return int(r)
}
// RemoveKeys rm key
func (d *DgramSocket) RemoveKeys(keys []int) int {
    if libsoftbus == nil {
        return RETVAL
    }
    l := len(keys)
    ca := C.get_array(C.int(l))
    defer C.free(ca)
    for k, v := range keys {
        C.set_array(ca, C.int(k), C.int(v))
    }
    r := C.wrap_fn_dgram_remove_keys(libsoftbus, ca, C.int(l))
    return int(r)
}