From 14c25a6be6d147f90011d0218380bf7cd58b76e5 Mon Sep 17 00:00:00 2001
From: gongshangguo <gongshangguo@admin.com>
Date: 星期二, 01 三月 2022 20:05:27 +0800
Subject: [PATCH] 关闭的通道禁止写入
---
client/client.go | 94 +++++++++++++++++++++++++----------------------
1 files changed, 50 insertions(+), 44 deletions(-)
diff --git a/client/client.go b/client/client.go
index 3d6035b..532803b 100644
--- a/client/client.go
+++ b/client/client.go
@@ -6,6 +6,7 @@
"bufio"
"encoding/binary"
"encoding/json"
+ "errors"
uuid "github.com/satori/go.uuid"
"go.uber.org/zap"
"io"
@@ -22,8 +23,6 @@
DefaultHeartbeatInterval = 15 * time.Second
// 璇诲彇鏁版嵁瓒呮椂鏃堕棿
DefaultReaderTimeOut = 60 * time.Second
- // 杩炴帴灏濊瘯闂撮殧
- DefaultNetRetry = 10 * time.Second
)
// 杩炴帴鐘舵��
@@ -48,8 +47,10 @@
deviceRegister *aiot.DeviceRegister
// 鍏抽棴閿�
closeLock *sync.Mutex
- // 娑堟伅閿�
- msgLock *sync.Mutex
+ // 璇诲彇閿�
+ readLock *sync.Mutex
+ // 鍐欏叆閿�
+ writeLock *sync.Mutex
// 杩炴帴鍦板潃
addr string
// 璁惧ID
@@ -81,12 +82,12 @@
logger.Debug("New Client...")
return &Client{
deviceRegister: deviceRegister,
+ readLock: new(sync.Mutex),
closeLock: new(sync.Mutex),
- msgLock: new(sync.Mutex),
+ writeLock: new(sync.Mutex),
addr: addr,
deviceId: clientId,
writeChan: make(chan []byte),
- exitChan: make(chan int8),
state: StateInit,
tmpByte4Slice: make([]byte, 4),
waitGroup: &util.WaitGroupWrapper{},
@@ -106,7 +107,8 @@
// 鍒濆鍖栧綋鍓嶅睘鎬у��
c.Conn = nil
c.closeLock = new(sync.Mutex)
- c.msgLock = new(sync.Mutex)
+ c.readLock = new(sync.Mutex)
+ c.writeLock = new(sync.Mutex)
c.writeChan = make(chan []byte)
c.exitChan = make(chan int8)
c.state = StateInit
@@ -189,19 +191,12 @@
byte4 := make([]byte,4)
for {
select {
- case <- c.exitChan:
- c.Logger.Debug("Close client", zap.String("deviceId", c.deviceId))
- c.Close()
- c.Logger.Warn("writeLoop Done...")
- return
case bodyByte := <- c.writeChan:
binary.BigEndian.PutUint32(byte4, uint32(len(bodyByte)))
body = append(byte4, bodyByte...)
- c.closeLock.Lock()
- c.msgLock.Lock()
+ c.writeLock.Lock()
_,err = c.Conn.Write(body)
- c.closeLock.Unlock()
- c.msgLock.Unlock()
+ c.writeLock.Unlock()
if err != nil {
c.Logger.Error("Fail to write message", zap.Error(err), zap.String("msg", string(bodyByte)))
c.Close()
@@ -219,16 +214,11 @@
// 鍙戦�佹秷鎭�
func (c *Client) WriteMsg(senderId string, msgType aiot.MSG_TYPE, reqType aiot.REQ_TYPE, data []byte, msgProto *aiot.MsgIdProto) (*aiot.Protocol, error) {
- // 閿�
- c.closeLock.Lock()
-
// 鍏抽棴鐨勮繛鎺ヤ笉鑳藉啓鍏�
if c.IsClosed() {
c.Logger.Error("Can not write msg on the closed chan", zap.Any("msgType", msgType), zap.Any("reqType", reqType), zap.Any("data", string(data)))
- c.closeLock.Unlock()
return nil,nil
}
- c.closeLock.Unlock()
// 鎷艰骞跺彂閫佹秷鎭�
body := &aiot.Protocol{
@@ -240,23 +230,30 @@
}
// 鍙戦�佹秷鎭�
- c.WriteBody(body)
+ _ = c.WriteBody(body)
return body, nil
}
func (c *Client) WriteBody(body *aiot.Protocol) error {
+ defer func() {
+ if err := recover();err != nil {
+ c.Logger.Error("Write Body Error:", err)
+ }
+ }()
+
+ if c.IsClosed() {
+ errMsg := "Can not write msg into closed chain"
+ c.Logger.Warn(errMsg, zap.Any("msg",body))
+ return errors.New(errMsg)
+ }
body.Receiver = aiot.RECEIVER_TO_SAAS
- c.Logger.Debug("Write Body...", zap.Any("msg", body), zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
msgData, err := json.Marshal(body)
if err != nil {
c.Logger.Error("Fail to Marshal send data", zap.Error(err))
return err
}
- c.msgLock.Lock()
- c.closeLock.Lock()
+ c.Logger.Debug("Write Body into writeChan...", zap.Any("msg", body), zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
c.writeChan <- msgData
- c.closeLock.Unlock()
- c.msgLock.Unlock()
return nil
}
@@ -296,6 +293,11 @@
// 鍙戦�佷笟鍔″寘璇锋眰
func (c *Client) Request(receiver aiot.RECEIVER, senderId string, msgProto *aiot.MsgIdProto, data []byte) error {
+ defer func() {
+ if err := recover();err != nil {
+ c.Logger.Error("Write Body Error:", err)
+ }
+ }()
body := &aiot.Protocol{}
body.Receiver = receiver
body.SenderId = senderId
@@ -309,11 +311,7 @@
c.Logger.Error("Fail to Marshal send data", zap.Error(err))
return err
}
- c.msgLock.Lock()
- c.closeLock.Lock()
c.writeChan <- msgData
- c.closeLock.Unlock()
- c.msgLock.Unlock()
return nil
}
@@ -322,14 +320,17 @@
var err error
var length uint32
for {
- c.SetDeadline(time.Now().Add(DefaultReaderTimeOut))
+ c.tmpByte4Slice = make([]byte, 4)
+ _ = c.SetDeadline(time.Now().Add(DefaultReaderTimeOut))
// 璇诲彇闀垮害
+ c.readLock.Lock()
_, err = io.ReadFull(c.Reader, c.tmpByte4Slice)
+ c.readLock.Unlock()
if err != nil {
if err == io.EOF {
+ c.Logger.Error("Fail to read request byte4", zap.Error(err))
err = nil
} else {
- c.Logger.Error("Fail to read request", zap.Error(err))
c.Close()
return
}
@@ -341,9 +342,19 @@
}
// 璇诲彇body
bodyByte := make([]byte, length)
- c.closeLock.Lock()
+ c.readLock.Lock()
_, err = io.ReadFull(c.Reader, bodyByte)
- c.closeLock.Unlock()
+ c.readLock.Unlock()
+ if err != nil {
+ if err == io.EOF {
+ c.Logger.Error("Fail to read request body", zap.Error(err))
+ err = nil
+ } else {
+ c.Close()
+ return
+ }
+ break
+ }
body := &aiot.Protocol{}
err = json.Unmarshal(bodyByte, body)
if err != nil {
@@ -356,7 +367,7 @@
c.Logger.Warn("ReadLoop Done...")
// 鍏抽棴杩炴帴
- c.exitChan <- 1
+ c.Close()
}
// 澶勭悊鍥炶皟
@@ -371,12 +382,12 @@
switch body.MsgType {
// 蹇冭烦鍥炲
case aiot.MSG_TYPE_HEART_BEAT:
- c.clientCallback.OnHeartBeat(c,body)
+ go c.clientCallback.OnHeartBeat(c, body)
return
// 娉ㄥ唽鍥炲
case aiot.MSG_TYPE_REGISTER:
- c.clientCallback.OnRegister(c,body)
+ go c.clientCallback.OnRegister(c, body)
return
// 璁惧鎺у埗
@@ -447,19 +458,14 @@
c.Logger.Debug("Closing connect", zap.String("addr", c.addr))
c.closeLock.Lock()
defer c.closeLock.Unlock()
-
// 鍏抽棴閫氶亾
if !c.IsClosed() {
- c.Conn.Close()
+ _ = c.Conn.Close()
if c.IsConnected() {
c.clientCallback.OnClose(c)
}
// 璁剧疆杩炴帴灞炴��
c.SetState(StateDisconnected)
-
- // 鍏抽棴绠¢亾
- close(c.exitChan)
- close(c.writeChan)
}
}
--
Gitblit v1.8.0