saas-smartAi通信协议标准库
gongshangguo
2022-03-08 a32c642ffa2b9f05e3b55fc88b2c2bed3277a080
client/client.go
@@ -6,6 +6,7 @@
   "bufio"
   "encoding/binary"
   "encoding/json"
   "errors"
   uuid "github.com/satori/go.uuid"
   "go.uber.org/zap"
   "io"
@@ -22,8 +23,6 @@
   DefaultHeartbeatInterval = 15 * time.Second
   // 读取数据超时时间
   DefaultReaderTimeOut = 60 * time.Second
   // 连接尝试间隔
   DefaultNetRetry = 10 * time.Second
)
// 连接状态
@@ -37,8 +36,6 @@
   // 已断开
   StateDisconnected
)
var syncReq map[string]chan *aiot.Protocol
// 连接状态
type State int32
@@ -64,8 +61,6 @@
   Writer *bufio.Writer
   // 写入通道
   writeChan chan []byte
   // 退出通道
   exitChan chan int8
   // 连接状态
   state State
   // 报文头
@@ -113,7 +108,6 @@
   c.readLock = new(sync.Mutex)
   c.writeLock = new(sync.Mutex)
   c.writeChan = make(chan []byte)
   c.exitChan = make(chan int8)
   c.state = StateInit
   c.tmpByte4Slice = make([]byte, 4)
   c.waitGroup = &util.WaitGroupWrapper{}
@@ -216,7 +210,7 @@
}
// 发送消息
func (c *Client) WriteMsg(senderId string, msgType aiot.MSG_TYPE, reqType aiot.REQ_TYPE, data []byte, msgProto *aiot.MsgIdProto) (*aiot.Protocol, error) {
func (c *Client) writeMsg(senderId string, msgType aiot.MSG_TYPE, reqType aiot.REQ_TYPE, data []byte, msgProto *aiot.MsgIdProto) (*aiot.Protocol, error) {
   // 关闭的连接不能写入
   if c.IsClosed() {
      c.Logger.Error("Can not write msg on the closed chan", zap.Any("msgType", msgType), zap.Any("reqType", reqType), zap.Any("data", string(data)))
@@ -225,6 +219,7 @@
   // 拼装并发送消息
   body := &aiot.Protocol{
      Receiver: aiot.RECEIVER_TO_SAAS,
      SenderId: senderId,
      MsgType: msgType,
      ReqType: reqType,
@@ -233,7 +228,7 @@
   }
   // 发送消息
   c.WriteBody(body)
   _ = c.WriteBody(body)
   return body, nil
}
@@ -243,7 +238,18 @@
         c.Logger.Error("Write Body Error:", err)
      }
   }()
   body.Receiver = aiot.RECEIVER_TO_SAAS
   // 通道已关闭,不能写入
   if c.IsClosed() {
      errMsg := "Can not write msg into closed chain"
      c.Logger.Warn(errMsg, zap.Any("msg",body))
      return errors.New(errMsg)
   }
   // 消息ID默认处理
   if body.MsgProto == nil {
      body.MsgProto = GetMsgProto("")
   }
   msgData, err := json.Marshal(body)
   if err != nil {
      c.Logger.Error("Fail to Marshal send data", zap.Error(err))
@@ -259,7 +265,7 @@
   c.Logger.Debug("registering...")
   data := c.deviceRegister
   msgData, _ := json.Marshal(data)
   _, err := c.WriteMsg(c.deviceId, aiot.MSG_TYPE_REGISTER, aiot.REQ_TYPE_REQUEST, msgData, c.GetMsgProto(""))
   _, err := c.writeMsg(c.deviceId, aiot.MSG_TYPE_REGISTER, aiot.REQ_TYPE_REQUEST, msgData, GetMsgProto(""))
   if err != nil {
      c.Logger.Error("Fail to send device register", zap.Any("msg", msgData))
   }
@@ -283,7 +289,7 @@
            t.Stop()
            return
         }
         go c.WriteMsg(c.deviceId, aiot.MSG_TYPE_HEART_BEAT, aiot.REQ_TYPE_REQUEST, pingData, c.GetMsgProto(""))
         go c.writeMsg(c.deviceId, aiot.MSG_TYPE_HEART_BEAT, aiot.REQ_TYPE_REQUEST, pingData, GetMsgProto(""))
      }
   }
}
@@ -415,7 +421,7 @@
}
// 拼装消息ID
func (c *Client) GetMsgProto(msgId string) *aiot.MsgIdProto {
func GetMsgProto(msgId string) *aiot.MsgIdProto {
   // 新消息
   if msgId == "" {
      return &aiot.MsgIdProto{
@@ -435,6 +441,11 @@
   return c.deviceId
}
// 获取连接状态
func (c *Client) GetState() State {
   return c.state
}
// 判断连接是否关闭
func (c *Client) IsClosed() bool {
   return c.state == StateDisconnected
@@ -452,17 +463,19 @@
// 关闭TCP
func (c *Client) Close() {
   c.Logger.Debug("Closing connect", zap.String("addr", c.addr))
   c.Logger.Debug("Closing connect...", zap.String("addr", c.addr))
   c.closeLock.Lock()
   defer c.closeLock.Unlock()
   // 关闭通道
   if !c.IsClosed() {
      c.Conn.Close()
      _ = c.Conn.Close()
      if c.IsConnected() {
         c.clientCallback.OnClose(c)
      }
      // 设置连接属性
      c.SetState(StateDisconnected)
      close(c.writeChan)
   }
   // 设置连接属性
   c.SetState(StateDisconnected)
   c.Logger.Debug("Connect closed...", zap.String("addr", c.addr))
}