From 0bde715af72b7b3d55ad3aac816d7cd153a60b42 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期一, 03 八月 2020 13:29:21 +0800
Subject: [PATCH] add timeout api

---
 library.go |   35 +++++++++++++++++++++--------------
 1 files changed, 21 insertions(+), 14 deletions(-)

diff --git a/library.go b/library.go
index 0d97e0f..491d63d 100644
--- a/library.go
+++ b/library.go
@@ -85,14 +85,16 @@
 		case <-ctx.Done():
 			return
 		default:
-			if data, peer, err := sock.RecvFrom(); err == nil {
-				var info *MsgInfo
-				if err := proto.Unmarshal(data, info); err == nil {
+			if data, peer, err := sock.RecvFromTimeout(0, 10*1000); err == nil {
+				var info MsgInfo
+				if err := proto.Unmarshal(data, &info); err == nil {
 					ch <- TransInfo{
-						info: info,
+						info: &info,
 						port: peer,
 					}
 				}
+			} else {
+				// time.Sleep(10 * time.Millisecond)
 			}
 		}
 	}
@@ -211,6 +213,11 @@
 	return handle
 }
 
+const (
+	timeoutSec  = 1
+	timeoutUsec = 0
+)
+
 // GetTopicInfo get topic info
 func (h *Handle) GetTopicInfo(topic, typ string) int {
 	// 鎹涓嶆洿鏂�,鍏堢敤缂撳瓨,鍚﹀垯闇�瑕佹柊鍒涘缓涓�涓猻ocket,鏉ヤ粠manager璇锋眰key
@@ -225,10 +232,10 @@
 	}
 	if data, err := proto.Marshal(msg); err == nil {
 		h.mtxWorker.Lock()
-		if rdata, err := h.sockWorker.sock.SendAndRecv(data, h.sockWorker.peer); err == nil {
+		if rdata, err := h.sockWorker.sock.SendAndRecvTimeout(data, h.sockWorker.peer, timeoutSec, timeoutUsec); err == nil {
 			h.mtxWorker.Unlock()
-			var rmsg *TopicInfoReply
-			if err := proto.Unmarshal(rdata, rmsg); err == nil {
+			var rmsg TopicInfoReply
+			if err := proto.Unmarshal(rdata, &rmsg); err == nil {
 				return int(rmsg.Key)
 			}
 		}
@@ -238,7 +245,7 @@
 }
 
 func (h *Handle) send2(sc *sockClient, data []byte, logID string) error {
-	if r := sc.sock.SendTo(data, sc.peer); r != 0 {
+	if r := sc.sock.SendToTimeout(data, sc.peer, timeoutSec, timeoutUsec); r != 0 {
 		return fmt.Errorf("%s SendTo Failed: %d", logID, r)
 	}
 	return nil
@@ -259,7 +266,7 @@
 	defer h.mtxWorker.Unlock()
 	msg, err := proto.Marshal(info)
 	if err == nil {
-		if r := h.sockWorker.sock.SendTo(msg, key); r != 0 {
+		if r := h.sockWorker.sock.SendToTimeout(msg, key, timeoutSec, timeoutUsec); r != 0 {
 			return fmt.Errorf("SendOnly Failed: %d", r)
 		}
 	}
@@ -286,28 +293,28 @@
 	}
 
 	// 鍚屾鎺ュ彛,闇�瑕佺瓑寰呰繑鍥炲��
-	var ret *MsgInfo
+	var ret MsgInfo
 loop:
 	for {
 		select {
 		case <-h.ctx.Done():
 			return nil
 		default:
-			if data, err := h.sockWorker.sock.SendAndRecv(msg, key); err == nil {
-				if err := proto.Unmarshal(data, ret); err == nil {
+			if data, err := h.sockWorker.sock.SendAndRecvTimeout(msg, key, timeoutSec, timeoutUsec); err == nil {
+				if err := proto.Unmarshal(data, &ret); err == nil {
 					break loop
 				}
 			}
 		}
 	}
-	return ret
+	return &ret
 }
 
 // Reply request
 func (h *Handle) Reply(key int, info *MsgInfo) error {
 	msg, err := proto.Marshal(info)
 	if err == nil {
-		if r := h.sockRep.sock.SendTo(msg, key); r != 0 {
+		if r := h.sockRep.sock.SendToTimeout(msg, key, timeoutSec, timeoutUsec); r != 0 {
 			return fmt.Errorf("Reply Failed: %d", r)
 		}
 	}

--
Gitblit v1.8.0