From 0bde715af72b7b3d55ad3aac816d7cd153a60b42 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期一, 03 八月 2020 13:29:21 +0800
Subject: [PATCH] add timeout api
---
library.go | 35 +++--
libcsoftbus.h | 30 +++++
libcsoftbus_func.h | 12 ++
softbusDgram.go | 158 ++++++++++++++++++++++++++
libcsoftbus.c | 81 +++++++++++++
5 files changed, 302 insertions(+), 14 deletions(-)
diff --git a/libcsoftbus.c b/libcsoftbus.c
index a5a23c1..11fc582 100644
--- a/libcsoftbus.c
+++ b/libcsoftbus.c
@@ -308,6 +308,22 @@
return fn_dgram_socket_sendto(s, buf, size, port);
}
+int wrap_fn_dgram_socket_sendto_timeout(hcsoftbus lib, void *s, const void *buf, const int size, const int port, int sec, int usec){
+ if (!fn_dgram_socket_sendto_timeout){
+ fn_dgram_socket_sendto_timeout = (tfn_dgram_socket_sendto_timeout)dlsym(lib, l_dgram_socket_sendto_timeout);
+ check_with_ret(fn_dgram_socket_sendto_timeout, lib, -1);
+ }
+ return fn_dgram_socket_sendto_timeout(s, buf, size, port, sec, usec);
+}
+
+int wrap_fn_dgram_socket_sendto_nowait(hcsoftbus lib, void *s, const void *buf, const int size, const int port){
+ if (!fn_dgram_socket_sendto_nowait){
+ fn_dgram_socket_sendto_nowait = (tfn_dgram_socket_sendto_nowait)dlsym(lib, l_dgram_socket_sendto_nowait);
+ check_with_ret(fn_dgram_socket_sendto_nowait, lib, -1);
+ }
+ return fn_dgram_socket_sendto_nowait(s, buf, size, port);
+}
+
int wrap_fn_dgram_socket_recvfrom(hcsoftbus lib, void *s, void **buf, int *size, int *port){
if (!fn_dgram_socket_recvfrom){
fn_dgram_socket_recvfrom = (tfn_dgram_socket_recvfrom)dlsym(lib, l_dgram_socket_recvfrom);
@@ -316,12 +332,45 @@
return fn_dgram_socket_recvfrom(s, buf, size, port);
}
+int wrap_fn_dgram_socket_recvfrom_timeout(hcsoftbus lib, void *s, void **buf, int *size, int *port, int sec, int usec){
+ if (!fn_dgram_socket_recvfrom_timeout){
+ fn_dgram_socket_recvfrom_timeout = (tfn_dgram_socket_recvfrom_timeout)dlsym(lib, l_dgram_socket_recvfrom_timeout);
+ check_with_ret(fn_dgram_socket_recvfrom_timeout, lib, -1);
+ }
+ return fn_dgram_socket_recvfrom_timeout(s, buf, size, port, sec, usec);
+}
+
+int wrap_fn_dgram_socket_recvfrom_nowait(hcsoftbus lib, void *s, void **buf, int *size, int *port){
+ if (!fn_dgram_socket_recvfrom_nowait){
+ fn_dgram_socket_recvfrom_nowait = (tfn_dgram_socket_recvfrom_nowait)dlsym(lib, l_dgram_socket_recvfrom_nowait);
+ check_with_ret(fn_dgram_socket_recvfrom_nowait, lib, -1);
+ }
+ return fn_dgram_socket_recvfrom_nowait(s, buf, size, port);
+}
+
int wrap_fn_dgram_socket_sendandrecv(hcsoftbus lib, void *s, const void *sbuf, const int ssize, const int port, void **rbuf, int *rsize){
if (!fn_dgram_socket_sendandrecv){
fn_dgram_socket_sendandrecv = (tfn_dgram_socket_sendandrecv)dlsym(lib, l_dgram_socket_sendandrecv);
check_with_ret(fn_dgram_socket_sendandrecv, lib, -1);
}
return fn_dgram_socket_sendandrecv(s, sbuf, ssize, port, rbuf, rsize);
+}
+
+int wrap_fn_dgram_socket_sendandrecv_timeout(hcsoftbus lib, void *s, const void *sbuf, const int ssize, const int port, void **rbuf, int *rsize, int sec, int usec){
+ if (!fn_dgram_socket_sendandrecv_timeout){
+ fn_dgram_socket_sendandrecv_timeout = (tfn_dgram_socket_sendandrecv_timeout)dlsym(lib, l_dgram_socket_sendandrecv_timeout);
+ check_with_ret(fn_dgram_socket_sendandrecv_timeout, lib, -1);
+ }
+
+ return fn_dgram_socket_sendandrecv_timeout(s, sbuf, ssize, port, rbuf, rsize, sec, usec);
+}
+
+int wrap_fn_dgram_socket_sendandrecv_nowait(hcsoftbus lib, void *s, const void *sbuf, const int ssize, const int port, void **rbuf, int *rsize){
+ if (!fn_dgram_socket_sendandrecv_nowait){
+ fn_dgram_socket_sendandrecv_nowait = (tfn_dgram_socket_sendandrecv_nowait)dlsym(lib, l_dgram_socket_sendandrecv_nowait);
+ check_with_ret(fn_dgram_socket_sendandrecv_nowait, lib, -1);
+ }
+ return fn_dgram_socket_sendandrecv_nowait(s, sbuf, ssize, port, rbuf, rsize);
}
int wrap_fn_dgram_socket_start_bus(hcsoftbus lib, void *s){
@@ -340,6 +389,22 @@
return fn_dgram_socket_sub(s, topic, size, port);
}
+int wrap_fn_dgram_socket_sub_timeout(hcsoftbus lib, void *s, void *topic, int size, int port, int sec, int usec){
+ if (!fn_dgram_socket_sub_timeout){
+ fn_dgram_socket_sub_timeout = (tfn_dgram_socket_sub_timeout)dlsym(lib, l_dgram_socket_sub_timeout);
+ check_with_ret(fn_dgram_socket_sub_timeout, lib, -1);
+ }
+ return fn_dgram_socket_sub_timeout(s, topic, size, port, sec, usec);
+}
+
+int wrap_fn_dgram_socket_sub_nowait(hcsoftbus lib, void *s, void *topic, int size, int port){
+ if (!fn_dgram_socket_sub_nowait){
+ fn_dgram_socket_sub_nowait = (tfn_dgram_socket_sub_nowait)dlsym(lib, l_dgram_socket_sub_nowait);
+ check_with_ret(fn_dgram_socket_sub_nowait, lib, -1);
+ }
+ return fn_dgram_socket_sub_nowait(s, topic, size, port);
+}
+
int wrap_fn_dgram_socket_pub(hcsoftbus lib, void *s, void *topic, int tsize, void *content, int csize, int port){
if (!fn_dgram_socket_pub){
fn_dgram_socket_pub = (tfn_dgram_socket_pub)dlsym(lib, l_dgram_socket_pub);
@@ -348,6 +413,22 @@
return fn_dgram_socket_pub(s, topic, tsize, content, csize, port);
}
+int wrap_fn_dgram_socket_pub_timeout(hcsoftbus lib, void *s, void *topic, int tsize, void *content, int csize, int port, int sec, int usec){
+ if (!fn_dgram_socket_pub_timeout){
+ fn_dgram_socket_pub_timeout = (tfn_dgram_socket_pub_timeout)dlsym(lib, l_dgram_socket_pub_timeout);
+ check_with_ret(fn_dgram_socket_pub_timeout, lib, -1);
+ }
+ return fn_dgram_socket_pub_timeout(s, topic, tsize, content, csize, port, sec, usec);
+}
+
+int wrap_fn_dgram_socket_pub_nowait(hcsoftbus lib, void *s, void *topic, int tsize, void *content, int csize, int port){
+ if (!fn_dgram_socket_pub_nowait){
+ fn_dgram_socket_pub_nowait = (tfn_dgram_socket_pub_nowait)dlsym(lib, l_dgram_socket_pub_nowait);
+ check_with_ret(fn_dgram_socket_pub_nowait, lib, -1);
+ }
+ return fn_dgram_socket_pub_nowait(s, topic, tsize, content, csize, port);
+}
+
int wrap_fn_dgram_socket_port(hcsoftbus lib, void *s){
if (!fn_dgram_socket_port){
fn_dgram_socket_port = (tfn_dgram_socket_port)dlsym(lib, l_dgram_socket_port);
diff --git a/libcsoftbus.h b/libcsoftbus.h
index 1885b61..f141f7a 100644
--- a/libcsoftbus.h
+++ b/libcsoftbus.h
@@ -52,11 +52,21 @@
static tfn_dgram_socket_bind fn_dgram_socket_bind = NULL;
static tfn_dgram_socket_force_bind fn_dgram_socket_force_bind = NULL;
static tfn_dgram_socket_sendto fn_dgram_socket_sendto = NULL;
+static tfn_dgram_socket_sendto_timeout fn_dgram_socket_sendto_timeout = NULL;
+static tfn_dgram_socket_sendto_nowait fn_dgram_socket_sendto_nowait = NULL;
static tfn_dgram_socket_recvfrom fn_dgram_socket_recvfrom = NULL;
+static tfn_dgram_socket_recvfrom_timeout fn_dgram_socket_recvfrom_timeout = NULL;
+static tfn_dgram_socket_recvfrom_nowait fn_dgram_socket_recvfrom_nowait = NULL;
static tfn_dgram_socket_sendandrecv fn_dgram_socket_sendandrecv = NULL;
+static tfn_dgram_socket_sendandrecv_timeout fn_dgram_socket_sendandrecv_timeout = NULL;
+static tfn_dgram_socket_sendandrecv_nowait fn_dgram_socket_sendandrecv_nowait = NULL;
static tfn_dgram_socket_start_bus fn_dgram_socket_start_bus = NULL;
static tfn_dgram_socket_sub fn_dgram_socket_sub = NULL;
+static tfn_dgram_socket_sub_timeout fn_dgram_socket_sub_timeout = NULL;
+static tfn_dgram_socket_sub_nowait fn_dgram_socket_sub_nowait = NULL;
static tfn_dgram_socket_pub fn_dgram_socket_pub = NULL;
+static tfn_dgram_socket_pub_timeout fn_dgram_socket_pub_timeout = NULL;
+static tfn_dgram_socket_pub_nowait fn_dgram_socket_pub_nowait = NULL;
static tfn_dgram_socket_port fn_dgram_socket_port = NULL;
static tfn_dgram_socket_free fn_dgram_socket_free = NULL;
@@ -107,11 +117,21 @@
const static char l_dgram_socket_bind[] = "dgram_mod_bind";
const static char l_dgram_socket_force_bind[] = "dgram_mod_force_bind";
const static char l_dgram_socket_sendto[] = "dgram_mod_sendto";
+const static char l_dgram_socket_sendto_timeout[] = "dgram_mod_sendto_timeout";
+const static char l_dgram_socket_sendto_nowait[] = "dgram_mod_sendto_nowait";
const static char l_dgram_socket_recvfrom[] = "dgram_mod_recvfrom";
+const static char l_dgram_socket_recvfrom_timeout[] = "dgram_mod_recvfrom_timeout";
+const static char l_dgram_socket_recvfrom_nowait[] = "dgram_mod_recvfrom_nowait";
const static char l_dgram_socket_sendandrecv[] = "dgram_mod_sendandrecv";
+const static char l_dgram_socket_sendandrecv_timeout[] = "dgram_mod_sendandrecv_timeout";
+const static char l_dgram_socket_sendandrecv_nowait[] = "dgram_mod_sendandrecv_nowait";
const static char l_dgram_socket_start_bus[] = "dgram_mod_start_bus";
const static char l_dgram_socket_sub[] = "dgram_mod_sub";
+const static char l_dgram_socket_sub_timeout[] = "dgram_mod_sub_timeout";
+const static char l_dgram_socket_sub_nowait[] = "dgram_mod_sub_nowait";
const static char l_dgram_socket_pub[] = "dgram_mod_pub";
+const static char l_dgram_socket_pub_timeout[] = "dgram_mod_pub_timeout";
+const static char l_dgram_socket_pub_nowait[] = "dgram_mod_pub_nowait";
const static char l_dgram_socket_port[] = "dgram_mod_get_port";
const static char l_dgram_socket_free[] = "dgram_mod_free";
@@ -168,11 +188,21 @@
int wrap_fn_dgram_socket_bind(hcsoftbus lib, void *s, int port);
int wrap_fn_dgram_socket_force_bind(hcsoftbus lib, void *s, int port);
int wrap_fn_dgram_socket_sendto(hcsoftbus lib, void *s, const void *buf, const int size, const int port);
+int wrap_fn_dgram_socket_sendto_timeout(hcsoftbus lib, void *s, const void *buf, const int size, const int port, int sec, int usec);
+int wrap_fn_dgram_socket_sendto_nowait(hcsoftbus lib, void *s, const void *buf, const int size, const int port);
int wrap_fn_dgram_socket_recvfrom(hcsoftbus lib, void *s, void **buf, int *size, int *port);
+int wrap_fn_dgram_socket_recvfrom_timeout(hcsoftbus lib, void *s, void **buf, int *size, int *port, int sec, int usec);
+int wrap_fn_dgram_socket_recvfrom_nowait(hcsoftbus lib, void *s, void **buf, int *size, int *port);
int wrap_fn_dgram_socket_sendandrecv(hcsoftbus lib, void *s, const void *sbuf, const int ssize, const int port, void **rbuf, int *rsize);
+int wrap_fn_dgram_socket_sendandrecv_timeout(hcsoftbus lib, void *s, const void *sbuf, const int ssize, const int port, void **rbuf, int *rsize, int sec, int usec);
+int wrap_fn_dgram_socket_sendandrecv_nowait(hcsoftbus lib, void *s, const void *sbuf, const int ssize, const int port, void **rbuf, int *rsize);
int wrap_fn_dgram_socket_start_bus(hcsoftbus lib, void *s);
int wrap_fn_dgram_socket_sub(hcsoftbus lib, void *s, void *topic, int size, int port);
+int wrap_fn_dgram_socket_sub_timeout(hcsoftbus lib, void *s, void *topic, int size, int port, int sec, int usec);
+int wrap_fn_dgram_socket_sub_nowait(hcsoftbus lib, void *s, void *topic, int size, int port);
int wrap_fn_dgram_socket_pub(hcsoftbus lib, void *s, void *topic, int tsize, void *content, int csize, int port);
+int wrap_fn_dgram_socket_pub_timeout(hcsoftbus lib, void *s, void *topic, int tsize, void *content, int csize, int port, int sec, int usec);
+int wrap_fn_dgram_socket_pub_nowait(hcsoftbus lib, void *s, void *topic, int tsize, void *content, int csize, int port);
int wrap_fn_dgram_socket_port(hcsoftbus lib, void *s);
void wrap_fn_dgram_socket_free(hcsoftbus lib, void *buf);
diff --git a/libcsoftbus_func.h b/libcsoftbus_func.h
index d921639..6ef5dc7 100644
--- a/libcsoftbus_func.h
+++ b/libcsoftbus_func.h
@@ -175,18 +175,26 @@
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
typedef int(*tfn_dgram_socket_sendto) (void*, const void*, const int, const int);
+// 鍙戦�佷俊鎭秴鏃惰繑鍥炪�� @sec 绉� 锛� @nsec 绾崇
+typedef int(*tfn_dgram_socket_sendto_timeout) (void*, const void*, const int, const int, int, int);
+// 鍙戦�佷俊鎭珛鍒昏繑鍥炪��
+typedef tfn_dgram_socket_sendto tfn_dgram_socket_sendto_nowait;
/**
* 鎺ユ敹淇℃伅
* @port 浠庤皝鍝噷鏀跺埌鐨勪俊鎭�
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
typedef int(*tfn_dgram_socket_recvfrom) (void*, void**, int*, int*);
+typedef int(*tfn_dgram_socket_recvfrom_timeout) (void*, void**, int*, int*, int, int);
+typedef tfn_dgram_socket_recvfrom tfn_dgram_socket_recvfrom_nowait;
/**
* 鍙戦�佽姹備俊鎭苟绛夊緟鎺ユ敹搴旂瓟
* @port 鍙戦�佺粰璋�
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
typedef int(*tfn_dgram_socket_sendandrecv) (void*, const void*, const int, const int, void**, int*);
+typedef int(*tfn_dgram_socket_sendandrecv_timeout) (void*, const void*, const int, const int, void**, int*, int, int);
+typedef tfn_dgram_socket_sendandrecv tfn_dgram_socket_sendandrecv_nowait;
/**
* 鍚姩bus
*
@@ -200,6 +208,8 @@
* @port 鎬荤嚎绔彛
*/
typedef int(*tfn_dgram_socket_sub) (void*, void*, int, int);
+typedef int(*tfn_dgram_socket_sub_timeout) (void*, void*, int, int, int, int);
+typedef tfn_dgram_socket_sub tfn_dgram_socket_sub_nowait;
/**
* 鍙戝竷涓婚
* @topic 涓婚
@@ -207,6 +217,8 @@
* @port 鎬荤嚎绔彛
*/
typedef int(*tfn_dgram_socket_pub) (void*, void*, int, void*, int, int);
+typedef int(*tfn_dgram_socket_pub_timeout) (void*, void*, int, void*, int, int, int, int);
+typedef tfn_dgram_socket_pub tfn_dgram_socket_pub_nowait;
/**
* 鑾峰彇soket绔彛鍙�
*/
diff --git a/library.go b/library.go
index 0d97e0f..491d63d 100644
--- a/library.go
+++ b/library.go
@@ -85,14 +85,16 @@
case <-ctx.Done():
return
default:
- if data, peer, err := sock.RecvFrom(); err == nil {
- var info *MsgInfo
- if err := proto.Unmarshal(data, info); err == nil {
+ if data, peer, err := sock.RecvFromTimeout(0, 10*1000); err == nil {
+ var info MsgInfo
+ if err := proto.Unmarshal(data, &info); err == nil {
ch <- TransInfo{
- info: info,
+ info: &info,
port: peer,
}
}
+ } else {
+ // time.Sleep(10 * time.Millisecond)
}
}
}
@@ -211,6 +213,11 @@
return handle
}
+const (
+ timeoutSec = 1
+ timeoutUsec = 0
+)
+
// GetTopicInfo get topic info
func (h *Handle) GetTopicInfo(topic, typ string) int {
// 鎹涓嶆洿鏂�,鍏堢敤缂撳瓨,鍚﹀垯闇�瑕佹柊鍒涘缓涓�涓猻ocket,鏉ヤ粠manager璇锋眰key
@@ -225,10 +232,10 @@
}
if data, err := proto.Marshal(msg); err == nil {
h.mtxWorker.Lock()
- if rdata, err := h.sockWorker.sock.SendAndRecv(data, h.sockWorker.peer); err == nil {
+ if rdata, err := h.sockWorker.sock.SendAndRecvTimeout(data, h.sockWorker.peer, timeoutSec, timeoutUsec); err == nil {
h.mtxWorker.Unlock()
- var rmsg *TopicInfoReply
- if err := proto.Unmarshal(rdata, rmsg); err == nil {
+ var rmsg TopicInfoReply
+ if err := proto.Unmarshal(rdata, &rmsg); err == nil {
return int(rmsg.Key)
}
}
@@ -238,7 +245,7 @@
}
func (h *Handle) send2(sc *sockClient, data []byte, logID string) error {
- if r := sc.sock.SendTo(data, sc.peer); r != 0 {
+ if r := sc.sock.SendToTimeout(data, sc.peer, timeoutSec, timeoutUsec); r != 0 {
return fmt.Errorf("%s SendTo Failed: %d", logID, r)
}
return nil
@@ -259,7 +266,7 @@
defer h.mtxWorker.Unlock()
msg, err := proto.Marshal(info)
if err == nil {
- if r := h.sockWorker.sock.SendTo(msg, key); r != 0 {
+ if r := h.sockWorker.sock.SendToTimeout(msg, key, timeoutSec, timeoutUsec); r != 0 {
return fmt.Errorf("SendOnly Failed: %d", r)
}
}
@@ -286,28 +293,28 @@
}
// 鍚屾鎺ュ彛,闇�瑕佺瓑寰呰繑鍥炲��
- var ret *MsgInfo
+ var ret MsgInfo
loop:
for {
select {
case <-h.ctx.Done():
return nil
default:
- if data, err := h.sockWorker.sock.SendAndRecv(msg, key); err == nil {
- if err := proto.Unmarshal(data, ret); err == nil {
+ if data, err := h.sockWorker.sock.SendAndRecvTimeout(msg, key, timeoutSec, timeoutUsec); err == nil {
+ if err := proto.Unmarshal(data, &ret); err == nil {
break loop
}
}
}
}
- return ret
+ return &ret
}
// Reply request
func (h *Handle) Reply(key int, info *MsgInfo) error {
msg, err := proto.Marshal(info)
if err == nil {
- if r := h.sockRep.sock.SendTo(msg, key); r != 0 {
+ if r := h.sockRep.sock.SendToTimeout(msg, key, timeoutSec, timeoutUsec); r != 0 {
return fmt.Errorf("Reply Failed: %d", r)
}
}
diff --git a/softbusDgram.go b/softbusDgram.go
index f390075..68d0a29 100644
--- a/softbusDgram.go
+++ b/softbusDgram.go
@@ -71,6 +71,26 @@
return int(r)
}
+// SendToTimeout port
+func (d *DgramSocket) SendToTimeout(data []byte, port int, sec, usec int) int {
+ if libsoftbus == nil {
+ return RETVAL
+ }
+
+ r := C.wrap_fn_dgram_socket_sendto_timeout(libsoftbus, d.dgram, unsafe.Pointer(&data[0]), C.int(len(data)), C.int(port), C.int(sec), C.int(usec))
+ return int(r)
+}
+
+// SendToNoWait port
+func (d *DgramSocket) SendToNoWait(data []byte, port int) int {
+ if libsoftbus == nil {
+ return RETVAL
+ }
+
+ r := C.wrap_fn_dgram_socket_sendto_nowait(libsoftbus, d.dgram, unsafe.Pointer(&data[0]), C.int(len(data)), C.int(port))
+ return int(r)
+}
+
// RecvFrom data and port
func (d *DgramSocket) RecvFrom() ([]byte, int, error) {
if libsoftbus == nil {
@@ -92,6 +112,48 @@
return data, int(rp), nil
}
+// RecvFromTimeout data and port
+func (d *DgramSocket) RecvFromTimeout(sec, usec int) ([]byte, int, error) {
+ if libsoftbus == nil {
+ return nil, -1, fmt.Errorf("RecvFromTimeout Func Test libsoftbus Is Nil")
+ }
+
+ var rb unsafe.Pointer
+ var rs C.int
+ var rp C.int
+
+ r := C.wrap_fn_dgram_socket_recvfrom_timeout(libsoftbus, d.dgram, &rb, &rs, &rp, C.int(sec), C.int(usec))
+ if r != 0 {
+ return nil, int(rp), fmt.Errorf("RecvFromTimeout Func Failed %d", int(r))
+ }
+
+ data := C.GoBytes(rb, rs)
+ C.wrap_fn_dgram_socket_free(libsoftbus, rb)
+
+ return data, int(rp), nil
+}
+
+// RecvFromNoWait data and port
+func (d *DgramSocket) RecvFromNoWait() ([]byte, int, error) {
+ if libsoftbus == nil {
+ return nil, -1, fmt.Errorf("RecvFromNoWait Func Test libsoftbus Is Nil")
+ }
+
+ var rb unsafe.Pointer
+ var rs C.int
+ var rp C.int
+
+ r := C.wrap_fn_dgram_socket_recvfrom_nowait(libsoftbus, d.dgram, &rb, &rs, &rp)
+ if r != 0 {
+ return nil, int(rp), fmt.Errorf("RecvFromNoWait Func Failed %d", int(r))
+ }
+
+ data := C.GoBytes(rb, rs)
+ C.wrap_fn_dgram_socket_free(libsoftbus, rb)
+
+ return data, int(rp), nil
+}
+
// SendAndRecv sync
func (d *DgramSocket) SendAndRecv(sdata []byte, port int) ([]byte, error) {
if libsoftbus == nil {
@@ -104,6 +166,46 @@
r := C.wrap_fn_dgram_socket_sendandrecv(libsoftbus, d.dgram, unsafe.Pointer(&sdata[0]), C.int(len(sdata)), C.int(port), &rb, &rs)
if r != 0 {
return nil, fmt.Errorf("SendAndRecv Send To %d And Recv From It Failed", port)
+ }
+
+ data := C.GoBytes(rb, rs)
+ C.wrap_fn_dgram_socket_free(libsoftbus, rb)
+
+ return data, nil
+}
+
+// SendAndRecvTimeout sync
+func (d *DgramSocket) SendAndRecvTimeout(sdata []byte, port int, sec, usec int) ([]byte, error) {
+ if libsoftbus == nil {
+ return nil, fmt.Errorf("SendAndRecvTimeout Func Test libsoftbus Is Nil")
+ }
+
+ var rb unsafe.Pointer
+ var rs C.int
+
+ r := C.wrap_fn_dgram_socket_sendandrecv_timeout(libsoftbus, d.dgram, unsafe.Pointer(&sdata[0]), C.int(len(sdata)), C.int(port), &rb, &rs, C.int(sec), C.int(usec))
+ if r != 0 {
+ return nil, fmt.Errorf("SendAndRecvTimeout Send To %d And Recv From It Failed", port)
+ }
+
+ data := C.GoBytes(rb, rs)
+ C.wrap_fn_dgram_socket_free(libsoftbus, rb)
+
+ return data, nil
+}
+
+// SendAndRecvNoWait sync
+func (d *DgramSocket) SendAndRecvNoWait(sdata []byte, port int) ([]byte, error) {
+ if libsoftbus == nil {
+ return nil, fmt.Errorf("SendAndRecvNoWait Func Test libsoftbus Is Nil")
+ }
+
+ var rb unsafe.Pointer
+ var rs C.int
+
+ r := C.wrap_fn_dgram_socket_sendandrecv_nowait(libsoftbus, d.dgram, unsafe.Pointer(&sdata[0]), C.int(len(sdata)), C.int(port), &rb, &rs)
+ if r != 0 {
+ return nil, fmt.Errorf("SendAndRecvNoWait Send To %d And Recv From It Failed", port)
}
data := C.GoBytes(rb, rs)
@@ -135,6 +237,32 @@
return int(r)
}
+// SubTimeout sub bus
+func (d *DgramSocket) SubTimeout(topic string, port int, sec, usec int) int {
+ if libsoftbus == nil {
+ return RETVAL
+ }
+
+ ct := C.CString(topic)
+ defer C.free(unsafe.Pointer(ct))
+
+ r := C.wrap_fn_dgram_socket_sub_timeout(libsoftbus, d.dgram, unsafe.Pointer(ct), C.int(len(topic)), C.int(port), C.int(sec), C.int(usec))
+ return int(r)
+}
+
+// SubNoWait sub bus
+func (d *DgramSocket) SubNoWait(topic string, port int) int {
+ if libsoftbus == nil {
+ return RETVAL
+ }
+
+ ct := C.CString(topic)
+ defer C.free(unsafe.Pointer(ct))
+
+ r := C.wrap_fn_dgram_socket_sub_nowait(libsoftbus, d.dgram, unsafe.Pointer(ct), C.int(len(topic)), C.int(port))
+ return int(r)
+}
+
// Pub bus
func (d *DgramSocket) Pub(topic, msg string, port int) int {
if libsoftbus == nil {
@@ -150,6 +278,36 @@
return int(r)
}
+// PubTimeout bus
+func (d *DgramSocket) PubTimeout(topic, msg string, port int, sec, usec int) int {
+ if libsoftbus == nil {
+ return RETVAL
+ }
+
+ ct := C.CString(topic)
+ defer C.free(unsafe.Pointer(ct))
+ cm := C.CString(msg)
+ defer C.free(unsafe.Pointer(cm))
+
+ r := C.wrap_fn_dgram_socket_pub_timeout(libsoftbus, d.dgram, unsafe.Pointer(ct), C.int(len(topic)), unsafe.Pointer(cm), C.int(len(msg)), C.int(port), C.int(sec), C.int(usec))
+ return int(r)
+}
+
+// PubNoWait bus
+func (d *DgramSocket) PubNoWait(topic, msg string, port int) int {
+ if libsoftbus == nil {
+ return RETVAL
+ }
+
+ ct := C.CString(topic)
+ defer C.free(unsafe.Pointer(ct))
+ cm := C.CString(msg)
+ defer C.free(unsafe.Pointer(cm))
+
+ r := C.wrap_fn_dgram_socket_pub_nowait(libsoftbus, d.dgram, unsafe.Pointer(ct), C.int(len(topic)), unsafe.Pointer(cm), C.int(len(msg)), C.int(port))
+ return int(r)
+}
+
// Port socket
func (d *DgramSocket) Port() int {
if libsoftbus == nil {
--
Gitblit v1.8.0