From d3d05d0eecdd76e86eeb3d28dcd765db042dce17 Mon Sep 17 00:00:00 2001
From: gongshangguo <gongshangguo@admin.com>
Date: 星期四, 03 三月 2022 10:46:05 +0800
Subject: [PATCH] 修改获取消息ID方法
---
client/client.go | 116 +++++++++++++++++++++++++++++++--------------------------
1 files changed, 63 insertions(+), 53 deletions(-)
diff --git a/client/client.go b/client/client.go
index 3d6035b..e501367 100644
--- a/client/client.go
+++ b/client/client.go
@@ -6,6 +6,7 @@
"bufio"
"encoding/binary"
"encoding/json"
+ "errors"
uuid "github.com/satori/go.uuid"
"go.uber.org/zap"
"io"
@@ -22,8 +23,6 @@
DefaultHeartbeatInterval = 15 * time.Second
// 璇诲彇鏁版嵁瓒呮椂鏃堕棿
DefaultReaderTimeOut = 60 * time.Second
- // 杩炴帴灏濊瘯闂撮殧
- DefaultNetRetry = 10 * time.Second
)
// 杩炴帴鐘舵��
@@ -48,8 +47,10 @@
deviceRegister *aiot.DeviceRegister
// 鍏抽棴閿�
closeLock *sync.Mutex
- // 娑堟伅閿�
- msgLock *sync.Mutex
+ // 璇诲彇閿�
+ readLock *sync.Mutex
+ // 鍐欏叆閿�
+ writeLock *sync.Mutex
// 杩炴帴鍦板潃
addr string
// 璁惧ID
@@ -60,8 +61,6 @@
Writer *bufio.Writer
// 鍐欏叆閫氶亾
writeChan chan []byte
- // 閫�鍑洪�氶亾
- exitChan chan int8
// 杩炴帴鐘舵��
state State
// 鎶ユ枃澶�
@@ -81,12 +80,12 @@
logger.Debug("New Client...")
return &Client{
deviceRegister: deviceRegister,
+ readLock: new(sync.Mutex),
closeLock: new(sync.Mutex),
- msgLock: new(sync.Mutex),
+ writeLock: new(sync.Mutex),
addr: addr,
deviceId: clientId,
writeChan: make(chan []byte),
- exitChan: make(chan int8),
state: StateInit,
tmpByte4Slice: make([]byte, 4),
waitGroup: &util.WaitGroupWrapper{},
@@ -106,9 +105,9 @@
// 鍒濆鍖栧綋鍓嶅睘鎬у��
c.Conn = nil
c.closeLock = new(sync.Mutex)
- c.msgLock = new(sync.Mutex)
+ c.readLock = new(sync.Mutex)
+ c.writeLock = new(sync.Mutex)
c.writeChan = make(chan []byte)
- c.exitChan = make(chan int8)
c.state = StateInit
c.tmpByte4Slice = make([]byte, 4)
c.waitGroup = &util.WaitGroupWrapper{}
@@ -189,19 +188,12 @@
byte4 := make([]byte,4)
for {
select {
- case <- c.exitChan:
- c.Logger.Debug("Close client", zap.String("deviceId", c.deviceId))
- c.Close()
- c.Logger.Warn("writeLoop Done...")
- return
case bodyByte := <- c.writeChan:
binary.BigEndian.PutUint32(byte4, uint32(len(bodyByte)))
body = append(byte4, bodyByte...)
- c.closeLock.Lock()
- c.msgLock.Lock()
+ c.writeLock.Lock()
_,err = c.Conn.Write(body)
- c.closeLock.Unlock()
- c.msgLock.Unlock()
+ c.writeLock.Unlock()
if err != nil {
c.Logger.Error("Fail to write message", zap.Error(err), zap.String("msg", string(bodyByte)))
c.Close()
@@ -219,16 +211,11 @@
// 鍙戦�佹秷鎭�
func (c *Client) WriteMsg(senderId string, msgType aiot.MSG_TYPE, reqType aiot.REQ_TYPE, data []byte, msgProto *aiot.MsgIdProto) (*aiot.Protocol, error) {
- // 閿�
- c.closeLock.Lock()
-
// 鍏抽棴鐨勮繛鎺ヤ笉鑳藉啓鍏�
if c.IsClosed() {
c.Logger.Error("Can not write msg on the closed chan", zap.Any("msgType", msgType), zap.Any("reqType", reqType), zap.Any("data", string(data)))
- c.closeLock.Unlock()
return nil,nil
}
- c.closeLock.Unlock()
// 鎷艰骞跺彂閫佹秷鎭�
body := &aiot.Protocol{
@@ -240,23 +227,30 @@
}
// 鍙戦�佹秷鎭�
- c.WriteBody(body)
+ _ = c.WriteBody(body)
return body, nil
}
func (c *Client) WriteBody(body *aiot.Protocol) error {
+ defer func() {
+ if err := recover();err != nil {
+ c.Logger.Error("Write Body Error:", err)
+ }
+ }()
+
+ if c.IsClosed() {
+ errMsg := "Can not write msg into closed chain"
+ c.Logger.Warn(errMsg, zap.Any("msg",body))
+ return errors.New(errMsg)
+ }
body.Receiver = aiot.RECEIVER_TO_SAAS
- c.Logger.Debug("Write Body...", zap.Any("msg", body), zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
msgData, err := json.Marshal(body)
if err != nil {
c.Logger.Error("Fail to Marshal send data", zap.Error(err))
return err
}
- c.msgLock.Lock()
- c.closeLock.Lock()
+ c.Logger.Debug("Write Body into writeChan...", zap.Any("msg", body), zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
c.writeChan <- msgData
- c.closeLock.Unlock()
- c.msgLock.Unlock()
return nil
}
@@ -265,7 +259,7 @@
c.Logger.Debug("registering...")
data := c.deviceRegister
msgData, _ := json.Marshal(data)
- _, err := c.WriteMsg(c.deviceId, aiot.MSG_TYPE_REGISTER, aiot.REQ_TYPE_REQUEST, msgData, c.GetMsgProto(""))
+ _, err := c.WriteMsg(c.deviceId, aiot.MSG_TYPE_REGISTER, aiot.REQ_TYPE_REQUEST, msgData, GetMsgProto(""))
if err != nil {
c.Logger.Error("Fail to send device register", zap.Any("msg", msgData))
}
@@ -289,13 +283,18 @@
t.Stop()
return
}
- go c.WriteMsg(c.deviceId, aiot.MSG_TYPE_HEART_BEAT, aiot.REQ_TYPE_REQUEST, pingData, c.GetMsgProto(""))
+ go c.WriteMsg(c.deviceId, aiot.MSG_TYPE_HEART_BEAT, aiot.REQ_TYPE_REQUEST, pingData, GetMsgProto(""))
}
}
}
// 鍙戦�佷笟鍔″寘璇锋眰
func (c *Client) Request(receiver aiot.RECEIVER, senderId string, msgProto *aiot.MsgIdProto, data []byte) error {
+ defer func() {
+ if err := recover();err != nil {
+ c.Logger.Error("Write Body Error:", err)
+ }
+ }()
body := &aiot.Protocol{}
body.Receiver = receiver
body.SenderId = senderId
@@ -309,11 +308,7 @@
c.Logger.Error("Fail to Marshal send data", zap.Error(err))
return err
}
- c.msgLock.Lock()
- c.closeLock.Lock()
c.writeChan <- msgData
- c.closeLock.Unlock()
- c.msgLock.Unlock()
return nil
}
@@ -322,14 +317,17 @@
var err error
var length uint32
for {
- c.SetDeadline(time.Now().Add(DefaultReaderTimeOut))
+ c.tmpByte4Slice = make([]byte, 4)
+ _ = c.SetDeadline(time.Now().Add(DefaultReaderTimeOut))
// 璇诲彇闀垮害
+ c.readLock.Lock()
_, err = io.ReadFull(c.Reader, c.tmpByte4Slice)
+ c.readLock.Unlock()
if err != nil {
if err == io.EOF {
+ c.Logger.Error("Fail to read request byte4", zap.Error(err))
err = nil
} else {
- c.Logger.Error("Fail to read request", zap.Error(err))
c.Close()
return
}
@@ -341,9 +339,19 @@
}
// 璇诲彇body
bodyByte := make([]byte, length)
- c.closeLock.Lock()
+ c.readLock.Lock()
_, err = io.ReadFull(c.Reader, bodyByte)
- c.closeLock.Unlock()
+ c.readLock.Unlock()
+ if err != nil {
+ if err == io.EOF {
+ c.Logger.Error("Fail to read request body", zap.Error(err))
+ err = nil
+ } else {
+ c.Close()
+ return
+ }
+ break
+ }
body := &aiot.Protocol{}
err = json.Unmarshal(bodyByte, body)
if err != nil {
@@ -356,7 +364,7 @@
c.Logger.Warn("ReadLoop Done...")
// 鍏抽棴杩炴帴
- c.exitChan <- 1
+ c.Close()
}
// 澶勭悊鍥炶皟
@@ -371,12 +379,12 @@
switch body.MsgType {
// 蹇冭烦鍥炲
case aiot.MSG_TYPE_HEART_BEAT:
- c.clientCallback.OnHeartBeat(c,body)
+ go c.clientCallback.OnHeartBeat(c, body)
return
// 娉ㄥ唽鍥炲
case aiot.MSG_TYPE_REGISTER:
- c.clientCallback.OnRegister(c,body)
+ go c.clientCallback.OnRegister(c, body)
return
// 璁惧鎺у埗
@@ -407,7 +415,7 @@
}
// 鎷艰娑堟伅ID
-func (c *Client) GetMsgProto(msgId string) *aiot.MsgIdProto {
+func GetMsgProto(msgId string) *aiot.MsgIdProto {
// 鏂版秷鎭�
if msgId == "" {
return &aiot.MsgIdProto{
@@ -427,6 +435,11 @@
return c.deviceId
}
+// 鑾峰彇杩炴帴鐘舵��
+func (c *Client) GetState() State {
+ return c.state
+}
+
// 鍒ゆ柇杩炴帴鏄惁鍏抽棴
func (c *Client) IsClosed() bool {
return c.state == StateDisconnected
@@ -444,22 +457,19 @@
// 鍏抽棴TCP
func (c *Client) Close() {
- c.Logger.Debug("Closing connect", zap.String("addr", c.addr))
+ c.Logger.Debug("Closing connect...", zap.String("addr", c.addr))
c.closeLock.Lock()
defer c.closeLock.Unlock()
-
// 鍏抽棴閫氶亾
if !c.IsClosed() {
- c.Conn.Close()
+ _ = c.Conn.Close()
if c.IsConnected() {
c.clientCallback.OnClose(c)
}
-
- // 璁剧疆杩炴帴灞炴��
- c.SetState(StateDisconnected)
-
- // 鍏抽棴绠¢亾
- close(c.exitChan)
close(c.writeChan)
}
+
+ // 璁剧疆杩炴帴灞炴��
+ c.SetState(StateDisconnected)
+ c.Logger.Debug("Connect closed...", zap.String("addr", c.addr))
}
--
Gitblit v1.8.0