From 14c25a6be6d147f90011d0218380bf7cd58b76e5 Mon Sep 17 00:00:00 2001
From: gongshangguo <gongshangguo@admin.com>
Date: 星期二, 01 三月 2022 20:05:27 +0800
Subject: [PATCH] 关闭的通道禁止写入
---
client/client.go | 151 +++++++++++++++++++++++++++++++------------------
1 files changed, 95 insertions(+), 56 deletions(-)
diff --git a/client/client.go b/client/client.go
index 50f01b1..532803b 100644
--- a/client/client.go
+++ b/client/client.go
@@ -6,6 +6,7 @@
"bufio"
"encoding/binary"
"encoding/json"
+ "errors"
uuid "github.com/satori/go.uuid"
"go.uber.org/zap"
"io"
@@ -22,8 +23,6 @@
DefaultHeartbeatInterval = 15 * time.Second
// 璇诲彇鏁版嵁瓒呮椂鏃堕棿
DefaultReaderTimeOut = 60 * time.Second
- // 杩炴帴灏濊瘯闂撮殧
- DefaultNetRetry = 10 * time.Second
)
// 杩炴帴鐘舵��
@@ -48,8 +47,10 @@
deviceRegister *aiot.DeviceRegister
// 鍏抽棴閿�
closeLock *sync.Mutex
- // 娑堟伅閿�
- msgLock *sync.Mutex
+ // 璇诲彇閿�
+ readLock *sync.Mutex
+ // 鍐欏叆閿�
+ writeLock *sync.Mutex
// 杩炴帴鍦板潃
addr string
// 璁惧ID
@@ -73,7 +74,7 @@
// 蹇冭烦鍖�
heartBeatProto *aiot.HeartBeatProto
// logger
- logger *zap.SugaredLogger
+ Logger *zap.SugaredLogger
}
// 鍒濆鍖栧鎴风
@@ -81,18 +82,18 @@
logger.Debug("New Client...")
return &Client{
deviceRegister: deviceRegister,
+ readLock: new(sync.Mutex),
closeLock: new(sync.Mutex),
- msgLock: new(sync.Mutex),
+ writeLock: new(sync.Mutex),
addr: addr,
deviceId: clientId,
writeChan: make(chan []byte),
- exitChan: make(chan int8),
state: StateInit,
tmpByte4Slice: make([]byte, 4),
waitGroup: &util.WaitGroupWrapper{},
clientCallback: callBack,
heartBeatProto: &aiot.HeartBeatProto{},
- logger: logger,
+ Logger: logger,
}
}
@@ -106,7 +107,8 @@
// 鍒濆鍖栧綋鍓嶅睘鎬у��
c.Conn = nil
c.closeLock = new(sync.Mutex)
- c.msgLock = new(sync.Mutex)
+ c.readLock = new(sync.Mutex)
+ c.writeLock = new(sync.Mutex)
c.writeChan = make(chan []byte)
c.exitChan = make(chan int8)
c.state = StateInit
@@ -117,35 +119,35 @@
// 鍚姩鏈嶅姟
func (c *Client) StartSrv() {
// 鍒ゆ柇杩炴帴鐘舵�侊紝閬垮厤閲嶅杩炴帴
- c.logger.Debug("Start client service...")
+ c.Logger.Debug("Start client service...")
if c.IsConnected(){
- c.logger.Error("net is connected, please do not repeat connect", zap.String("addr", c.addr))
+ c.Logger.Error("net is connected, please do not repeat connect", zap.String("addr", c.addr))
return
}
// 鍦板潃鏄惁鍙敤
if c.addr == "" {
- c.logger.Warn("net addr is nil, wait for retry", zap.String("deviceId", c.deviceId))
+ c.Logger.Warn("net addr is nil, wait for retry", zap.String("deviceId", c.deviceId))
return
}
// 杩炴帴TCP
- c.logger.Debug("Connecting to service", zap.String("addr", c.addr))
+ c.Logger.Debug("Connecting to service", zap.String("addr", c.addr))
tcpAddr, err := net.ResolveTCPAddr("tcp", c.addr)
if err != nil {
- c.logger.Warn("Net addr can not be connect for now,waiting...", zap.String("addr", c.addr), zap.Error(err))
+ c.Logger.Warn("Net addr can not be connect for now,waiting...", zap.String("addr", c.addr), zap.Error(err))
return
}
conn, err := net.DialTCP("tcp", nil, tcpAddr)
if err != nil {
- c.logger.Warn("Net addr can not be dial for now,waiting...", zap.String("addr", c.addr), zap.Error(err))
+ c.Logger.Warn("Net addr can not be dial for now,waiting...", zap.String("addr", c.addr), zap.Error(err))
return
}
c.Conn = conn
// 璁剧疆杩炴帴鐘舵��
c.SetState(StateConnected)
- c.logger.Debug("Client service connected.", zap.String("addr", c.addr))
+ c.Logger.Debug("Client service connected.", zap.String("addr", c.addr))
// 鍚敤璇诲彇閫氶亾
c.SetRWBuf()
@@ -156,7 +158,7 @@
// 鍚敤蹇冭烦
c.waitGroup.Wrap(c.writeHeartBeat)
c.Wait()
- c.logger.Warn("Client service disconnected.Return...", zap.String("addr", c.addr))
+ c.Logger.Warn("Client service disconnected.Return...", zap.String("addr", c.addr))
}
// 璁剧疆杩炴帴鐘舵��
@@ -189,24 +191,22 @@
byte4 := make([]byte,4)
for {
select {
- case <- c.exitChan:
- c.logger.Debug("Close client", zap.String("deviceId", c.deviceId))
- c.Close()
- c.logger.Warn("writeLoop Done...")
- return
case bodyByte := <- c.writeChan:
binary.BigEndian.PutUint32(byte4, uint32(len(bodyByte)))
body = append(byte4, bodyByte...)
+ c.writeLock.Lock()
_,err = c.Conn.Write(body)
+ c.writeLock.Unlock()
if err != nil {
- c.logger.Error("Fail to write message", zap.Error(err), zap.String("msg", string(bodyByte)))
+ c.Logger.Error("Fail to write message", zap.Error(err), zap.String("msg", string(bodyByte)))
c.Close()
- c.logger.Warn("writeLoop Done...")
+ c.Logger.Warn("writeLoop Done...")
return
}
err = c.Writer.Flush()
+ c.Logger.Debug("Write msg success...", zap.String("msg", string(bodyByte)))
if err != nil {
- c.logger.Error("Fail to write flush", zap.Error(err))
+ c.Logger.Error("Fail to write flush", zap.Error(err))
}
}
}
@@ -214,13 +214,9 @@
// 鍙戦�佹秷鎭�
func (c *Client) WriteMsg(senderId string, msgType aiot.MSG_TYPE, reqType aiot.REQ_TYPE, data []byte, msgProto *aiot.MsgIdProto) (*aiot.Protocol, error) {
- // 閿�
- c.closeLock.Lock()
- defer c.closeLock.Unlock()
-
// 鍏抽棴鐨勮繛鎺ヤ笉鑳藉啓鍏�
if c.IsClosed() {
- c.logger.Error("Can not write msg on the closed chan", zap.Any("msgType", msgType), zap.Any("reqType", reqType), zap.Any("data", string(data)))
+ c.Logger.Error("Can not write msg on the closed chan", zap.Any("msgType", msgType), zap.Any("reqType", reqType), zap.Any("data", string(data)))
return nil,nil
}
@@ -234,38 +230,47 @@
}
// 鍙戦�佹秷鎭�
- c.WriteBody(body)
+ _ = c.WriteBody(body)
return body, nil
}
func (c *Client) WriteBody(body *aiot.Protocol) error {
+ defer func() {
+ if err := recover();err != nil {
+ c.Logger.Error("Write Body Error:", err)
+ }
+ }()
+
+ if c.IsClosed() {
+ errMsg := "Can not write msg into closed chain"
+ c.Logger.Warn(errMsg, zap.Any("msg",body))
+ return errors.New(errMsg)
+ }
body.Receiver = aiot.RECEIVER_TO_SAAS
- c.logger.Debug("Send msg...", zap.Any("msg", body), zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
msgData, err := json.Marshal(body)
if err != nil {
- c.logger.Error("Fail to Marshal send data", zap.Error(err))
+ c.Logger.Error("Fail to Marshal send data", zap.Error(err))
return err
}
- c.msgLock.Lock()
- defer c.msgLock.Unlock()
+ c.Logger.Debug("Write Body into writeChan...", zap.Any("msg", body), zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
c.writeChan <- msgData
return nil
}
// 鍙戦�佹敞鍐屽寘
func (c *Client) writeRegister() {
- c.logger.Debug("registering...")
+ c.Logger.Debug("registering...")
data := c.deviceRegister
msgData, _ := json.Marshal(data)
_, err := c.WriteMsg(c.deviceId, aiot.MSG_TYPE_REGISTER, aiot.REQ_TYPE_REQUEST, msgData, c.GetMsgProto(""))
if err != nil {
- c.logger.Error("Fail to send device register", zap.Any("msg", msgData))
+ c.Logger.Error("Fail to send device register", zap.Any("msg", msgData))
}
}
// 鍙戦�佸績璺冲寘
func (c *Client) writeHeartBeat() {
- c.logger.Debug("Start HeartBeating...",zap.String("addr", c.addr))
+ c.Logger.Debug("Start HeartBeating...",zap.String("addr", c.addr))
pingData, _ := json.Marshal(c.heartBeatProto)
t := time.NewTicker(DefaultHeartbeatInterval)
defer func() {
@@ -286,19 +291,46 @@
}
}
+// 鍙戦�佷笟鍔″寘璇锋眰
+func (c *Client) Request(receiver aiot.RECEIVER, senderId string, msgProto *aiot.MsgIdProto, data []byte) error {
+ defer func() {
+ if err := recover();err != nil {
+ c.Logger.Error("Write Body Error:", err)
+ }
+ }()
+ body := &aiot.Protocol{}
+ body.Receiver = receiver
+ body.SenderId = senderId
+ body.MsgProto = msgProto
+ body.MsgType = aiot.MSG_TYPE_BUSINESS
+ body.ReqType = aiot.REQ_TYPE_REQUEST
+ body.Data = data
+ c.Logger.Debug("Send msg...", zap.Any("msg", body), zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
+ msgData, err := json.Marshal(body)
+ if err != nil {
+ c.Logger.Error("Fail to Marshal send data", zap.Error(err))
+ return err
+ }
+ c.writeChan <- msgData
+ return nil
+}
+
// 娑堟伅璇诲彇閫氶亾
func (c *Client) readLoop() {
var err error
var length uint32
for {
- c.SetDeadline(time.Now().Add(DefaultReaderTimeOut))
+ c.tmpByte4Slice = make([]byte, 4)
+ _ = c.SetDeadline(time.Now().Add(DefaultReaderTimeOut))
// 璇诲彇闀垮害
+ c.readLock.Lock()
_, err = io.ReadFull(c.Reader, c.tmpByte4Slice)
+ c.readLock.Unlock()
if err != nil {
if err == io.EOF {
+ c.Logger.Error("Fail to read request byte4", zap.Error(err))
err = nil
} else {
- c.logger.Error("Fail to read request", zap.Error(err))
c.Close()
return
}
@@ -306,31 +338,43 @@
}
length = binary.BigEndian.Uint32(c.tmpByte4Slice)
if length > DefaultBufferSize {
- c.logger.Error("Fail to read request data from io", zap.Uint32("length",length))
+ c.Logger.Error("Fail to read request data from io", zap.Uint32("length",length))
}
// 璇诲彇body
bodyByte := make([]byte, length)
- c.closeLock.Lock()
+ c.readLock.Lock()
_, err = io.ReadFull(c.Reader, bodyByte)
- c.closeLock.Unlock()
+ c.readLock.Unlock()
+ if err != nil {
+ if err == io.EOF {
+ c.Logger.Error("Fail to read request body", zap.Error(err))
+ err = nil
+ } else {
+ c.Close()
+ return
+ }
+ break
+ }
body := &aiot.Protocol{}
err = json.Unmarshal(bodyByte, body)
if err != nil {
- c.logger.Error("Fail to unmarshal message", zap.Error(err), zap.String("msg", string(bodyByte)))
+ c.Logger.Error("Fail to unmarshal message", zap.Error(err), zap.String("msg", string(bodyByte)))
}
- c.logger.Debug("Read msg...", zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
+ c.Logger.Debug("Read msg...", zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
// 澶勭悊鍥炶皟
c.onMessage(body)
}
- c.logger.Warn("ReadLoop Done...")
+ c.Logger.Warn("ReadLoop Done...")
+ // 鍏抽棴杩炴帴
+ c.Close()
}
// 澶勭悊鍥炶皟
func (c *Client) onMessage (body *aiot.Protocol) {
// 鏈皝瑁卌allback锛屽彧鍐欐棩蹇�
if c.clientCallback == nil {
- c.logger.Warn("ClientCallBack interface is nil...", zap.String("addr", c.addr), zap.Any("body", body))
+ c.Logger.Warn("ClientCallBack interface is nil...", zap.String("addr", c.addr), zap.Any("body", body))
return
}
@@ -338,12 +382,12 @@
switch body.MsgType {
// 蹇冭烦鍥炲
case aiot.MSG_TYPE_HEART_BEAT:
- c.clientCallback.OnHeartBeat(c,body)
+ go c.clientCallback.OnHeartBeat(c, body)
return
// 娉ㄥ唽鍥炲
case aiot.MSG_TYPE_REGISTER:
- c.clientCallback.OnRegister(c,body)
+ go c.clientCallback.OnRegister(c, body)
return
// 璁惧鎺у埗
@@ -411,22 +455,17 @@
// 鍏抽棴TCP
func (c *Client) Close() {
- c.logger.Debug("Closing connect", zap.String("addr", c.addr))
+ c.Logger.Debug("Closing connect", zap.String("addr", c.addr))
c.closeLock.Lock()
defer c.closeLock.Unlock()
-
// 鍏抽棴閫氶亾
if !c.IsClosed() {
- c.Conn.Close()
+ _ = c.Conn.Close()
if c.IsConnected() {
c.clientCallback.OnClose(c)
}
// 璁剧疆杩炴帴灞炴��
c.SetState(StateDisconnected)
-
- // 鍏抽棴绠¢亾
- close(c.exitChan)
- close(c.writeChan)
}
}
--
Gitblit v1.8.0