From 0bde715af72b7b3d55ad3aac816d7cd153a60b42 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期一, 03 八月 2020 13:29:21 +0800
Subject: [PATCH] add timeout api

---
 library.go         |   35 +++--
 libcsoftbus.h      |   30 +++++
 libcsoftbus_func.h |   12 ++
 softbusDgram.go    |  158 ++++++++++++++++++++++++++
 libcsoftbus.c      |   81 +++++++++++++
 5 files changed, 302 insertions(+), 14 deletions(-)

diff --git a/libcsoftbus.c b/libcsoftbus.c
index a5a23c1..11fc582 100644
--- a/libcsoftbus.c
+++ b/libcsoftbus.c
@@ -308,6 +308,22 @@
     return fn_dgram_socket_sendto(s, buf, size, port);
 }
 
+int wrap_fn_dgram_socket_sendto_timeout(hcsoftbus lib, void *s, const void *buf, const int size, const int port, int sec, int usec){
+    if (!fn_dgram_socket_sendto_timeout){
+        fn_dgram_socket_sendto_timeout = (tfn_dgram_socket_sendto_timeout)dlsym(lib, l_dgram_socket_sendto_timeout);
+        check_with_ret(fn_dgram_socket_sendto_timeout, lib, -1);
+    }
+    return fn_dgram_socket_sendto_timeout(s, buf, size, port, sec, usec);
+}
+
+int wrap_fn_dgram_socket_sendto_nowait(hcsoftbus lib, void *s, const void *buf, const int size, const int port){
+    if (!fn_dgram_socket_sendto_nowait){
+        fn_dgram_socket_sendto_nowait = (tfn_dgram_socket_sendto_nowait)dlsym(lib, l_dgram_socket_sendto_nowait);
+        check_with_ret(fn_dgram_socket_sendto_nowait, lib, -1);
+    }
+    return fn_dgram_socket_sendto_nowait(s, buf, size, port);
+}
+
 int wrap_fn_dgram_socket_recvfrom(hcsoftbus lib, void *s, void **buf, int *size, int *port){
     if (!fn_dgram_socket_recvfrom){
         fn_dgram_socket_recvfrom = (tfn_dgram_socket_recvfrom)dlsym(lib, l_dgram_socket_recvfrom);
@@ -316,12 +332,45 @@
     return fn_dgram_socket_recvfrom(s, buf, size, port);
 }
 
+int wrap_fn_dgram_socket_recvfrom_timeout(hcsoftbus lib, void *s, void **buf, int *size, int *port, int sec, int usec){
+    if (!fn_dgram_socket_recvfrom_timeout){
+        fn_dgram_socket_recvfrom_timeout = (tfn_dgram_socket_recvfrom_timeout)dlsym(lib, l_dgram_socket_recvfrom_timeout);
+        check_with_ret(fn_dgram_socket_recvfrom_timeout, lib, -1);
+    }
+    return fn_dgram_socket_recvfrom_timeout(s, buf, size, port, sec, usec);
+}
+
+int wrap_fn_dgram_socket_recvfrom_nowait(hcsoftbus lib, void *s, void **buf, int *size, int *port){
+    if (!fn_dgram_socket_recvfrom_nowait){
+        fn_dgram_socket_recvfrom_nowait = (tfn_dgram_socket_recvfrom_nowait)dlsym(lib, l_dgram_socket_recvfrom_nowait);
+        check_with_ret(fn_dgram_socket_recvfrom_nowait, lib, -1);
+    }
+    return fn_dgram_socket_recvfrom_nowait(s, buf, size, port);
+}
+
 int wrap_fn_dgram_socket_sendandrecv(hcsoftbus lib, void *s, const void *sbuf, const int ssize, const int port, void **rbuf, int *rsize){
     if (!fn_dgram_socket_sendandrecv){
         fn_dgram_socket_sendandrecv = (tfn_dgram_socket_sendandrecv)dlsym(lib, l_dgram_socket_sendandrecv);
         check_with_ret(fn_dgram_socket_sendandrecv, lib, -1);
     }
     return fn_dgram_socket_sendandrecv(s, sbuf, ssize, port, rbuf, rsize);
+}
+
+int wrap_fn_dgram_socket_sendandrecv_timeout(hcsoftbus lib, void *s, const void *sbuf, const int ssize, const int port, void **rbuf, int *rsize, int sec, int usec){
+    if (!fn_dgram_socket_sendandrecv_timeout){
+        fn_dgram_socket_sendandrecv_timeout = (tfn_dgram_socket_sendandrecv_timeout)dlsym(lib, l_dgram_socket_sendandrecv_timeout);
+        check_with_ret(fn_dgram_socket_sendandrecv_timeout, lib, -1);
+    }
+
+    return fn_dgram_socket_sendandrecv_timeout(s, sbuf, ssize, port, rbuf, rsize, sec, usec);
+}
+
+int wrap_fn_dgram_socket_sendandrecv_nowait(hcsoftbus lib, void *s, const void *sbuf, const int ssize, const int port, void **rbuf, int *rsize){
+    if (!fn_dgram_socket_sendandrecv_nowait){
+        fn_dgram_socket_sendandrecv_nowait = (tfn_dgram_socket_sendandrecv_nowait)dlsym(lib, l_dgram_socket_sendandrecv_nowait);
+        check_with_ret(fn_dgram_socket_sendandrecv_nowait, lib, -1);
+    }
+    return fn_dgram_socket_sendandrecv_nowait(s, sbuf, ssize, port, rbuf, rsize);
 }
 
 int wrap_fn_dgram_socket_start_bus(hcsoftbus lib, void *s){
@@ -340,6 +389,22 @@
     return fn_dgram_socket_sub(s, topic, size, port);
 }
 
+int wrap_fn_dgram_socket_sub_timeout(hcsoftbus lib, void *s, void *topic, int size, int port, int sec, int usec){
+    if (!fn_dgram_socket_sub_timeout){
+        fn_dgram_socket_sub_timeout = (tfn_dgram_socket_sub_timeout)dlsym(lib, l_dgram_socket_sub_timeout);
+        check_with_ret(fn_dgram_socket_sub_timeout, lib, -1);
+    }
+    return fn_dgram_socket_sub_timeout(s, topic, size, port, sec, usec);
+}
+
+int wrap_fn_dgram_socket_sub_nowait(hcsoftbus lib, void *s, void *topic, int size, int port){
+    if (!fn_dgram_socket_sub_nowait){
+        fn_dgram_socket_sub_nowait = (tfn_dgram_socket_sub_nowait)dlsym(lib, l_dgram_socket_sub_nowait);
+        check_with_ret(fn_dgram_socket_sub_nowait, lib, -1);
+    }
+    return fn_dgram_socket_sub_nowait(s, topic, size, port);
+}
+
 int wrap_fn_dgram_socket_pub(hcsoftbus lib, void *s, void *topic, int tsize, void *content, int csize, int port){
     if (!fn_dgram_socket_pub){
         fn_dgram_socket_pub = (tfn_dgram_socket_pub)dlsym(lib, l_dgram_socket_pub);
@@ -348,6 +413,22 @@
     return fn_dgram_socket_pub(s, topic, tsize, content, csize, port);
 }
 
+int wrap_fn_dgram_socket_pub_timeout(hcsoftbus lib, void *s, void *topic, int tsize, void *content, int csize, int port, int sec, int usec){
+    if (!fn_dgram_socket_pub_timeout){
+        fn_dgram_socket_pub_timeout = (tfn_dgram_socket_pub_timeout)dlsym(lib, l_dgram_socket_pub_timeout);
+        check_with_ret(fn_dgram_socket_pub_timeout, lib, -1);
+    }
+    return fn_dgram_socket_pub_timeout(s, topic, tsize, content, csize, port, sec, usec);
+}
+
+int wrap_fn_dgram_socket_pub_nowait(hcsoftbus lib, void *s, void *topic, int tsize, void *content, int csize, int port){
+    if (!fn_dgram_socket_pub_nowait){
+        fn_dgram_socket_pub_nowait = (tfn_dgram_socket_pub_nowait)dlsym(lib, l_dgram_socket_pub_nowait);
+        check_with_ret(fn_dgram_socket_pub_nowait, lib, -1);
+    }
+    return fn_dgram_socket_pub_nowait(s, topic, tsize, content, csize, port);
+}
+
 int wrap_fn_dgram_socket_port(hcsoftbus lib, void *s){
     if (!fn_dgram_socket_port){
         fn_dgram_socket_port = (tfn_dgram_socket_port)dlsym(lib, l_dgram_socket_port);
diff --git a/libcsoftbus.h b/libcsoftbus.h
index 1885b61..f141f7a 100644
--- a/libcsoftbus.h
+++ b/libcsoftbus.h
@@ -52,11 +52,21 @@
 static tfn_dgram_socket_bind    fn_dgram_socket_bind = NULL;
 static tfn_dgram_socket_force_bind    fn_dgram_socket_force_bind = NULL;
 static tfn_dgram_socket_sendto  fn_dgram_socket_sendto = NULL;
+static tfn_dgram_socket_sendto_timeout  fn_dgram_socket_sendto_timeout = NULL;
+static tfn_dgram_socket_sendto_nowait  fn_dgram_socket_sendto_nowait = NULL;
 static tfn_dgram_socket_recvfrom      fn_dgram_socket_recvfrom = NULL;
+static tfn_dgram_socket_recvfrom_timeout      fn_dgram_socket_recvfrom_timeout = NULL;
+static tfn_dgram_socket_recvfrom_nowait      fn_dgram_socket_recvfrom_nowait = NULL;
 static tfn_dgram_socket_sendandrecv   fn_dgram_socket_sendandrecv = NULL;
+static tfn_dgram_socket_sendandrecv_timeout   fn_dgram_socket_sendandrecv_timeout = NULL;
+static tfn_dgram_socket_sendandrecv_nowait   fn_dgram_socket_sendandrecv_nowait = NULL;
 static tfn_dgram_socket_start_bus     fn_dgram_socket_start_bus = NULL;
 static tfn_dgram_socket_sub     fn_dgram_socket_sub = NULL;
+static tfn_dgram_socket_sub_timeout     fn_dgram_socket_sub_timeout = NULL;
+static tfn_dgram_socket_sub_nowait     fn_dgram_socket_sub_nowait = NULL;
 static tfn_dgram_socket_pub     fn_dgram_socket_pub = NULL;
+static tfn_dgram_socket_pub_timeout     fn_dgram_socket_pub_timeout = NULL;
+static tfn_dgram_socket_pub_nowait     fn_dgram_socket_pub_nowait = NULL;
 static tfn_dgram_socket_port    fn_dgram_socket_port = NULL;
 static tfn_dgram_socket_free    fn_dgram_socket_free = NULL;
 
@@ -107,11 +117,21 @@
 const static char l_dgram_socket_bind[] = "dgram_mod_bind";
 const static char l_dgram_socket_force_bind[] = "dgram_mod_force_bind";
 const static char l_dgram_socket_sendto[] = "dgram_mod_sendto";
+const static char l_dgram_socket_sendto_timeout[] = "dgram_mod_sendto_timeout";
+const static char l_dgram_socket_sendto_nowait[] = "dgram_mod_sendto_nowait";
 const static char l_dgram_socket_recvfrom[] = "dgram_mod_recvfrom";
+const static char l_dgram_socket_recvfrom_timeout[] = "dgram_mod_recvfrom_timeout";
+const static char l_dgram_socket_recvfrom_nowait[] = "dgram_mod_recvfrom_nowait";
 const static char l_dgram_socket_sendandrecv[] = "dgram_mod_sendandrecv";
+const static char l_dgram_socket_sendandrecv_timeout[] = "dgram_mod_sendandrecv_timeout";
+const static char l_dgram_socket_sendandrecv_nowait[] = "dgram_mod_sendandrecv_nowait";
 const static char l_dgram_socket_start_bus[] = "dgram_mod_start_bus";
 const static char l_dgram_socket_sub[] = "dgram_mod_sub";
+const static char l_dgram_socket_sub_timeout[] = "dgram_mod_sub_timeout";
+const static char l_dgram_socket_sub_nowait[] = "dgram_mod_sub_nowait";
 const static char l_dgram_socket_pub[] = "dgram_mod_pub";
+const static char l_dgram_socket_pub_timeout[] = "dgram_mod_pub_timeout";
+const static char l_dgram_socket_pub_nowait[] = "dgram_mod_pub_nowait";
 const static char l_dgram_socket_port[] = "dgram_mod_get_port";
 const static char l_dgram_socket_free[] = "dgram_mod_free";
 
@@ -168,11 +188,21 @@
 int wrap_fn_dgram_socket_bind(hcsoftbus lib, void *s, int port);
 int wrap_fn_dgram_socket_force_bind(hcsoftbus lib, void *s, int port);
 int wrap_fn_dgram_socket_sendto(hcsoftbus lib, void *s, const void *buf, const int size, const int port);
+int wrap_fn_dgram_socket_sendto_timeout(hcsoftbus lib, void *s, const void *buf, const int size, const int port, int sec, int usec);
+int wrap_fn_dgram_socket_sendto_nowait(hcsoftbus lib, void *s, const void *buf, const int size, const int port);
 int wrap_fn_dgram_socket_recvfrom(hcsoftbus lib, void *s, void **buf, int *size, int *port);
+int wrap_fn_dgram_socket_recvfrom_timeout(hcsoftbus lib, void *s, void **buf, int *size, int *port, int sec, int usec);
+int wrap_fn_dgram_socket_recvfrom_nowait(hcsoftbus lib, void *s, void **buf, int *size, int *port);
 int wrap_fn_dgram_socket_sendandrecv(hcsoftbus lib, void *s, const void *sbuf, const int ssize, const int port, void **rbuf, int *rsize);
+int wrap_fn_dgram_socket_sendandrecv_timeout(hcsoftbus lib, void *s, const void *sbuf, const int ssize, const int port, void **rbuf, int *rsize, int sec, int usec);
+int wrap_fn_dgram_socket_sendandrecv_nowait(hcsoftbus lib, void *s, const void *sbuf, const int ssize, const int port, void **rbuf, int *rsize);
 int wrap_fn_dgram_socket_start_bus(hcsoftbus lib, void *s);
 int wrap_fn_dgram_socket_sub(hcsoftbus lib, void *s, void *topic, int size, int port);
+int wrap_fn_dgram_socket_sub_timeout(hcsoftbus lib, void *s, void *topic, int size, int port, int sec, int usec);
+int wrap_fn_dgram_socket_sub_nowait(hcsoftbus lib, void *s, void *topic, int size, int port);
 int wrap_fn_dgram_socket_pub(hcsoftbus lib, void *s, void *topic, int tsize, void *content, int csize, int port);
+int wrap_fn_dgram_socket_pub_timeout(hcsoftbus lib, void *s, void *topic, int tsize, void *content, int csize, int port, int sec, int usec);
+int wrap_fn_dgram_socket_pub_nowait(hcsoftbus lib, void *s, void *topic, int tsize, void *content, int csize, int port);
 int wrap_fn_dgram_socket_port(hcsoftbus lib, void *s);
 void wrap_fn_dgram_socket_free(hcsoftbus lib, void *buf);
 
diff --git a/libcsoftbus_func.h b/libcsoftbus_func.h
index d921639..6ef5dc7 100644
--- a/libcsoftbus_func.h
+++ b/libcsoftbus_func.h
@@ -175,18 +175,26 @@
  * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
  */
 typedef int(*tfn_dgram_socket_sendto) (void*, const void*, const int, const int);
+// 鍙戦�佷俊鎭秴鏃惰繑鍥炪�� @sec 绉� 锛� @nsec 绾崇
+typedef int(*tfn_dgram_socket_sendto_timeout) (void*, const void*, const int, const int, int, int);
+// 鍙戦�佷俊鎭珛鍒昏繑鍥炪��
+typedef tfn_dgram_socket_sendto tfn_dgram_socket_sendto_nowait;
 /**
  * 鎺ユ敹淇℃伅
  * @port 浠庤皝鍝噷鏀跺埌鐨勪俊鎭�
  * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
 */
 typedef int(*tfn_dgram_socket_recvfrom) (void*, void**, int*, int*);
+typedef int(*tfn_dgram_socket_recvfrom_timeout) (void*, void**, int*, int*, int, int);
+typedef tfn_dgram_socket_recvfrom tfn_dgram_socket_recvfrom_nowait;
 /**
  * 鍙戦�佽姹備俊鎭苟绛夊緟鎺ユ敹搴旂瓟
  * @port 鍙戦�佺粰璋�
  * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
 */
 typedef int(*tfn_dgram_socket_sendandrecv) (void*, const void*, const int, const int, void**, int*);
+typedef int(*tfn_dgram_socket_sendandrecv_timeout) (void*, const void*, const int, const int, void**, int*, int, int);
+typedef tfn_dgram_socket_sendandrecv tfn_dgram_socket_sendandrecv_nowait;
 /**
  * 鍚姩bus
  *
@@ -200,6 +208,8 @@
  * @port 鎬荤嚎绔彛
  */
 typedef int(*tfn_dgram_socket_sub) (void*, void*, int, int);
+typedef int(*tfn_dgram_socket_sub_timeout) (void*, void*, int, int, int, int);
+typedef tfn_dgram_socket_sub tfn_dgram_socket_sub_nowait;
 /**
  * 鍙戝竷涓婚
  * @topic 涓婚
@@ -207,6 +217,8 @@
  * @port 鎬荤嚎绔彛
  */
 typedef int(*tfn_dgram_socket_pub) (void*, void*, int, void*, int, int);
+typedef int(*tfn_dgram_socket_pub_timeout) (void*, void*, int, void*, int, int, int, int);
+typedef tfn_dgram_socket_pub tfn_dgram_socket_pub_nowait;
 /**
  * 鑾峰彇soket绔彛鍙�
  */
diff --git a/library.go b/library.go
index 0d97e0f..491d63d 100644
--- a/library.go
+++ b/library.go
@@ -85,14 +85,16 @@
 		case <-ctx.Done():
 			return
 		default:
-			if data, peer, err := sock.RecvFrom(); err == nil {
-				var info *MsgInfo
-				if err := proto.Unmarshal(data, info); err == nil {
+			if data, peer, err := sock.RecvFromTimeout(0, 10*1000); err == nil {
+				var info MsgInfo
+				if err := proto.Unmarshal(data, &info); err == nil {
 					ch <- TransInfo{
-						info: info,
+						info: &info,
 						port: peer,
 					}
 				}
+			} else {
+				// time.Sleep(10 * time.Millisecond)
 			}
 		}
 	}
@@ -211,6 +213,11 @@
 	return handle
 }
 
+const (
+	timeoutSec  = 1
+	timeoutUsec = 0
+)
+
 // GetTopicInfo get topic info
 func (h *Handle) GetTopicInfo(topic, typ string) int {
 	// 鎹涓嶆洿鏂�,鍏堢敤缂撳瓨,鍚﹀垯闇�瑕佹柊鍒涘缓涓�涓猻ocket,鏉ヤ粠manager璇锋眰key
@@ -225,10 +232,10 @@
 	}
 	if data, err := proto.Marshal(msg); err == nil {
 		h.mtxWorker.Lock()
-		if rdata, err := h.sockWorker.sock.SendAndRecv(data, h.sockWorker.peer); err == nil {
+		if rdata, err := h.sockWorker.sock.SendAndRecvTimeout(data, h.sockWorker.peer, timeoutSec, timeoutUsec); err == nil {
 			h.mtxWorker.Unlock()
-			var rmsg *TopicInfoReply
-			if err := proto.Unmarshal(rdata, rmsg); err == nil {
+			var rmsg TopicInfoReply
+			if err := proto.Unmarshal(rdata, &rmsg); err == nil {
 				return int(rmsg.Key)
 			}
 		}
@@ -238,7 +245,7 @@
 }
 
 func (h *Handle) send2(sc *sockClient, data []byte, logID string) error {
-	if r := sc.sock.SendTo(data, sc.peer); r != 0 {
+	if r := sc.sock.SendToTimeout(data, sc.peer, timeoutSec, timeoutUsec); r != 0 {
 		return fmt.Errorf("%s SendTo Failed: %d", logID, r)
 	}
 	return nil
@@ -259,7 +266,7 @@
 	defer h.mtxWorker.Unlock()
 	msg, err := proto.Marshal(info)
 	if err == nil {
-		if r := h.sockWorker.sock.SendTo(msg, key); r != 0 {
+		if r := h.sockWorker.sock.SendToTimeout(msg, key, timeoutSec, timeoutUsec); r != 0 {
 			return fmt.Errorf("SendOnly Failed: %d", r)
 		}
 	}
@@ -286,28 +293,28 @@
 	}
 
 	// 鍚屾鎺ュ彛,闇�瑕佺瓑寰呰繑鍥炲��
-	var ret *MsgInfo
+	var ret MsgInfo
 loop:
 	for {
 		select {
 		case <-h.ctx.Done():
 			return nil
 		default:
-			if data, err := h.sockWorker.sock.SendAndRecv(msg, key); err == nil {
-				if err := proto.Unmarshal(data, ret); err == nil {
+			if data, err := h.sockWorker.sock.SendAndRecvTimeout(msg, key, timeoutSec, timeoutUsec); err == nil {
+				if err := proto.Unmarshal(data, &ret); err == nil {
 					break loop
 				}
 			}
 		}
 	}
-	return ret
+	return &ret
 }
 
 // Reply request
 func (h *Handle) Reply(key int, info *MsgInfo) error {
 	msg, err := proto.Marshal(info)
 	if err == nil {
-		if r := h.sockRep.sock.SendTo(msg, key); r != 0 {
+		if r := h.sockRep.sock.SendToTimeout(msg, key, timeoutSec, timeoutUsec); r != 0 {
 			return fmt.Errorf("Reply Failed: %d", r)
 		}
 	}
diff --git a/softbusDgram.go b/softbusDgram.go
index f390075..68d0a29 100644
--- a/softbusDgram.go
+++ b/softbusDgram.go
@@ -71,6 +71,26 @@
 	return int(r)
 }
 
+// SendToTimeout port
+func (d *DgramSocket) SendToTimeout(data []byte, port int, sec, usec int) int {
+	if libsoftbus == nil {
+		return RETVAL
+	}
+
+	r := C.wrap_fn_dgram_socket_sendto_timeout(libsoftbus, d.dgram, unsafe.Pointer(&data[0]), C.int(len(data)), C.int(port), C.int(sec), C.int(usec))
+	return int(r)
+}
+
+// SendToNoWait port
+func (d *DgramSocket) SendToNoWait(data []byte, port int) int {
+	if libsoftbus == nil {
+		return RETVAL
+	}
+
+	r := C.wrap_fn_dgram_socket_sendto_nowait(libsoftbus, d.dgram, unsafe.Pointer(&data[0]), C.int(len(data)), C.int(port))
+	return int(r)
+}
+
 // RecvFrom data and port
 func (d *DgramSocket) RecvFrom() ([]byte, int, error) {
 	if libsoftbus == nil {
@@ -92,6 +112,48 @@
 	return data, int(rp), nil
 }
 
+// RecvFromTimeout data and port
+func (d *DgramSocket) RecvFromTimeout(sec, usec int) ([]byte, int, error) {
+	if libsoftbus == nil {
+		return nil, -1, fmt.Errorf("RecvFromTimeout Func Test libsoftbus Is Nil")
+	}
+
+	var rb unsafe.Pointer
+	var rs C.int
+	var rp C.int
+
+	r := C.wrap_fn_dgram_socket_recvfrom_timeout(libsoftbus, d.dgram, &rb, &rs, &rp, C.int(sec), C.int(usec))
+	if r != 0 {
+		return nil, int(rp), fmt.Errorf("RecvFromTimeout Func Failed %d", int(r))
+	}
+
+	data := C.GoBytes(rb, rs)
+	C.wrap_fn_dgram_socket_free(libsoftbus, rb)
+
+	return data, int(rp), nil
+}
+
+// RecvFromNoWait data and port
+func (d *DgramSocket) RecvFromNoWait() ([]byte, int, error) {
+	if libsoftbus == nil {
+		return nil, -1, fmt.Errorf("RecvFromNoWait Func Test libsoftbus Is Nil")
+	}
+
+	var rb unsafe.Pointer
+	var rs C.int
+	var rp C.int
+
+	r := C.wrap_fn_dgram_socket_recvfrom_nowait(libsoftbus, d.dgram, &rb, &rs, &rp)
+	if r != 0 {
+		return nil, int(rp), fmt.Errorf("RecvFromNoWait Func Failed %d", int(r))
+	}
+
+	data := C.GoBytes(rb, rs)
+	C.wrap_fn_dgram_socket_free(libsoftbus, rb)
+
+	return data, int(rp), nil
+}
+
 // SendAndRecv sync
 func (d *DgramSocket) SendAndRecv(sdata []byte, port int) ([]byte, error) {
 	if libsoftbus == nil {
@@ -104,6 +166,46 @@
 	r := C.wrap_fn_dgram_socket_sendandrecv(libsoftbus, d.dgram, unsafe.Pointer(&sdata[0]), C.int(len(sdata)), C.int(port), &rb, &rs)
 	if r != 0 {
 		return nil, fmt.Errorf("SendAndRecv Send To %d And Recv From It Failed", port)
+	}
+
+	data := C.GoBytes(rb, rs)
+	C.wrap_fn_dgram_socket_free(libsoftbus, rb)
+
+	return data, nil
+}
+
+// SendAndRecvTimeout sync
+func (d *DgramSocket) SendAndRecvTimeout(sdata []byte, port int, sec, usec int) ([]byte, error) {
+	if libsoftbus == nil {
+		return nil, fmt.Errorf("SendAndRecvTimeout Func Test libsoftbus Is Nil")
+	}
+
+	var rb unsafe.Pointer
+	var rs C.int
+
+	r := C.wrap_fn_dgram_socket_sendandrecv_timeout(libsoftbus, d.dgram, unsafe.Pointer(&sdata[0]), C.int(len(sdata)), C.int(port), &rb, &rs, C.int(sec), C.int(usec))
+	if r != 0 {
+		return nil, fmt.Errorf("SendAndRecvTimeout Send To %d And Recv From It Failed", port)
+	}
+
+	data := C.GoBytes(rb, rs)
+	C.wrap_fn_dgram_socket_free(libsoftbus, rb)
+
+	return data, nil
+}
+
+// SendAndRecvNoWait sync
+func (d *DgramSocket) SendAndRecvNoWait(sdata []byte, port int) ([]byte, error) {
+	if libsoftbus == nil {
+		return nil, fmt.Errorf("SendAndRecvNoWait Func Test libsoftbus Is Nil")
+	}
+
+	var rb unsafe.Pointer
+	var rs C.int
+
+	r := C.wrap_fn_dgram_socket_sendandrecv_nowait(libsoftbus, d.dgram, unsafe.Pointer(&sdata[0]), C.int(len(sdata)), C.int(port), &rb, &rs)
+	if r != 0 {
+		return nil, fmt.Errorf("SendAndRecvNoWait Send To %d And Recv From It Failed", port)
 	}
 
 	data := C.GoBytes(rb, rs)
@@ -135,6 +237,32 @@
 	return int(r)
 }
 
+// SubTimeout sub bus
+func (d *DgramSocket) SubTimeout(topic string, port int, sec, usec int) int {
+	if libsoftbus == nil {
+		return RETVAL
+	}
+
+	ct := C.CString(topic)
+	defer C.free(unsafe.Pointer(ct))
+
+	r := C.wrap_fn_dgram_socket_sub_timeout(libsoftbus, d.dgram, unsafe.Pointer(ct), C.int(len(topic)), C.int(port), C.int(sec), C.int(usec))
+	return int(r)
+}
+
+// SubNoWait sub bus
+func (d *DgramSocket) SubNoWait(topic string, port int) int {
+	if libsoftbus == nil {
+		return RETVAL
+	}
+
+	ct := C.CString(topic)
+	defer C.free(unsafe.Pointer(ct))
+
+	r := C.wrap_fn_dgram_socket_sub_nowait(libsoftbus, d.dgram, unsafe.Pointer(ct), C.int(len(topic)), C.int(port))
+	return int(r)
+}
+
 // Pub bus
 func (d *DgramSocket) Pub(topic, msg string, port int) int {
 	if libsoftbus == nil {
@@ -150,6 +278,36 @@
 	return int(r)
 }
 
+// PubTimeout bus
+func (d *DgramSocket) PubTimeout(topic, msg string, port int, sec, usec int) int {
+	if libsoftbus == nil {
+		return RETVAL
+	}
+
+	ct := C.CString(topic)
+	defer C.free(unsafe.Pointer(ct))
+	cm := C.CString(msg)
+	defer C.free(unsafe.Pointer(cm))
+
+	r := C.wrap_fn_dgram_socket_pub_timeout(libsoftbus, d.dgram, unsafe.Pointer(ct), C.int(len(topic)), unsafe.Pointer(cm), C.int(len(msg)), C.int(port), C.int(sec), C.int(usec))
+	return int(r)
+}
+
+// PubNoWait bus
+func (d *DgramSocket) PubNoWait(topic, msg string, port int) int {
+	if libsoftbus == nil {
+		return RETVAL
+	}
+
+	ct := C.CString(topic)
+	defer C.free(unsafe.Pointer(ct))
+	cm := C.CString(msg)
+	defer C.free(unsafe.Pointer(cm))
+
+	r := C.wrap_fn_dgram_socket_pub_nowait(libsoftbus, d.dgram, unsafe.Pointer(ct), C.int(len(topic)), unsafe.Pointer(cm), C.int(len(msg)), C.int(port))
+	return int(r)
+}
+
 // Port socket
 func (d *DgramSocket) Port() int {
 	if libsoftbus == nil {

--
Gitblit v1.8.0