saas-smartAi通信协议标准库
gongshangguo
2022-03-01 81a82791a041420f8bd2232f69310a5c52952291
client/client.go
@@ -38,6 +38,8 @@
   StateDisconnected
)
var syncReq map[string]chan *aiot.Protocol
// 连接状态
type State int32
@@ -46,8 +48,8 @@
   net.Conn
   // 注册包
   deviceRegister *aiot.DeviceRegister
   // 关闭锁
   closeLock *sync.Mutex
   // 客户端锁
   lock *sync.Mutex
   // 消息锁
   msgLock *sync.Mutex
   // 连接地址
@@ -81,8 +83,7 @@
   logger.Debug("New Client...")
   return &Client{
      deviceRegister: deviceRegister,
      closeLock: new(sync.Mutex),
      msgLock: new(sync.Mutex),
      lock: new(sync.Mutex),
      addr: addr,
      deviceId: clientId,
      writeChan: make(chan []byte),
@@ -105,8 +106,7 @@
func (c *Client) InitClient() {
   // 初始化当前属性值
   c.Conn = nil
   c.closeLock = new(sync.Mutex)
   c.msgLock = new(sync.Mutex)
   c.lock = new(sync.Mutex)
   c.writeChan = make(chan []byte)
   c.exitChan = make(chan int8)
   c.state = StateInit
@@ -197,11 +197,9 @@
      case bodyByte := <- c.writeChan:
         binary.BigEndian.PutUint32(byte4, uint32(len(bodyByte)))
         body = append(byte4, bodyByte...)
         c.closeLock.Lock()
         c.msgLock.Lock()
         c.lock.Lock()
         _,err = c.Conn.Write(body)
         c.closeLock.Unlock()
         c.msgLock.Unlock()
         c.lock.Unlock()
         if err != nil {
            c.Logger.Error("Fail to write message", zap.Error(err), zap.String("msg", string(bodyByte)))
            c.Close()
@@ -220,15 +218,15 @@
// 发送消息
func (c *Client) WriteMsg(senderId string, msgType aiot.MSG_TYPE, reqType aiot.REQ_TYPE, data []byte, msgProto *aiot.MsgIdProto) (*aiot.Protocol, error) {
   // 锁
   c.closeLock.Lock()
   c.lock.Lock()
   // 关闭的连接不能写入
   if c.IsClosed() {
      c.Logger.Error("Can not write msg on the closed chan", zap.Any("msgType", msgType), zap.Any("reqType", reqType), zap.Any("data", string(data)))
      c.closeLock.Unlock()
      c.lock.Unlock()
      return nil,nil
   }
   c.closeLock.Unlock()
   c.lock.Unlock()
   // 拼装并发送消息
   body := &aiot.Protocol{
@@ -245,18 +243,19 @@
}
func (c *Client) WriteBody(body *aiot.Protocol) error {
   defer func() {
      if err := recover();err != nil  {
         c.Logger.Error("Write Body Error:", err)
      }
   }()
   body.Receiver = aiot.RECEIVER_TO_SAAS
   c.Logger.Debug("Write Body...", zap.Any("msg", body), zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
   msgData, err := json.Marshal(body)
   if err != nil {
      c.Logger.Error("Fail to Marshal send data", zap.Error(err))
      return err
   }
   c.msgLock.Lock()
   c.closeLock.Lock()
   c.Logger.Debug("Write Body into writeChan...", zap.Any("msg", body), zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
   c.writeChan <- msgData
   c.closeLock.Unlock()
   c.msgLock.Unlock()
   return nil
}
@@ -296,6 +295,11 @@
// 发送业务包请求
func (c *Client) Request(receiver aiot.RECEIVER, senderId string, msgProto *aiot.MsgIdProto, data []byte) error {
   defer func() {
      if err := recover();err != nil  {
         c.Logger.Error("Write Body Error:", err)
      }
   }()
   body := &aiot.Protocol{}
   body.Receiver = receiver
   body.SenderId = senderId
@@ -309,11 +313,7 @@
      c.Logger.Error("Fail to Marshal send data", zap.Error(err))
      return err
   }
   c.msgLock.Lock()
   c.closeLock.Lock()
   c.writeChan <- msgData
   c.closeLock.Unlock()
   c.msgLock.Unlock()
   return nil
}
@@ -322,14 +322,17 @@
   var err error
   var length uint32
   for {
      c.tmpByte4Slice = make([]byte, 4)
      c.SetDeadline(time.Now().Add(DefaultReaderTimeOut))
      // 读取长度
      c.lock.Lock()
      _, err = io.ReadFull(c.Reader, c.tmpByte4Slice)
      c.lock.Unlock()
      if err != nil {
         if err == io.EOF {
            c.Logger.Error("Fail to read request byte4", zap.Error(err))
            err = nil
         } else {
            c.Logger.Error("Fail to read request", zap.Error(err))
            c.Close()
            return
         }
@@ -341,9 +344,19 @@
      }
      // 读取body
      bodyByte := make([]byte, length)
      c.closeLock.Lock()
      c.lock.Lock()
      _, err = io.ReadFull(c.Reader, bodyByte)
      c.closeLock.Unlock()
      c.lock.Unlock()
      if err != nil {
         if err == io.EOF {
            c.Logger.Error("Fail to read request body", zap.Error(err))
            err = nil
         } else {
            c.Close()
            return
         }
         break
      }
      body := &aiot.Protocol{}
      err = json.Unmarshal(bodyByte, body)
      if err != nil {
@@ -445,8 +458,8 @@
// 关闭TCP
func (c *Client) Close() {
   c.Logger.Debug("Closing connect", zap.String("addr", c.addr))
   c.closeLock.Lock()
   defer c.closeLock.Unlock()
   c.lock.Lock()
   defer c.lock.Unlock()
   // 关闭通道
   if !c.IsClosed() {