From 0bde715af72b7b3d55ad3aac816d7cd153a60b42 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期一, 03 八月 2020 13:29:21 +0800 Subject: [PATCH] add timeout api --- library.go | 35 +++-- libcsoftbus.h | 30 +++++ libcsoftbus_func.h | 12 ++ softbusDgram.go | 158 ++++++++++++++++++++++++++ libcsoftbus.c | 81 +++++++++++++ 5 files changed, 302 insertions(+), 14 deletions(-) diff --git a/libcsoftbus.c b/libcsoftbus.c index a5a23c1..11fc582 100644 --- a/libcsoftbus.c +++ b/libcsoftbus.c @@ -308,6 +308,22 @@ return fn_dgram_socket_sendto(s, buf, size, port); } +int wrap_fn_dgram_socket_sendto_timeout(hcsoftbus lib, void *s, const void *buf, const int size, const int port, int sec, int usec){ + if (!fn_dgram_socket_sendto_timeout){ + fn_dgram_socket_sendto_timeout = (tfn_dgram_socket_sendto_timeout)dlsym(lib, l_dgram_socket_sendto_timeout); + check_with_ret(fn_dgram_socket_sendto_timeout, lib, -1); + } + return fn_dgram_socket_sendto_timeout(s, buf, size, port, sec, usec); +} + +int wrap_fn_dgram_socket_sendto_nowait(hcsoftbus lib, void *s, const void *buf, const int size, const int port){ + if (!fn_dgram_socket_sendto_nowait){ + fn_dgram_socket_sendto_nowait = (tfn_dgram_socket_sendto_nowait)dlsym(lib, l_dgram_socket_sendto_nowait); + check_with_ret(fn_dgram_socket_sendto_nowait, lib, -1); + } + return fn_dgram_socket_sendto_nowait(s, buf, size, port); +} + int wrap_fn_dgram_socket_recvfrom(hcsoftbus lib, void *s, void **buf, int *size, int *port){ if (!fn_dgram_socket_recvfrom){ fn_dgram_socket_recvfrom = (tfn_dgram_socket_recvfrom)dlsym(lib, l_dgram_socket_recvfrom); @@ -316,12 +332,45 @@ return fn_dgram_socket_recvfrom(s, buf, size, port); } +int wrap_fn_dgram_socket_recvfrom_timeout(hcsoftbus lib, void *s, void **buf, int *size, int *port, int sec, int usec){ + if (!fn_dgram_socket_recvfrom_timeout){ + fn_dgram_socket_recvfrom_timeout = (tfn_dgram_socket_recvfrom_timeout)dlsym(lib, l_dgram_socket_recvfrom_timeout); + check_with_ret(fn_dgram_socket_recvfrom_timeout, lib, -1); + } + return fn_dgram_socket_recvfrom_timeout(s, buf, size, port, sec, usec); +} + +int wrap_fn_dgram_socket_recvfrom_nowait(hcsoftbus lib, void *s, void **buf, int *size, int *port){ + if (!fn_dgram_socket_recvfrom_nowait){ + fn_dgram_socket_recvfrom_nowait = (tfn_dgram_socket_recvfrom_nowait)dlsym(lib, l_dgram_socket_recvfrom_nowait); + check_with_ret(fn_dgram_socket_recvfrom_nowait, lib, -1); + } + return fn_dgram_socket_recvfrom_nowait(s, buf, size, port); +} + int wrap_fn_dgram_socket_sendandrecv(hcsoftbus lib, void *s, const void *sbuf, const int ssize, const int port, void **rbuf, int *rsize){ if (!fn_dgram_socket_sendandrecv){ fn_dgram_socket_sendandrecv = (tfn_dgram_socket_sendandrecv)dlsym(lib, l_dgram_socket_sendandrecv); check_with_ret(fn_dgram_socket_sendandrecv, lib, -1); } return fn_dgram_socket_sendandrecv(s, sbuf, ssize, port, rbuf, rsize); +} + +int wrap_fn_dgram_socket_sendandrecv_timeout(hcsoftbus lib, void *s, const void *sbuf, const int ssize, const int port, void **rbuf, int *rsize, int sec, int usec){ + if (!fn_dgram_socket_sendandrecv_timeout){ + fn_dgram_socket_sendandrecv_timeout = (tfn_dgram_socket_sendandrecv_timeout)dlsym(lib, l_dgram_socket_sendandrecv_timeout); + check_with_ret(fn_dgram_socket_sendandrecv_timeout, lib, -1); + } + + return fn_dgram_socket_sendandrecv_timeout(s, sbuf, ssize, port, rbuf, rsize, sec, usec); +} + +int wrap_fn_dgram_socket_sendandrecv_nowait(hcsoftbus lib, void *s, const void *sbuf, const int ssize, const int port, void **rbuf, int *rsize){ + if (!fn_dgram_socket_sendandrecv_nowait){ + fn_dgram_socket_sendandrecv_nowait = (tfn_dgram_socket_sendandrecv_nowait)dlsym(lib, l_dgram_socket_sendandrecv_nowait); + check_with_ret(fn_dgram_socket_sendandrecv_nowait, lib, -1); + } + return fn_dgram_socket_sendandrecv_nowait(s, sbuf, ssize, port, rbuf, rsize); } int wrap_fn_dgram_socket_start_bus(hcsoftbus lib, void *s){ @@ -340,6 +389,22 @@ return fn_dgram_socket_sub(s, topic, size, port); } +int wrap_fn_dgram_socket_sub_timeout(hcsoftbus lib, void *s, void *topic, int size, int port, int sec, int usec){ + if (!fn_dgram_socket_sub_timeout){ + fn_dgram_socket_sub_timeout = (tfn_dgram_socket_sub_timeout)dlsym(lib, l_dgram_socket_sub_timeout); + check_with_ret(fn_dgram_socket_sub_timeout, lib, -1); + } + return fn_dgram_socket_sub_timeout(s, topic, size, port, sec, usec); +} + +int wrap_fn_dgram_socket_sub_nowait(hcsoftbus lib, void *s, void *topic, int size, int port){ + if (!fn_dgram_socket_sub_nowait){ + fn_dgram_socket_sub_nowait = (tfn_dgram_socket_sub_nowait)dlsym(lib, l_dgram_socket_sub_nowait); + check_with_ret(fn_dgram_socket_sub_nowait, lib, -1); + } + return fn_dgram_socket_sub_nowait(s, topic, size, port); +} + int wrap_fn_dgram_socket_pub(hcsoftbus lib, void *s, void *topic, int tsize, void *content, int csize, int port){ if (!fn_dgram_socket_pub){ fn_dgram_socket_pub = (tfn_dgram_socket_pub)dlsym(lib, l_dgram_socket_pub); @@ -348,6 +413,22 @@ return fn_dgram_socket_pub(s, topic, tsize, content, csize, port); } +int wrap_fn_dgram_socket_pub_timeout(hcsoftbus lib, void *s, void *topic, int tsize, void *content, int csize, int port, int sec, int usec){ + if (!fn_dgram_socket_pub_timeout){ + fn_dgram_socket_pub_timeout = (tfn_dgram_socket_pub_timeout)dlsym(lib, l_dgram_socket_pub_timeout); + check_with_ret(fn_dgram_socket_pub_timeout, lib, -1); + } + return fn_dgram_socket_pub_timeout(s, topic, tsize, content, csize, port, sec, usec); +} + +int wrap_fn_dgram_socket_pub_nowait(hcsoftbus lib, void *s, void *topic, int tsize, void *content, int csize, int port){ + if (!fn_dgram_socket_pub_nowait){ + fn_dgram_socket_pub_nowait = (tfn_dgram_socket_pub_nowait)dlsym(lib, l_dgram_socket_pub_nowait); + check_with_ret(fn_dgram_socket_pub_nowait, lib, -1); + } + return fn_dgram_socket_pub_nowait(s, topic, tsize, content, csize, port); +} + int wrap_fn_dgram_socket_port(hcsoftbus lib, void *s){ if (!fn_dgram_socket_port){ fn_dgram_socket_port = (tfn_dgram_socket_port)dlsym(lib, l_dgram_socket_port); diff --git a/libcsoftbus.h b/libcsoftbus.h index 1885b61..f141f7a 100644 --- a/libcsoftbus.h +++ b/libcsoftbus.h @@ -52,11 +52,21 @@ static tfn_dgram_socket_bind fn_dgram_socket_bind = NULL; static tfn_dgram_socket_force_bind fn_dgram_socket_force_bind = NULL; static tfn_dgram_socket_sendto fn_dgram_socket_sendto = NULL; +static tfn_dgram_socket_sendto_timeout fn_dgram_socket_sendto_timeout = NULL; +static tfn_dgram_socket_sendto_nowait fn_dgram_socket_sendto_nowait = NULL; static tfn_dgram_socket_recvfrom fn_dgram_socket_recvfrom = NULL; +static tfn_dgram_socket_recvfrom_timeout fn_dgram_socket_recvfrom_timeout = NULL; +static tfn_dgram_socket_recvfrom_nowait fn_dgram_socket_recvfrom_nowait = NULL; static tfn_dgram_socket_sendandrecv fn_dgram_socket_sendandrecv = NULL; +static tfn_dgram_socket_sendandrecv_timeout fn_dgram_socket_sendandrecv_timeout = NULL; +static tfn_dgram_socket_sendandrecv_nowait fn_dgram_socket_sendandrecv_nowait = NULL; static tfn_dgram_socket_start_bus fn_dgram_socket_start_bus = NULL; static tfn_dgram_socket_sub fn_dgram_socket_sub = NULL; +static tfn_dgram_socket_sub_timeout fn_dgram_socket_sub_timeout = NULL; +static tfn_dgram_socket_sub_nowait fn_dgram_socket_sub_nowait = NULL; static tfn_dgram_socket_pub fn_dgram_socket_pub = NULL; +static tfn_dgram_socket_pub_timeout fn_dgram_socket_pub_timeout = NULL; +static tfn_dgram_socket_pub_nowait fn_dgram_socket_pub_nowait = NULL; static tfn_dgram_socket_port fn_dgram_socket_port = NULL; static tfn_dgram_socket_free fn_dgram_socket_free = NULL; @@ -107,11 +117,21 @@ const static char l_dgram_socket_bind[] = "dgram_mod_bind"; const static char l_dgram_socket_force_bind[] = "dgram_mod_force_bind"; const static char l_dgram_socket_sendto[] = "dgram_mod_sendto"; +const static char l_dgram_socket_sendto_timeout[] = "dgram_mod_sendto_timeout"; +const static char l_dgram_socket_sendto_nowait[] = "dgram_mod_sendto_nowait"; const static char l_dgram_socket_recvfrom[] = "dgram_mod_recvfrom"; +const static char l_dgram_socket_recvfrom_timeout[] = "dgram_mod_recvfrom_timeout"; +const static char l_dgram_socket_recvfrom_nowait[] = "dgram_mod_recvfrom_nowait"; const static char l_dgram_socket_sendandrecv[] = "dgram_mod_sendandrecv"; +const static char l_dgram_socket_sendandrecv_timeout[] = "dgram_mod_sendandrecv_timeout"; +const static char l_dgram_socket_sendandrecv_nowait[] = "dgram_mod_sendandrecv_nowait"; const static char l_dgram_socket_start_bus[] = "dgram_mod_start_bus"; const static char l_dgram_socket_sub[] = "dgram_mod_sub"; +const static char l_dgram_socket_sub_timeout[] = "dgram_mod_sub_timeout"; +const static char l_dgram_socket_sub_nowait[] = "dgram_mod_sub_nowait"; const static char l_dgram_socket_pub[] = "dgram_mod_pub"; +const static char l_dgram_socket_pub_timeout[] = "dgram_mod_pub_timeout"; +const static char l_dgram_socket_pub_nowait[] = "dgram_mod_pub_nowait"; const static char l_dgram_socket_port[] = "dgram_mod_get_port"; const static char l_dgram_socket_free[] = "dgram_mod_free"; @@ -168,11 +188,21 @@ int wrap_fn_dgram_socket_bind(hcsoftbus lib, void *s, int port); int wrap_fn_dgram_socket_force_bind(hcsoftbus lib, void *s, int port); int wrap_fn_dgram_socket_sendto(hcsoftbus lib, void *s, const void *buf, const int size, const int port); +int wrap_fn_dgram_socket_sendto_timeout(hcsoftbus lib, void *s, const void *buf, const int size, const int port, int sec, int usec); +int wrap_fn_dgram_socket_sendto_nowait(hcsoftbus lib, void *s, const void *buf, const int size, const int port); int wrap_fn_dgram_socket_recvfrom(hcsoftbus lib, void *s, void **buf, int *size, int *port); +int wrap_fn_dgram_socket_recvfrom_timeout(hcsoftbus lib, void *s, void **buf, int *size, int *port, int sec, int usec); +int wrap_fn_dgram_socket_recvfrom_nowait(hcsoftbus lib, void *s, void **buf, int *size, int *port); int wrap_fn_dgram_socket_sendandrecv(hcsoftbus lib, void *s, const void *sbuf, const int ssize, const int port, void **rbuf, int *rsize); +int wrap_fn_dgram_socket_sendandrecv_timeout(hcsoftbus lib, void *s, const void *sbuf, const int ssize, const int port, void **rbuf, int *rsize, int sec, int usec); +int wrap_fn_dgram_socket_sendandrecv_nowait(hcsoftbus lib, void *s, const void *sbuf, const int ssize, const int port, void **rbuf, int *rsize); int wrap_fn_dgram_socket_start_bus(hcsoftbus lib, void *s); int wrap_fn_dgram_socket_sub(hcsoftbus lib, void *s, void *topic, int size, int port); +int wrap_fn_dgram_socket_sub_timeout(hcsoftbus lib, void *s, void *topic, int size, int port, int sec, int usec); +int wrap_fn_dgram_socket_sub_nowait(hcsoftbus lib, void *s, void *topic, int size, int port); int wrap_fn_dgram_socket_pub(hcsoftbus lib, void *s, void *topic, int tsize, void *content, int csize, int port); +int wrap_fn_dgram_socket_pub_timeout(hcsoftbus lib, void *s, void *topic, int tsize, void *content, int csize, int port, int sec, int usec); +int wrap_fn_dgram_socket_pub_nowait(hcsoftbus lib, void *s, void *topic, int tsize, void *content, int csize, int port); int wrap_fn_dgram_socket_port(hcsoftbus lib, void *s); void wrap_fn_dgram_socket_free(hcsoftbus lib, void *buf); diff --git a/libcsoftbus_func.h b/libcsoftbus_func.h index d921639..6ef5dc7 100644 --- a/libcsoftbus_func.h +++ b/libcsoftbus_func.h @@ -175,18 +175,26 @@ * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 */ typedef int(*tfn_dgram_socket_sendto) (void*, const void*, const int, const int); +// 鍙戦�佷俊鎭秴鏃惰繑鍥炪�� @sec 绉� 锛� @nsec 绾崇 +typedef int(*tfn_dgram_socket_sendto_timeout) (void*, const void*, const int, const int, int, int); +// 鍙戦�佷俊鎭珛鍒昏繑鍥炪�� +typedef tfn_dgram_socket_sendto tfn_dgram_socket_sendto_nowait; /** * 鎺ユ敹淇℃伅 * @port 浠庤皝鍝噷鏀跺埌鐨勪俊鎭� * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 */ typedef int(*tfn_dgram_socket_recvfrom) (void*, void**, int*, int*); +typedef int(*tfn_dgram_socket_recvfrom_timeout) (void*, void**, int*, int*, int, int); +typedef tfn_dgram_socket_recvfrom tfn_dgram_socket_recvfrom_nowait; /** * 鍙戦�佽姹備俊鎭苟绛夊緟鎺ユ敹搴旂瓟 * @port 鍙戦�佺粰璋� * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 */ typedef int(*tfn_dgram_socket_sendandrecv) (void*, const void*, const int, const int, void**, int*); +typedef int(*tfn_dgram_socket_sendandrecv_timeout) (void*, const void*, const int, const int, void**, int*, int, int); +typedef tfn_dgram_socket_sendandrecv tfn_dgram_socket_sendandrecv_nowait; /** * 鍚姩bus * @@ -200,6 +208,8 @@ * @port 鎬荤嚎绔彛 */ typedef int(*tfn_dgram_socket_sub) (void*, void*, int, int); +typedef int(*tfn_dgram_socket_sub_timeout) (void*, void*, int, int, int, int); +typedef tfn_dgram_socket_sub tfn_dgram_socket_sub_nowait; /** * 鍙戝竷涓婚 * @topic 涓婚 @@ -207,6 +217,8 @@ * @port 鎬荤嚎绔彛 */ typedef int(*tfn_dgram_socket_pub) (void*, void*, int, void*, int, int); +typedef int(*tfn_dgram_socket_pub_timeout) (void*, void*, int, void*, int, int, int, int); +typedef tfn_dgram_socket_pub tfn_dgram_socket_pub_nowait; /** * 鑾峰彇soket绔彛鍙� */ diff --git a/library.go b/library.go index 0d97e0f..491d63d 100644 --- a/library.go +++ b/library.go @@ -85,14 +85,16 @@ case <-ctx.Done(): return default: - if data, peer, err := sock.RecvFrom(); err == nil { - var info *MsgInfo - if err := proto.Unmarshal(data, info); err == nil { + if data, peer, err := sock.RecvFromTimeout(0, 10*1000); err == nil { + var info MsgInfo + if err := proto.Unmarshal(data, &info); err == nil { ch <- TransInfo{ - info: info, + info: &info, port: peer, } } + } else { + // time.Sleep(10 * time.Millisecond) } } } @@ -211,6 +213,11 @@ return handle } +const ( + timeoutSec = 1 + timeoutUsec = 0 +) + // GetTopicInfo get topic info func (h *Handle) GetTopicInfo(topic, typ string) int { // 鎹涓嶆洿鏂�,鍏堢敤缂撳瓨,鍚﹀垯闇�瑕佹柊鍒涘缓涓�涓猻ocket,鏉ヤ粠manager璇锋眰key @@ -225,10 +232,10 @@ } if data, err := proto.Marshal(msg); err == nil { h.mtxWorker.Lock() - if rdata, err := h.sockWorker.sock.SendAndRecv(data, h.sockWorker.peer); err == nil { + if rdata, err := h.sockWorker.sock.SendAndRecvTimeout(data, h.sockWorker.peer, timeoutSec, timeoutUsec); err == nil { h.mtxWorker.Unlock() - var rmsg *TopicInfoReply - if err := proto.Unmarshal(rdata, rmsg); err == nil { + var rmsg TopicInfoReply + if err := proto.Unmarshal(rdata, &rmsg); err == nil { return int(rmsg.Key) } } @@ -238,7 +245,7 @@ } func (h *Handle) send2(sc *sockClient, data []byte, logID string) error { - if r := sc.sock.SendTo(data, sc.peer); r != 0 { + if r := sc.sock.SendToTimeout(data, sc.peer, timeoutSec, timeoutUsec); r != 0 { return fmt.Errorf("%s SendTo Failed: %d", logID, r) } return nil @@ -259,7 +266,7 @@ defer h.mtxWorker.Unlock() msg, err := proto.Marshal(info) if err == nil { - if r := h.sockWorker.sock.SendTo(msg, key); r != 0 { + if r := h.sockWorker.sock.SendToTimeout(msg, key, timeoutSec, timeoutUsec); r != 0 { return fmt.Errorf("SendOnly Failed: %d", r) } } @@ -286,28 +293,28 @@ } // 鍚屾鎺ュ彛,闇�瑕佺瓑寰呰繑鍥炲�� - var ret *MsgInfo + var ret MsgInfo loop: for { select { case <-h.ctx.Done(): return nil default: - if data, err := h.sockWorker.sock.SendAndRecv(msg, key); err == nil { - if err := proto.Unmarshal(data, ret); err == nil { + if data, err := h.sockWorker.sock.SendAndRecvTimeout(msg, key, timeoutSec, timeoutUsec); err == nil { + if err := proto.Unmarshal(data, &ret); err == nil { break loop } } } } - return ret + return &ret } // Reply request func (h *Handle) Reply(key int, info *MsgInfo) error { msg, err := proto.Marshal(info) if err == nil { - if r := h.sockRep.sock.SendTo(msg, key); r != 0 { + if r := h.sockRep.sock.SendToTimeout(msg, key, timeoutSec, timeoutUsec); r != 0 { return fmt.Errorf("Reply Failed: %d", r) } } diff --git a/softbusDgram.go b/softbusDgram.go index f390075..68d0a29 100644 --- a/softbusDgram.go +++ b/softbusDgram.go @@ -71,6 +71,26 @@ return int(r) } +// SendToTimeout port +func (d *DgramSocket) SendToTimeout(data []byte, port int, sec, usec int) int { + if libsoftbus == nil { + return RETVAL + } + + r := C.wrap_fn_dgram_socket_sendto_timeout(libsoftbus, d.dgram, unsafe.Pointer(&data[0]), C.int(len(data)), C.int(port), C.int(sec), C.int(usec)) + return int(r) +} + +// SendToNoWait port +func (d *DgramSocket) SendToNoWait(data []byte, port int) int { + if libsoftbus == nil { + return RETVAL + } + + r := C.wrap_fn_dgram_socket_sendto_nowait(libsoftbus, d.dgram, unsafe.Pointer(&data[0]), C.int(len(data)), C.int(port)) + return int(r) +} + // RecvFrom data and port func (d *DgramSocket) RecvFrom() ([]byte, int, error) { if libsoftbus == nil { @@ -92,6 +112,48 @@ return data, int(rp), nil } +// RecvFromTimeout data and port +func (d *DgramSocket) RecvFromTimeout(sec, usec int) ([]byte, int, error) { + if libsoftbus == nil { + return nil, -1, fmt.Errorf("RecvFromTimeout Func Test libsoftbus Is Nil") + } + + var rb unsafe.Pointer + var rs C.int + var rp C.int + + r := C.wrap_fn_dgram_socket_recvfrom_timeout(libsoftbus, d.dgram, &rb, &rs, &rp, C.int(sec), C.int(usec)) + if r != 0 { + return nil, int(rp), fmt.Errorf("RecvFromTimeout Func Failed %d", int(r)) + } + + data := C.GoBytes(rb, rs) + C.wrap_fn_dgram_socket_free(libsoftbus, rb) + + return data, int(rp), nil +} + +// RecvFromNoWait data and port +func (d *DgramSocket) RecvFromNoWait() ([]byte, int, error) { + if libsoftbus == nil { + return nil, -1, fmt.Errorf("RecvFromNoWait Func Test libsoftbus Is Nil") + } + + var rb unsafe.Pointer + var rs C.int + var rp C.int + + r := C.wrap_fn_dgram_socket_recvfrom_nowait(libsoftbus, d.dgram, &rb, &rs, &rp) + if r != 0 { + return nil, int(rp), fmt.Errorf("RecvFromNoWait Func Failed %d", int(r)) + } + + data := C.GoBytes(rb, rs) + C.wrap_fn_dgram_socket_free(libsoftbus, rb) + + return data, int(rp), nil +} + // SendAndRecv sync func (d *DgramSocket) SendAndRecv(sdata []byte, port int) ([]byte, error) { if libsoftbus == nil { @@ -104,6 +166,46 @@ r := C.wrap_fn_dgram_socket_sendandrecv(libsoftbus, d.dgram, unsafe.Pointer(&sdata[0]), C.int(len(sdata)), C.int(port), &rb, &rs) if r != 0 { return nil, fmt.Errorf("SendAndRecv Send To %d And Recv From It Failed", port) + } + + data := C.GoBytes(rb, rs) + C.wrap_fn_dgram_socket_free(libsoftbus, rb) + + return data, nil +} + +// SendAndRecvTimeout sync +func (d *DgramSocket) SendAndRecvTimeout(sdata []byte, port int, sec, usec int) ([]byte, error) { + if libsoftbus == nil { + return nil, fmt.Errorf("SendAndRecvTimeout Func Test libsoftbus Is Nil") + } + + var rb unsafe.Pointer + var rs C.int + + r := C.wrap_fn_dgram_socket_sendandrecv_timeout(libsoftbus, d.dgram, unsafe.Pointer(&sdata[0]), C.int(len(sdata)), C.int(port), &rb, &rs, C.int(sec), C.int(usec)) + if r != 0 { + return nil, fmt.Errorf("SendAndRecvTimeout Send To %d And Recv From It Failed", port) + } + + data := C.GoBytes(rb, rs) + C.wrap_fn_dgram_socket_free(libsoftbus, rb) + + return data, nil +} + +// SendAndRecvNoWait sync +func (d *DgramSocket) SendAndRecvNoWait(sdata []byte, port int) ([]byte, error) { + if libsoftbus == nil { + return nil, fmt.Errorf("SendAndRecvNoWait Func Test libsoftbus Is Nil") + } + + var rb unsafe.Pointer + var rs C.int + + r := C.wrap_fn_dgram_socket_sendandrecv_nowait(libsoftbus, d.dgram, unsafe.Pointer(&sdata[0]), C.int(len(sdata)), C.int(port), &rb, &rs) + if r != 0 { + return nil, fmt.Errorf("SendAndRecvNoWait Send To %d And Recv From It Failed", port) } data := C.GoBytes(rb, rs) @@ -135,6 +237,32 @@ return int(r) } +// SubTimeout sub bus +func (d *DgramSocket) SubTimeout(topic string, port int, sec, usec int) int { + if libsoftbus == nil { + return RETVAL + } + + ct := C.CString(topic) + defer C.free(unsafe.Pointer(ct)) + + r := C.wrap_fn_dgram_socket_sub_timeout(libsoftbus, d.dgram, unsafe.Pointer(ct), C.int(len(topic)), C.int(port), C.int(sec), C.int(usec)) + return int(r) +} + +// SubNoWait sub bus +func (d *DgramSocket) SubNoWait(topic string, port int) int { + if libsoftbus == nil { + return RETVAL + } + + ct := C.CString(topic) + defer C.free(unsafe.Pointer(ct)) + + r := C.wrap_fn_dgram_socket_sub_nowait(libsoftbus, d.dgram, unsafe.Pointer(ct), C.int(len(topic)), C.int(port)) + return int(r) +} + // Pub bus func (d *DgramSocket) Pub(topic, msg string, port int) int { if libsoftbus == nil { @@ -150,6 +278,36 @@ return int(r) } +// PubTimeout bus +func (d *DgramSocket) PubTimeout(topic, msg string, port int, sec, usec int) int { + if libsoftbus == nil { + return RETVAL + } + + ct := C.CString(topic) + defer C.free(unsafe.Pointer(ct)) + cm := C.CString(msg) + defer C.free(unsafe.Pointer(cm)) + + r := C.wrap_fn_dgram_socket_pub_timeout(libsoftbus, d.dgram, unsafe.Pointer(ct), C.int(len(topic)), unsafe.Pointer(cm), C.int(len(msg)), C.int(port), C.int(sec), C.int(usec)) + return int(r) +} + +// PubNoWait bus +func (d *DgramSocket) PubNoWait(topic, msg string, port int) int { + if libsoftbus == nil { + return RETVAL + } + + ct := C.CString(topic) + defer C.free(unsafe.Pointer(ct)) + cm := C.CString(msg) + defer C.free(unsafe.Pointer(cm)) + + r := C.wrap_fn_dgram_socket_pub_nowait(libsoftbus, d.dgram, unsafe.Pointer(ct), C.int(len(topic)), unsafe.Pointer(cm), C.int(len(msg)), C.int(port)) + return int(r) +} + // Port socket func (d *DgramSocket) Port() int { if libsoftbus == nil { -- Gitblit v1.8.0