saas-smartAi通信协议标准库
gongshangguo
2022-03-08 a32c642ffa2b9f05e3b55fc88b2c2bed3277a080
client/client.go
@@ -3,10 +3,10 @@
import (
   "basic.com/valib/go-aiot.git/aiotProto/aiot"
   "basic.com/valib/go-aiot.git/util"
   "basic.com/valib/logger.git"
   "bufio"
   "encoding/binary"
   "encoding/json"
   "errors"
   uuid "github.com/satori/go.uuid"
   "go.uber.org/zap"
   "io"
@@ -23,8 +23,6 @@
   DefaultHeartbeatInterval = 15 * time.Second
   // 读取数据超时时间
   DefaultReaderTimeOut = 60 * time.Second
   // 连接尝试间隔
   DefaultNetRetry = 10 * time.Second
)
// 连接状态
@@ -49,8 +47,10 @@
   deviceRegister *aiot.DeviceRegister
   // 关闭锁
   closeLock *sync.Mutex
   // 消息锁
   msgLock *sync.Mutex
   // 读取锁
   readLock *sync.Mutex
   // 写入锁
   writeLock *sync.Mutex
   // 连接地址
   addr string
   // 设备ID
@@ -61,8 +61,6 @@
   Writer *bufio.Writer
   // 写入通道
   writeChan chan []byte
   // 退出通道
   exitChan chan int8
   // 连接状态
   state State
   // 报文头
