zhangmeng
2020-08-03 0bde715af72b7b3d55ad3aac816d7cd153a60b42
add timeout api
5个文件已修改
316 ■■■■■ 已修改文件
libcsoftbus.c 81 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
libcsoftbus.h 30 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
libcsoftbus_func.h 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
library.go 35 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
softbusDgram.go 158 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
libcsoftbus.c
@@ -308,6 +308,22 @@
    return fn_dgram_socket_sendto(s, buf, size, port);
}
int wrap_fn_dgram_socket_sendto_timeout(hcsoftbus lib, void *s, const void *buf, const int size, const int port, int sec, int usec){
    if (!fn_dgram_socket_sendto_timeout){
        fn_dgram_socket_sendto_timeout = (tfn_dgram_socket_sendto_timeout)dlsym(lib, l_dgram_socket_sendto_timeout);
        check_with_ret(fn_dgram_socket_sendto_timeout, lib, -1);
    }
    return fn_dgram_socket_sendto_timeout(s, buf, size, port, sec, usec);
}
int wrap_fn_dgram_socket_sendto_nowait(hcsoftbus lib, void *s, const void *buf, const int size, const int port){
    if (!fn_dgram_socket_sendto_nowait){
        fn_dgram_socket_sendto_nowait = (tfn_dgram_socket_sendto_nowait)dlsym(lib, l_dgram_socket_sendto_nowait);
        check_with_ret(fn_dgram_socket_sendto_nowait, lib, -1);
    }
    return fn_dgram_socket_sendto_nowait(s, buf, size, port);
}
int wrap_fn_dgram_socket_recvfrom(hcsoftbus lib, void *s, void **buf, int *size, int *port){
    if (!fn_dgram_socket_recvfrom){
        fn_dgram_socket_recvfrom = (tfn_dgram_socket_recvfrom)dlsym(lib, l_dgram_socket_recvfrom);
@@ -316,12 +332,45 @@
    return fn_dgram_socket_recvfrom(s, buf, size, port);
}
int wrap_fn_dgram_socket_recvfrom_timeout(hcsoftbus lib, void *s, void **buf, int *size, int *port, int sec, int usec){
    if (!fn_dgram_socket_recvfrom_timeout){
        fn_dgram_socket_recvfrom_timeout = (tfn_dgram_socket_recvfrom_timeout)dlsym(lib, l_dgram_socket_recvfrom_timeout);
        check_with_ret(fn_dgram_socket_recvfrom_timeout, lib, -1);
    }
    return fn_dgram_socket_recvfrom_timeout(s, buf, size, port, sec, usec);
}
int wrap_fn_dgram_socket_recvfrom_nowait(hcsoftbus lib, void *s, void **buf, int *size, int *port){
    if (!fn_dgram_socket_recvfrom_nowait){
        fn_dgram_socket_recvfrom_nowait = (tfn_dgram_socket_recvfrom_nowait)dlsym(lib, l_dgram_socket_recvfrom_nowait);
        check_with_ret(fn_dgram_socket_recvfrom_nowait, lib, -1);
    }
    return fn_dgram_socket_recvfrom_nowait(s, buf, size, port);
}
int wrap_fn_dgram_socket_sendandrecv(hcsoftbus lib, void *s, const void *sbuf, const int ssize, const int port, void **rbuf, int *rsize){
    if (!fn_dgram_socket_sendandrecv){
        fn_dgram_socket_sendandrecv = (tfn_dgram_socket_sendandrecv)dlsym(lib, l_dgram_socket_sendandrecv);
        check_with_ret(fn_dgram_socket_sendandrecv, lib, -1);
    }
    return fn_dgram_socket_sendandrecv(s, sbuf, ssize, port, rbuf, rsize);
}
int wrap_fn_dgram_socket_sendandrecv_timeout(hcsoftbus lib, void *s, const void *sbuf, const int ssize, const int port, void **rbuf, int *rsize, int sec, int usec){
    if (!fn_dgram_socket_sendandrecv_timeout){
        fn_dgram_socket_sendandrecv_timeout = (tfn_dgram_socket_sendandrecv_timeout)dlsym(lib, l_dgram_socket_sendandrecv_timeout);
        check_with_ret(fn_dgram_socket_sendandrecv_timeout, lib, -1);
    }
    return fn_dgram_socket_sendandrecv_timeout(s, sbuf, ssize, port, rbuf, rsize, sec, usec);
}
int wrap_fn_dgram_socket_sendandrecv_nowait(hcsoftbus lib, void *s, const void *sbuf, const int ssize, const int port, void **rbuf, int *rsize){
    if (!fn_dgram_socket_sendandrecv_nowait){
        fn_dgram_socket_sendandrecv_nowait = (tfn_dgram_socket_sendandrecv_nowait)dlsym(lib, l_dgram_socket_sendandrecv_nowait);
        check_with_ret(fn_dgram_socket_sendandrecv_nowait, lib, -1);
    }
    return fn_dgram_socket_sendandrecv_nowait(s, sbuf, ssize, port, rbuf, rsize);
}
int wrap_fn_dgram_socket_start_bus(hcsoftbus lib, void *s){
@@ -340,6 +389,22 @@
    return fn_dgram_socket_sub(s, topic, size, port);
}
int wrap_fn_dgram_socket_sub_timeout(hcsoftbus lib, void *s, void *topic, int size, int port, int sec, int usec){
    if (!fn_dgram_socket_sub_timeout){
        fn_dgram_socket_sub_timeout = (tfn_dgram_socket_sub_timeout)dlsym(lib, l_dgram_socket_sub_timeout);
        check_with_ret(fn_dgram_socket_sub_timeout, lib, -1);
    }
    return fn_dgram_socket_sub_timeout(s, topic, size, port, sec, usec);
}
int wrap_fn_dgram_socket_sub_nowait(hcsoftbus lib, void *s, void *topic, int size, int port){
    if (!fn_dgram_socket_sub_nowait){
        fn_dgram_socket_sub_nowait = (tfn_dgram_socket_sub_nowait)dlsym(lib, l_dgram_socket_sub_nowait);
        check_with_ret(fn_dgram_socket_sub_nowait, lib, -1);
    }
    return fn_dgram_socket_sub_nowait(s, topic, size, port);
}
int wrap_fn_dgram_socket_pub(hcsoftbus lib, void *s, void *topic, int tsize, void *content, int csize, int port){
    if (!fn_dgram_socket_pub){
        fn_dgram_socket_pub = (tfn_dgram_socket_pub)dlsym(lib, l_dgram_socket_pub);
@@ -348,6 +413,22 @@
    return fn_dgram_socket_pub(s, topic, tsize, content, csize, port);
}
int wrap_fn_dgram_socket_pub_timeout(hcsoftbus lib, void *s, void *topic, int tsize, void *content, int csize, int port, int sec, int usec){
    if (!fn_dgram_socket_pub_timeout){
        fn_dgram_socket_pub_timeout = (tfn_dgram_socket_pub_timeout)dlsym(lib, l_dgram_socket_pub_timeout);
        check_with_ret(fn_dgram_socket_pub_timeout, lib, -1);
    }
    return fn_dgram_socket_pub_timeout(s, topic, tsize, content, csize, port, sec, usec);
}
int wrap_fn_dgram_socket_pub_nowait(hcsoftbus lib, void *s, void *topic, int tsize, void *content, int csize, int port){
    if (!fn_dgram_socket_pub_nowait){
        fn_dgram_socket_pub_nowait = (tfn_dgram_socket_pub_nowait)dlsym(lib, l_dgram_socket_pub_nowait);
        check_with_ret(fn_dgram_socket_pub_nowait, lib, -1);
    }
    return fn_dgram_socket_pub_nowait(s, topic, tsize, content, csize, port);
}
int wrap_fn_dgram_socket_port(hcsoftbus lib, void *s){
    if (!fn_dgram_socket_port){
        fn_dgram_socket_port = (tfn_dgram_socket_port)dlsym(lib, l_dgram_socket_port);
libcsoftbus.h
@@ -52,11 +52,21 @@
static tfn_dgram_socket_bind    fn_dgram_socket_bind = NULL;
static tfn_dgram_socket_force_bind    fn_dgram_socket_force_bind = NULL;
static tfn_dgram_socket_sendto  fn_dgram_socket_sendto = NULL;
static tfn_dgram_socket_sendto_timeout  fn_dgram_socket_sendto_timeout = NULL;
static tfn_dgram_socket_sendto_nowait  fn_dgram_socket_sendto_nowait = NULL;
static tfn_dgram_socket_recvfrom      fn_dgram_socket_recvfrom = NULL;
static tfn_dgram_socket_recvfrom_timeout      fn_dgram_socket_recvfrom_timeout = NULL;
static tfn_dgram_socket_recvfrom_nowait      fn_dgram_socket_recvfrom_nowait = NULL;
static tfn_dgram_socket_sendandrecv   fn_dgram_socket_sendandrecv = NULL;
static tfn_dgram_socket_sendandrecv_timeout   fn_dgram_socket_sendandrecv_timeout = NULL;
static tfn_dgram_socket_sendandrecv_nowait   fn_dgram_socket_sendandrecv_nowait = NULL;
static tfn_dgram_socket_start_bus     fn_dgram_socket_start_bus = NULL;
static tfn_dgram_socket_sub     fn_dgram_socket_sub = NULL;
static tfn_dgram_socket_sub_timeout     fn_dgram_socket_sub_timeout = NULL;
static tfn_dgram_socket_sub_nowait     fn_dgram_socket_sub_nowait = NULL;
static tfn_dgram_socket_pub     fn_dgram_socket_pub = NULL;
static tfn_dgram_socket_pub_timeout     fn_dgram_socket_pub_timeout = NULL;
static tfn_dgram_socket_pub_nowait     fn_dgram_socket_pub_nowait = NULL;
static tfn_dgram_socket_port    fn_dgram_socket_port = NULL;
static tfn_dgram_socket_free    fn_dgram_socket_free = NULL;
@@ -107,11 +117,21 @@
const static char l_dgram_socket_bind[] = "dgram_mod_bind";
const static char l_dgram_socket_force_bind[] = "dgram_mod_force_bind";
const static char l_dgram_socket_sendto[] = "dgram_mod_sendto";
const static char l_dgram_socket_sendto_timeout[] = "dgram_mod_sendto_timeout";
const static char l_dgram_socket_sendto_nowait[] = "dgram_mod_sendto_nowait";
const static char l_dgram_socket_recvfrom[] = "dgram_mod_recvfrom";
const static char l_dgram_socket_recvfrom_timeout[] = "dgram_mod_recvfrom_timeout";
const static char l_dgram_socket_recvfrom_nowait[] = "dgram_mod_recvfrom_nowait";
const static char l_dgram_socket_sendandrecv[] = "dgram_mod_sendandrecv";
const static char l_dgram_socket_sendandrecv_timeout[] = "dgram_mod_sendandrecv_timeout";
const static char l_dgram_socket_sendandrecv_nowait[] = "dgram_mod_sendandrecv_nowait";
const static char l_dgram_socket_start_bus[] = "dgram_mod_start_bus";
const static char l_dgram_socket_sub[] = "dgram_mod_sub";
const static char l_dgram_socket_sub_timeout[] = "dgram_mod_sub_timeout";
const static char l_dgram_socket_sub_nowait[] = "dgram_mod_sub_nowait";
const static char l_dgram_socket_pub[] = "dgram_mod_pub";
const static char l_dgram_socket_pub_timeout[] = "dgram_mod_pub_timeout";
const static char l_dgram_socket_pub_nowait[] = "dgram_mod_pub_nowait";
const static char l_dgram_socket_port[] = "dgram_mod_get_port";
const static char l_dgram_socket_free[] = "dgram_mod_free";
@@ -168,11 +188,21 @@
int wrap_fn_dgram_socket_bind(hcsoftbus lib, void *s, int port);
int wrap_fn_dgram_socket_force_bind(hcsoftbus lib, void *s, int port);
int wrap_fn_dgram_socket_sendto(hcsoftbus lib, void *s, const void *buf, const int size, const int port);
int wrap_fn_dgram_socket_sendto_timeout(hcsoftbus lib, void *s, const void *buf, const int size, const int port, int sec, int usec);
int wrap_fn_dgram_socket_sendto_nowait(hcsoftbus lib, void *s, const void *buf, const int size, const int port);
int wrap_fn_dgram_socket_recvfrom(hcsoftbus lib, void *s, void **buf, int *size, int *port);
int wrap_fn_dgram_socket_recvfrom_timeout(hcsoftbus lib, void *s, void **buf, int *size, int *port, int sec, int usec);
int wrap_fn_dgram_socket_recvfrom_nowait(hcsoftbus lib, void *s, void **buf, int *size, int *port);
int wrap_fn_dgram_socket_sendandrecv(hcsoftbus lib, void *s, const void *sbuf, const int ssize, const int port, void **rbuf, int *rsize);
int wrap_fn_dgram_socket_sendandrecv_timeout(hcsoftbus lib, void *s, const void *sbuf, const int ssize, const int port, void **rbuf, int *rsize, int sec, int usec);
int wrap_fn_dgram_socket_sendandrecv_nowait(hcsoftbus lib, void *s, const void *sbuf, const int ssize, const int port, void **rbuf, int *rsize);
int wrap_fn_dgram_socket_start_bus(hcsoftbus lib, void *s);
int wrap_fn_dgram_socket_sub(hcsoftbus lib, void *s, void *topic, int size, int port);
int wrap_fn_dgram_socket_sub_timeout(hcsoftbus lib, void *s, void *topic, int size, int port, int sec, int usec);
int wrap_fn_dgram_socket_sub_nowait(hcsoftbus lib, void *s, void *topic, int size, int port);
int wrap_fn_dgram_socket_pub(hcsoftbus lib, void *s, void *topic, int tsize, void *content, int csize, int port);
int wrap_fn_dgram_socket_pub_timeout(hcsoftbus lib, void *s, void *topic, int tsize, void *content, int csize, int port, int sec, int usec);
int wrap_fn_dgram_socket_pub_nowait(hcsoftbus lib, void *s, void *topic, int tsize, void *content, int csize, int port);
int wrap_fn_dgram_socket_port(hcsoftbus lib, void *s);
void wrap_fn_dgram_socket_free(hcsoftbus lib, void *buf);
libcsoftbus_func.h
@@ -175,18 +175,26 @@
 * @return 0 成功, 其他值 失败的错误码
 */
typedef int(*tfn_dgram_socket_sendto) (void*, const void*, const int, const int);
// 发送信息超时返回。 @sec 秒 , @nsec 纳秒
typedef int(*tfn_dgram_socket_sendto_timeout) (void*, const void*, const int, const int, int, int);
// 发送信息立刻返回。
typedef tfn_dgram_socket_sendto tfn_dgram_socket_sendto_nowait;
/**
 * 接收信息
 * @port 从谁哪里收到的信息
 * @return 0 成功, 其他值 失败的错误码
*/
typedef int(*tfn_dgram_socket_recvfrom) (void*, void**, int*, int*);
typedef int(*tfn_dgram_socket_recvfrom_timeout) (void*, void**, int*, int*, int, int);
typedef tfn_dgram_socket_recvfrom tfn_dgram_socket_recvfrom_nowait;
/**
 * 发送请求信息并等待接收应答
 * @port 发送给谁
 * @return 0 成功, 其他值 失败的错误码
*/
typedef int(*tfn_dgram_socket_sendandrecv) (void*, const void*, const int, const int, void**, int*);
typedef int(*tfn_dgram_socket_sendandrecv_timeout) (void*, const void*, const int, const int, void**, int*, int, int);
typedef tfn_dgram_socket_sendandrecv tfn_dgram_socket_sendandrecv_nowait;
/**
 * 启动bus
 *
@@ -200,6 +208,8 @@
 * @port 总线端口
 */
typedef int(*tfn_dgram_socket_sub) (void*, void*, int, int);
typedef int(*tfn_dgram_socket_sub_timeout) (void*, void*, int, int, int, int);
typedef tfn_dgram_socket_sub tfn_dgram_socket_sub_nowait;
/**
 * 发布主题
 * @topic 主题
@@ -207,6 +217,8 @@
 * @port 总线端口
 */
typedef int(*tfn_dgram_socket_pub) (void*, void*, int, void*, int, int);
typedef int(*tfn_dgram_socket_pub_timeout) (void*, void*, int, void*, int, int, int, int);
typedef tfn_dgram_socket_pub tfn_dgram_socket_pub_nowait;
/**
 * 获取soket端口号
 */
library.go
@@ -85,14 +85,16 @@
        case <-ctx.Done():
            return
        default:
            if data, peer, err := sock.RecvFrom(); err == nil {
                var info *MsgInfo
                if err := proto.Unmarshal(data, info); err == nil {
            if data, peer, err := sock.RecvFromTimeout(0, 10*1000); err == nil {
                var info MsgInfo
                if err := proto.Unmarshal(data, &info); err == nil {
                    ch <- TransInfo{
                        info: info,
                        info: &info,
                        port: peer,
                    }
                }
            } else {
                // time.Sleep(10 * time.Millisecond)
            }
        }
    }
@@ -211,6 +213,11 @@
    return handle
}
const (
    timeoutSec  = 1
    timeoutUsec = 0
)
// GetTopicInfo get topic info
func (h *Handle) GetTopicInfo(topic, typ string) int {
    // 据说不更新,先用缓存,否则需要新创建一个socket,来从manager请求key
@@ -225,10 +232,10 @@
    }
    if data, err := proto.Marshal(msg); err == nil {
        h.mtxWorker.Lock()
        if rdata, err := h.sockWorker.sock.SendAndRecv(data, h.sockWorker.peer); err == nil {
        if rdata, err := h.sockWorker.sock.SendAndRecvTimeout(data, h.sockWorker.peer, timeoutSec, timeoutUsec); err == nil {
            h.mtxWorker.Unlock()
            var rmsg *TopicInfoReply
            if err := proto.Unmarshal(rdata, rmsg); err == nil {
            var rmsg TopicInfoReply
            if err := proto.Unmarshal(rdata, &rmsg); err == nil {
                return int(rmsg.Key)
            }
        }
@@ -238,7 +245,7 @@
}
func (h *Handle) send2(sc *sockClient, data []byte, logID string) error {
    if r := sc.sock.SendTo(data, sc.peer); r != 0 {
    if r := sc.sock.SendToTimeout(data, sc.peer, timeoutSec, timeoutUsec); r != 0 {
        return fmt.Errorf("%s SendTo Failed: %d", logID, r)
    }
    return nil
@@ -259,7 +266,7 @@
    defer h.mtxWorker.Unlock()
    msg, err := proto.Marshal(info)
    if err == nil {
        if r := h.sockWorker.sock.SendTo(msg, key); r != 0 {
        if r := h.sockWorker.sock.SendToTimeout(msg, key, timeoutSec, timeoutUsec); r != 0 {
            return fmt.Errorf("SendOnly Failed: %d", r)
        }
    }
@@ -286,28 +293,28 @@
    }
    // 同步接口,需要等待返回值
    var ret *MsgInfo
    var ret MsgInfo
loop:
    for {
        select {
        case <-h.ctx.Done():
            return nil
        default:
            if data, err := h.sockWorker.sock.SendAndRecv(msg, key); err == nil {
                if err := proto.Unmarshal(data, ret); err == nil {
            if data, err := h.sockWorker.sock.SendAndRecvTimeout(msg, key, timeoutSec, timeoutUsec); err == nil {
                if err := proto.Unmarshal(data, &ret); err == nil {
                    break loop
                }
            }
        }
    }
    return ret
    return &ret
}
// Reply request
func (h *Handle) Reply(key int, info *MsgInfo) error {
    msg, err := proto.Marshal(info)
    if err == nil {
        if r := h.sockRep.sock.SendTo(msg, key); r != 0 {
        if r := h.sockRep.sock.SendToTimeout(msg, key, timeoutSec, timeoutUsec); r != 0 {
            return fmt.Errorf("Reply Failed: %d", r)
        }
    }
softbusDgram.go
@@ -71,6 +71,26 @@
    return int(r)
}
// SendToTimeout port
func (d *DgramSocket) SendToTimeout(data []byte, port int, sec, usec int) int {
    if libsoftbus == nil {
        return RETVAL
    }
    r := C.wrap_fn_dgram_socket_sendto_timeout(libsoftbus, d.dgram, unsafe.Pointer(&data[0]), C.int(len(data)), C.int(port), C.int(sec), C.int(usec))
    return int(r)
}
// SendToNoWait port
func (d *DgramSocket) SendToNoWait(data []byte, port int) int {
    if libsoftbus == nil {
        return RETVAL
    }
    r := C.wrap_fn_dgram_socket_sendto_nowait(libsoftbus, d.dgram, unsafe.Pointer(&data[0]), C.int(len(data)), C.int(port))
    return int(r)
}
// RecvFrom data and port
func (d *DgramSocket) RecvFrom() ([]byte, int, error) {
    if libsoftbus == nil {
@@ -92,6 +112,48 @@
    return data, int(rp), nil
}
// RecvFromTimeout data and port
func (d *DgramSocket) RecvFromTimeout(sec, usec int) ([]byte, int, error) {
    if libsoftbus == nil {
        return nil, -1, fmt.Errorf("RecvFromTimeout Func Test libsoftbus Is Nil")
    }
    var rb unsafe.Pointer
    var rs C.int
    var rp C.int
    r := C.wrap_fn_dgram_socket_recvfrom_timeout(libsoftbus, d.dgram, &rb, &rs, &rp, C.int(sec), C.int(usec))
    if r != 0 {
        return nil, int(rp), fmt.Errorf("RecvFromTimeout Func Failed %d", int(r))
    }
    data := C.GoBytes(rb, rs)
    C.wrap_fn_dgram_socket_free(libsoftbus, rb)
    return data, int(rp), nil
}
// RecvFromNoWait data and port
func (d *DgramSocket) RecvFromNoWait() ([]byte, int, error) {
    if libsoftbus == nil {
        return nil, -1, fmt.Errorf("RecvFromNoWait Func Test libsoftbus Is Nil")
    }
    var rb unsafe.Pointer
    var rs C.int
    var rp C.int
    r := C.wrap_fn_dgram_socket_recvfrom_nowait(libsoftbus, d.dgram, &rb, &rs, &rp)
    if r != 0 {
        return nil, int(rp), fmt.Errorf("RecvFromNoWait Func Failed %d", int(r))
    }
    data := C.GoBytes(rb, rs)
    C.wrap_fn_dgram_socket_free(libsoftbus, rb)
    return data, int(rp), nil
}
// SendAndRecv sync
func (d *DgramSocket) SendAndRecv(sdata []byte, port int) ([]byte, error) {
    if libsoftbus == nil {
@@ -104,6 +166,46 @@
    r := C.wrap_fn_dgram_socket_sendandrecv(libsoftbus, d.dgram, unsafe.Pointer(&sdata[0]), C.int(len(sdata)), C.int(port), &rb, &rs)
    if r != 0 {
        return nil, fmt.Errorf("SendAndRecv Send To %d And Recv From It Failed", port)
    }
    data := C.GoBytes(rb, rs)
    C.wrap_fn_dgram_socket_free(libsoftbus, rb)
    return data, nil
}
// SendAndRecvTimeout sync
func (d *DgramSocket) SendAndRecvTimeout(sdata []byte, port int, sec, usec int) ([]byte, error) {
    if libsoftbus == nil {
        return nil, fmt.Errorf("SendAndRecvTimeout Func Test libsoftbus Is Nil")
    }
    var rb unsafe.Pointer
    var rs C.int
    r := C.wrap_fn_dgram_socket_sendandrecv_timeout(libsoftbus, d.dgram, unsafe.Pointer(&sdata[0]), C.int(len(sdata)), C.int(port), &rb, &rs, C.int(sec), C.int(usec))
    if r != 0 {
        return nil, fmt.Errorf("SendAndRecvTimeout Send To %d And Recv From It Failed", port)
    }
    data := C.GoBytes(rb, rs)
    C.wrap_fn_dgram_socket_free(libsoftbus, rb)
    return data, nil
}
// SendAndRecvNoWait sync
func (d *DgramSocket) SendAndRecvNoWait(sdata []byte, port int) ([]byte, error) {
    if libsoftbus == nil {
        return nil, fmt.Errorf("SendAndRecvNoWait Func Test libsoftbus Is Nil")
    }
    var rb unsafe.Pointer
    var rs C.int
    r := C.wrap_fn_dgram_socket_sendandrecv_nowait(libsoftbus, d.dgram, unsafe.Pointer(&sdata[0]), C.int(len(sdata)), C.int(port), &rb, &rs)
    if r != 0 {
        return nil, fmt.Errorf("SendAndRecvNoWait Send To %d And Recv From It Failed", port)
    }
    data := C.GoBytes(rb, rs)
@@ -135,6 +237,32 @@
    return int(r)
}
// SubTimeout sub bus
func (d *DgramSocket) SubTimeout(topic string, port int, sec, usec int) int {
    if libsoftbus == nil {
        return RETVAL
    }
    ct := C.CString(topic)
    defer C.free(unsafe.Pointer(ct))
    r := C.wrap_fn_dgram_socket_sub_timeout(libsoftbus, d.dgram, unsafe.Pointer(ct), C.int(len(topic)), C.int(port), C.int(sec), C.int(usec))
    return int(r)
}
// SubNoWait sub bus
func (d *DgramSocket) SubNoWait(topic string, port int) int {
    if libsoftbus == nil {
        return RETVAL
    }
    ct := C.CString(topic)
    defer C.free(unsafe.Pointer(ct))
    r := C.wrap_fn_dgram_socket_sub_nowait(libsoftbus, d.dgram, unsafe.Pointer(ct), C.int(len(topic)), C.int(port))
    return int(r)
}
// Pub bus
func (d *DgramSocket) Pub(topic, msg string, port int) int {
    if libsoftbus == nil {
@@ -150,6 +278,36 @@
    return int(r)
}
// PubTimeout bus
func (d *DgramSocket) PubTimeout(topic, msg string, port int, sec, usec int) int {
    if libsoftbus == nil {
        return RETVAL
    }
    ct := C.CString(topic)
    defer C.free(unsafe.Pointer(ct))
    cm := C.CString(msg)
    defer C.free(unsafe.Pointer(cm))
    r := C.wrap_fn_dgram_socket_pub_timeout(libsoftbus, d.dgram, unsafe.Pointer(ct), C.int(len(topic)), unsafe.Pointer(cm), C.int(len(msg)), C.int(port), C.int(sec), C.int(usec))
    return int(r)
}
// PubNoWait bus
func (d *DgramSocket) PubNoWait(topic, msg string, port int) int {
    if libsoftbus == nil {
        return RETVAL
    }
    ct := C.CString(topic)
    defer C.free(unsafe.Pointer(ct))
    cm := C.CString(msg)
    defer C.free(unsafe.Pointer(cm))
    r := C.wrap_fn_dgram_socket_pub_nowait(libsoftbus, d.dgram, unsafe.Pointer(ct), C.int(len(topic)), unsafe.Pointer(cm), C.int(len(msg)), C.int(port))
    return int(r)
}
// Port socket
func (d *DgramSocket) Port() int {
    if libsoftbus == nil {