From 0bde715af72b7b3d55ad3aac816d7cd153a60b42 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期一, 03 八月 2020 13:29:21 +0800 Subject: [PATCH] add timeout api --- library.go | 35 +++++++++++++++++++++-------------- 1 files changed, 21 insertions(+), 14 deletions(-) diff --git a/library.go b/library.go index 0d97e0f..491d63d 100644 --- a/library.go +++ b/library.go @@ -85,14 +85,16 @@ case <-ctx.Done(): return default: - if data, peer, err := sock.RecvFrom(); err == nil { - var info *MsgInfo - if err := proto.Unmarshal(data, info); err == nil { + if data, peer, err := sock.RecvFromTimeout(0, 10*1000); err == nil { + var info MsgInfo + if err := proto.Unmarshal(data, &info); err == nil { ch <- TransInfo{ - info: info, + info: &info, port: peer, } } + } else { + // time.Sleep(10 * time.Millisecond) } } } @@ -211,6 +213,11 @@ return handle } +const ( + timeoutSec = 1 + timeoutUsec = 0 +) + // GetTopicInfo get topic info func (h *Handle) GetTopicInfo(topic, typ string) int { // 鎹涓嶆洿鏂�,鍏堢敤缂撳瓨,鍚﹀垯闇�瑕佹柊鍒涘缓涓�涓猻ocket,鏉ヤ粠manager璇锋眰key @@ -225,10 +232,10 @@ } if data, err := proto.Marshal(msg); err == nil { h.mtxWorker.Lock() - if rdata, err := h.sockWorker.sock.SendAndRecv(data, h.sockWorker.peer); err == nil { + if rdata, err := h.sockWorker.sock.SendAndRecvTimeout(data, h.sockWorker.peer, timeoutSec, timeoutUsec); err == nil { h.mtxWorker.Unlock() - var rmsg *TopicInfoReply - if err := proto.Unmarshal(rdata, rmsg); err == nil { + var rmsg TopicInfoReply + if err := proto.Unmarshal(rdata, &rmsg); err == nil { return int(rmsg.Key) } } @@ -238,7 +245,7 @@ } func (h *Handle) send2(sc *sockClient, data []byte, logID string) error { - if r := sc.sock.SendTo(data, sc.peer); r != 0 { + if r := sc.sock.SendToTimeout(data, sc.peer, timeoutSec, timeoutUsec); r != 0 { return fmt.Errorf("%s SendTo Failed: %d", logID, r) } return nil @@ -259,7 +266,7 @@ defer h.mtxWorker.Unlock() msg, err := proto.Marshal(info) if err == nil { - if r := h.sockWorker.sock.SendTo(msg, key); r != 0 { + if r := h.sockWorker.sock.SendToTimeout(msg, key, timeoutSec, timeoutUsec); r != 0 { return fmt.Errorf("SendOnly Failed: %d", r) } } @@ -286,28 +293,28 @@ } // 鍚屾鎺ュ彛,闇�瑕佺瓑寰呰繑鍥炲�� - var ret *MsgInfo + var ret MsgInfo loop: for { select { case <-h.ctx.Done(): return nil default: - if data, err := h.sockWorker.sock.SendAndRecv(msg, key); err == nil { - if err := proto.Unmarshal(data, ret); err == nil { + if data, err := h.sockWorker.sock.SendAndRecvTimeout(msg, key, timeoutSec, timeoutUsec); err == nil { + if err := proto.Unmarshal(data, &ret); err == nil { break loop } } } } - return ret + return &ret } // Reply request func (h *Handle) Reply(key int, info *MsgInfo) error { msg, err := proto.Marshal(info) if err == nil { - if r := h.sockRep.sock.SendTo(msg, key); r != 0 { + if r := h.sockRep.sock.SendToTimeout(msg, key, timeoutSec, timeoutUsec); r != 0 { return fmt.Errorf("Reply Failed: %d", r) } } -- Gitblit v1.8.0