@@ -73,24 +71,27 @@
   clientCallback ClientCallBack
   // 心跳包
   heartBeatProto *aiot.HeartBeatProto
   // logger
   Logger *zap.SugaredLogger
}
// 初始化客户端
func NewClient(addr string, clientId string, deviceRegister *aiot.DeviceRegister, callBack ClientCallBack) *Client {
func NewClient(addr string, clientId string, deviceRegister *aiot.DeviceRegister, callBack ClientCallBack, logger *zap.SugaredLogger) *Client {
   logger.Debug("New Client...")
   return &Client{
      deviceRegister: deviceRegister,
      readLock: new(sync.Mutex),
      closeLock: new(sync.Mutex),
      msgLock: new(sync.Mutex),
      writeLock: new(sync.Mutex),
      addr: addr,
      deviceId: clientId,
      writeChan: make(chan []byte),
      exitChan: make(chan int8),
      state: StateInit,
      tmpByte4Slice: make([]byte, 4),
      waitGroup: &util.WaitGroupWrapper{},
      clientCallback: callBack,
      heartBeatProto: &aiot.HeartBeatProto{},
      Logger: logger,
   }
}
@@ -104,9 +105,9 @@
   // 初始化当前属性值
   c.Conn = nil
   c.closeLock = new(sync.Mutex)
   c.msgLock = new(sync.Mutex)
   c.readLock = new(sync.Mutex)
   c.writeLock = new(sync.Mutex)
   c.writeChan = make(chan []byte)
   c.exitChan = make(chan int8)
   c.state = StateInit
   c.tmpByte4Slice = make([]byte, 4)
   c.waitGroup = &util.WaitGroupWrapper{}
@@ -115,35 +116,35 @@
// 启动服务
func (c *Client) StartSrv() {
   // 判断连接状态,避免重复连接
   logger.Debug("Start client service...")
   c.Logger.Debug("Start client service...")
   if c.IsConnected(){
      logger.Error("net is connected, please do not repeat connect", zap.String("addr", c.addr))
      c.Logger.Error("net is connected, please do not repeat connect", zap.String("addr", c.addr))
      return
   }
   // 地址是否可用
   if c.addr == "" {
      logger.Warn("net addr is nil, wait for retry", zap.String("deviceId", c.deviceId))
      c.Logger.Warn("net addr is nil, wait for retry", zap.String("deviceId", c.deviceId))
      return
   }
   // 连接TCP
   logger.Debug("Connecting to service", zap.String("addr", c.addr))
   c.Logger.Debug("Connecting to service", zap.String("addr", c.addr))
   tcpAddr, err := net.ResolveTCPAddr("tcp", c.addr)
   if err != nil {
      logger.Warn("Net addr can not be connect for now,waiting...", zap.String("addr", c.addr), zap.Error(err))
      c.Logger.Warn("Net addr can not be connect for now,waiting...", zap.String("addr", c.addr), zap.Error(err))
      return
   }
   conn, err := net.DialTCP("tcp", nil, tcpAddr)
   if err != nil {
      logger.Warn("Net addr can not be dial for now,waiting...", zap.String("addr", c.addr), zap.Error(err))
      c.Logger.Warn("Net addr can not be dial for now,waiting...", zap.String("addr", c.addr), zap.Error(err))
      return
   }
   c.Conn = conn
   // 设置连接状态
   c.SetState(StateConnected)
   logger.Debug("Client service connected.", zap.String("addr", c.addr))
   c.Logger.Debug("Client service connected.", zap.String("addr", c.addr))
   // 启用读取通道
   c.SetRWBuf()
@@ -154,7 +155,7 @@
   // 启用心跳
   c.waitGroup.Wrap(c.writeHeartBeat)
   c.Wait()
   logger.Warn("Client service disconnected.Return...", zap.String("addr", c.addr))
   c.Logger.Warn("Client service disconnected.Return...", zap.String("addr", c.addr))
}
// 设置连接状态
@@ -187,43 +188,38 @@
   byte4 := make([]byte,4)
   for {
      select {
      case <- c.exitChan:
         logger.Debug("Close client", zap.String("deviceId", c.deviceId))
         c.Close()
         logger.Warn("writeLoop Done...")
         return
      case bodyByte := <- c.writeChan:
         binary.BigEndian.PutUint32(byte4, uint32(len(bodyByte)))
         body = append(byte4, bodyByte...)
         c.writeLock.Lock()
         _,err = c.Conn.Write(body)
         c.writeLock.Unlock()
         if err != nil {
            logger.Error("Fail to write message", zap.Error(err), zap.String("msg", string(bodyByte)))
            c.Logger.Error("Fail to write message", zap.Error(err), zap.String("msg", string(bodyByte)))
            c.Close()
            logger.Warn("writeLoop Done...")
            c.Logger.Warn("writeLoop Done...")
            return
         }
         err = c.Writer.Flush()
         c.Logger.Debug("Write msg success...", zap.String("msg", string(bodyByte)))
         if err != nil {
            logger.Error("Fail to write flush", zap.Error(err))
            c.Logger.Error("Fail to write flush", zap.Error(err))
         }
      }
   }
}
// 发送消息
func (c *Client) WriteMsg(senderId string, msgType aiot.MSG_TYPE, reqType aiot.REQ_TYPE, data []byte, msgProto *aiot.MsgIdProto) (*aiot.Protocol, error) {
   // 锁
   c.closeLock.Lock()
   defer c.closeLock.Unlock()
func (c *Client) writeMsg(senderId string, msgType aiot.MSG_TYPE, reqType aiot.REQ_TYPE, data []byte, msgProto *aiot.MsgIdProto) (*aiot.Protocol, error) {
   // 关闭的连接不能写入
   if c.IsClosed() {
      logger.Error("Can not write msg on the closed chan", zap.Any("msgType", msgType), zap.Any("reqType", reqType), zap.Any("data", string(data)))
      c.Logger.Error("Can not write msg on the closed chan", zap.Any("msgType", msgType), zap.Any("reqType", reqType), zap.Any("data", string(data)))
      return nil,nil
   }
   // 拼装并发送消息
   body := &aiot.Protocol{
      Receiver: aiot.RECEIVER_TO_SAAS,
      SenderId: senderId,
      MsgType: msgType,
      ReqType: reqType,
@@ -232,38 +228,52 @@
   }
   // 发送消息
   c.WriteBody(body)
   _ = c.WriteBody(body)
   return body, nil
}
func (c *Client) WriteBody(body *aiot.Protocol) error {
   body.Receiver = aiot.RECEIVER_TO_SAAS
   logger.Debug("Send msg...", zap.Any("msg", body), zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
   defer func() {
      if err := recover();err != nil  {
         c.Logger.Error("Write Body Error:", err)
      }
   }()
   // 通道已关闭,不能写入
   if c.IsClosed() {
      errMsg := "Can not write msg into closed chain"
      c.Logger.Warn(errMsg, zap.Any("msg",body))
      return errors.New(errMsg)
   }
   // 消息ID默认处理
   if body.MsgProto == nil {
      body.MsgProto = GetMsgProto("")
   }
   msgData, err := json.Marshal(body)
   if err != nil {
      logger.Error("Fail to Marshal send data", zap.Error(err))
      c.Logger.Error("Fail to Marshal send data", zap.Error(err))
      return err
   }
   c.msgLock.Lock()
   defer c.msgLock.Unlock()
   c.Logger.Debug("Write Body into writeChan...", zap.Any("msg", body), zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
   c.writeChan <- msgData
   return nil
}
// 发送注册包
func (c *Client) writeRegister() {
   logger.Debug("registering...")
   c.Logger.Debug("registering...")
   data := c.deviceRegister
   msgData, _ := json.Marshal(data)
   _, err := c.WriteMsg(c.deviceId, aiot.MSG_TYPE_REGISTER, aiot.REQ_TYPE_REQUEST, msgData, c.GetMsgProto(""))
   _, err := c.writeMsg(c.deviceId, aiot.MSG_TYPE_REGISTER, aiot.REQ_TYPE_REQUEST, msgData, GetMsgProto(""))
   if err != nil {
      logger.Error("Fail to send device register", zap.Any("msg", msgData))
      c.Logger.Error("Fail to send device register", zap.Any("msg", msgData))
   }
}
// 发送心跳包
func (c *Client) writeHeartBeat() {
   logger.Debug("Start HeartBeating...",zap.String("addr", c.addr))
   c.Logger.Debug("Start HeartBeating...",zap.String("addr", c.addr))
   pingData, _ := json.Marshal(c.heartBeatProto)
   t := time.NewTicker(DefaultHeartbeatInterval)
   defer func() {
@@ -279,9 +289,33 @@
            t.Stop()
            return
         }
         go c.WriteMsg(c.deviceId, aiot.MSG_TYPE_HEART_BEAT, aiot.REQ_TYPE_REQUEST, pingData, c.GetMsgProto(""))
         go c.writeMsg(c.deviceId, aiot.MSG_TYPE_HEART_BEAT, aiot.REQ_TYPE_REQUEST, pingData, GetMsgProto(""))
      }
   }
}
// 发送业务包请求
func (c *Client) Request(receiver aiot.RECEIVER, senderId string, msgProto *aiot.MsgIdProto, data []byte) error {
   defer func() {
      if err := recover();err != nil  {
         c.Logger.Error("Write Body Error:", err)
      }
   }()
   body := &aiot.Protocol{}
   body.Receiver = receiver
   body.SenderId = senderId
   body.MsgProto = msgProto
   body.MsgType = aiot.MSG_TYPE_BUSINESS
   body.ReqType = aiot.REQ_TYPE_REQUEST
   body.Data = data
   c.Logger.Debug("Send msg...", zap.Any("msg", body), zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
   msgData, err := json.Marshal(body)
   if err != nil {
      c.Logger.Error("Fail to Marshal send data", zap.Error(err))
      return err
   }
   c.writeChan <- msgData
   return nil
}
// 消息读取通道
@@ -289,14 +323,17 @@
   var err error
   var length uint32
   for {
      c.SetDeadline(time.Now().Add(DefaultReaderTimeOut))
      c.tmpByte4Slice = make([]byte, 4)
      _ = c.SetDeadline(time.Now().Add(DefaultReaderTimeOut))
      // 读取长度
      c.readLock.Lock()
      _, err = io.ReadFull(c.Reader, c.tmpByte4Slice)
      c.readLock.Unlock()
      if err != nil {
         if err == io.EOF {
            c.Logger.Error("Fail to read request byte4", zap.Error(err))
            err = nil
         } else {
            logger.Error("Fail to read request", zap.Error(err))
            c.Close()
            return
         }
@@ -304,31 +341,43 @@
      }
      length = binary.BigEndian.Uint32(c.tmpByte4Slice)
      if length > DefaultBufferSize {
         logger.Error("Fail to read request data from io", zap.Uint32("length",length))
         c.Logger.Error("Fail to read request data from io", zap.Uint32("length",length))
      }
      // 读取body
      bodyByte := make([]byte, length)
      c.closeLock.Lock()
      c.readLock.Lock()
      _, err = io.ReadFull(c.Reader, bodyByte)
      c.closeLock.Unlock()
      c.readLock.Unlock()
      if err != nil {
         if err == io.EOF {
            c.Logger.Error("Fail to read request body", zap.Error(err))
            err = nil
         } else {
            c.Close()
            return
         }
         break
      }
      body := &aiot.Protocol{}
      err = json.Unmarshal(bodyByte, body)
      if err != nil {
         logger.Error("Fail to unmarshal message", zap.Error(err), zap.String("msg", string(bodyByte)))
         c.Logger.Error("Fail to unmarshal message", zap.Error(err), zap.String("msg", string(bodyByte)))
      }
      logger.Debug("Read msg...", zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
      c.Logger.Debug("Read msg...", zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
      // 处理回调
      c.onMessage(body)
   }
   logger.Warn("ReadLoop Done...")
   c.Logger.Warn("ReadLoop Done...")
   // 关闭连接
   c.Close()
}
// 处理回调
func (c *Client) onMessage (body *aiot.Protocol) {
   // 未封装callback,只写日志
   if c.clientCallback == nil {
      logger.Warn("ClientCallBack interface is nil...", zap.String("addr", c.addr), zap.Any("body", body))
      c.Logger.Warn("ClientCallBack interface is nil...", zap.String("addr", c.addr), zap.Any("body", body))
      return
   }
@@ -336,12 +385,12 @@
   switch body.MsgType {
   // 心跳回复
   case aiot.MSG_TYPE_HEART_BEAT:
      c.clientCallback.OnHeartBeat(c,body)
      go c.clientCallback.OnHeartBeat(c, body)
      return
   // 注册回复
   case aiot.MSG_TYPE_REGISTER:
      c.clientCallback.OnRegister(c,body)
      go c.clientCallback.OnRegister(c, body)
      return
   // 设备控制
@@ -372,7 +421,7 @@
}
// 拼装消息ID
func (c *Client) GetMsgProto(msgId string) *aiot.MsgIdProto {
func GetMsgProto(msgId string) *aiot.MsgIdProto {
   // 新消息
   if msgId == "" {
      return &aiot.MsgIdProto{
@@ -392,6 +441,11 @@
   return c.deviceId
}
// 获取连接状态
func (c *Client) GetState() State {
   return c.state
}
// 判断连接是否关闭
func (c *Client) IsClosed() bool {
   return c.state == StateDisconnected
@@ -409,22 +463,19 @@
// 关闭TCP
func (c *Client) Close() {
   logger.Debug("Closing connect", zap.String("addr", c.addr))
   c.Logger.Debug("Closing connect...", zap.String("addr", c.addr))
   c.closeLock.Lock()
   defer c.closeLock.Unlock()
   // 关闭通道
   if !c.IsClosed() {
      c.Conn.Close()
      _ = c.Conn.Close()
      if c.IsConnected() {
         c.clientCallback.OnClose(c)
      }
      // 设置连接属性
      c.SetState(StateDisconnected)
      // 关闭管道
      close(c.exitChan)
      close(c.writeChan)
   }
   // 设置连接属性
   c.SetState(StateDisconnected)
   c.Logger.Debug("Connect closed...", zap.String("addr", c.addr))
}