saas-smartAi通信协议标准库
gongshangguo
2022-03-01 81a82791a041420f8bd2232f69310a5c52952291
client/client.go
@@ -3,7 +3,6 @@
import (
   "basic.com/valib/go-aiot.git/aiotProto/aiot"
   "basic.com/valib/go-aiot.git/util"
   "basic.com/valib/logger.git"
   "bufio"
   "encoding/binary"
   "encoding/json"
@@ -39,6 +38,8 @@
   StateDisconnected
)
var syncReq map[string]chan *aiot.Protocol
// 连接状态
type State int32
@@ -47,8 +48,8 @@
   net.Conn
   // 注册包
   deviceRegister *aiot.DeviceRegister
   // 关闭锁
   closeLock *sync.Mutex
   // 客户端锁
   lock *sync.Mutex
   // 消息锁
   msgLock *sync.Mutex
   // 连接地址
@@ -73,15 +74,16 @@
   clientCallback ClientCallBack
   // 心跳包
   heartBeatProto *aiot.HeartBeatProto
   // logger
   Logger *zap.SugaredLogger
}
// 初始化客户端
func NewClient(addr string, clientId string, deviceRegister *aiot.DeviceRegister, callBack ClientCallBack) *Client {
func NewClient(addr string, clientId string, deviceRegister *aiot.DeviceRegister, callBack ClientCallBack, logger *zap.SugaredLogger) *Client {
   logger.Debug("New Client...")
   return &Client{
      deviceRegister: deviceRegister,
      closeLock: new(sync.Mutex),
      msgLock: new(sync.Mutex),
      lock: new(sync.Mutex),
      addr: addr,
      deviceId: clientId,
      writeChan: make(chan []byte),
@@ -91,6 +93,7 @@
      waitGroup: &util.WaitGroupWrapper{},
      clientCallback: callBack,
      heartBeatProto: &aiot.HeartBeatProto{},
      Logger: logger,
   }
}
@@ -103,8 +106,7 @@
func (c *Client) InitClient() {
   // 初始化当前属性值
   c.Conn = nil
   c.closeLock = new(sync.Mutex)
   c.msgLock = new(sync.Mutex)
   c.lock = new(sync.Mutex)
   c.writeChan = make(chan []byte)
   c.exitChan = make(chan int8)
   c.state = StateInit
@@ -115,35 +117,35 @@
// 启动服务
func (c *Client) StartSrv() {
   // 判断连接状态,避免重复连接
   logger.Debug("Start client service...")
   c.Logger.Debug("Start client service...")
   if c.IsConnected(){
      logger.Error("net is connected, please do not repeat connect", zap.String("addr", c.addr))
      c.Logger.Error("net is connected, please do not repeat connect", zap.String("addr", c.addr))
      return
   }
   // 地址是否可用
   if c.addr == "" {
      logger.Warn("net addr is nil, wait for retry", zap.String("deviceId", c.deviceId))
      c.Logger.Warn("net addr is nil, wait for retry", zap.String("deviceId", c.deviceId))
      return
   }
   // 连接TCP
   logger.Debug("Connecting to service", zap.String("addr", c.addr))
   c.Logger.Debug("Connecting to service", zap.String("addr", c.addr))
   tcpAddr, err := net.ResolveTCPAddr("tcp", c.addr)
   if err != nil {
      logger.Warn("Net addr can not be connect for now,waiting...", zap.String("addr", c.addr), zap.Error(err))
      c.Logger.Warn("Net addr can not be connect for now,waiting...", zap.String("addr", c.addr), zap.Error(err))
      return
   }
   conn, err := net.DialTCP("tcp", nil, tcpAddr)
   if err != nil {
      logger.Warn("Net addr can not be dial for now,waiting...", zap.String("addr", c.addr), zap.Error(err))
      c.Logger.Warn("Net addr can not be dial for now,waiting...", zap.String("addr", c.addr), zap.Error(err))
      return
   }
   c.Conn = conn
   // 设置连接状态
   c.SetState(StateConnected)
   logger.Debug("Client service connected.", zap.String("addr", c.addr))
   c.Logger.Debug("Client service connected.", zap.String("addr", c.addr))
   // 启用读取通道
   c.SetRWBuf()
@@ -154,7 +156,7 @@
   // 启用心跳
   c.waitGroup.Wrap(c.writeHeartBeat)
   c.Wait()
   logger.Warn("Client service disconnected.Return...", zap.String("addr", c.addr))
   c.Logger.Warn("Client service disconnected.Return...", zap.String("addr", c.addr))
}
// 设置连接状态
@@ -188,23 +190,26 @@
   for {
      select {
      case <- c.exitChan:
         logger.Debug("Close client", zap.String("deviceId", c.deviceId))
         c.Logger.Debug("Close client", zap.String("deviceId", c.deviceId))
         c.Close()
         logger.Warn("writeLoop Done...")
         c.Logger.Warn("writeLoop Done...")
         return
      case bodyByte := <- c.writeChan:
         binary.BigEndian.PutUint32(byte4, uint32(len(bodyByte)))
         body = append(byte4, bodyByte...)
         c.lock.Lock()
         _,err = c.Conn.Write(body)
         c.lock.Unlock()
         if err != nil {
            logger.Error("Fail to write message", zap.Error(err), zap.String("msg", string(bodyByte)))
            c.Logger.Error("Fail to write message", zap.Error(err), zap.String("msg", string(bodyByte)))
            c.Close()
            logger.Warn("writeLoop Done...")
            c.Logger.Warn("writeLoop Done...")
            return
         }
         err = c.Writer.Flush()
         c.Logger.Debug("Write msg success...", zap.String("msg", string(bodyByte)))
         if err != nil {
            logger.Error("Fail to write flush", zap.Error(err))
            c.Logger.Error("Fail to write flush", zap.Error(err))
         }
      }
   }
@@ -213,14 +218,15 @@
// 发送消息
func (c *Client) WriteMsg(senderId string, msgType aiot.MSG_TYPE, reqType aiot.REQ_TYPE, data []byte, msgProto *aiot.MsgIdProto) (*aiot.Protocol, error) {
   // 锁
   c.closeLock.Lock()
   defer c.closeLock.Unlock()
   c.lock.Lock()
   // 关闭的连接不能写入
   if c.IsClosed() {
      logger.Error("Can not write msg on the closed chan", zap.Any("msgType", msgType), zap.Any("reqType", reqType), zap.Any("data", string(data)))
      c.Logger.Error("Can not write msg on the closed chan", zap.Any("msgType", msgType), zap.Any("reqType", reqType), zap.Any("data", string(data)))
      c.lock.Unlock()
      return nil,nil
   }
   c.lock.Unlock()
   // 拼装并发送消息
   body := &aiot.Protocol{
@@ -237,33 +243,36 @@
}
func (c *Client) WriteBody(body *aiot.Protocol) error {
   defer func() {
      if err := recover();err != nil  {
         c.Logger.Error("Write Body Error:", err)
      }
   }()
   body.Receiver = aiot.RECEIVER_TO_SAAS
   logger.Debug("Send msg...", zap.Any("msg", body), zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
   msgData, err := json.Marshal(body)
   if err != nil {
      logger.Error("Fail to Marshal send data", zap.Error(err))
      c.Logger.Error("Fail to Marshal send data", zap.Error(err))
      return err
   }
   c.msgLock.Lock()
   defer c.msgLock.Unlock()
   c.Logger.Debug("Write Body into writeChan...", zap.Any("msg", body), zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
   c.writeChan <- msgData
   return nil
}
// 发送注册包
func (c *Client) writeRegister() {
   logger.Debug("registering...")
   c.Logger.Debug("registering...")
   data := c.deviceRegister
   msgData, _ := json.Marshal(data)
   _, err := c.WriteMsg(c.deviceId, aiot.MSG_TYPE_REGISTER, aiot.REQ_TYPE_REQUEST, msgData, c.GetMsgProto(""))
   if err != nil {
      logger.Error("Fail to send device register", zap.Any("msg", msgData))
      c.Logger.Error("Fail to send device register", zap.Any("msg", msgData))
   }
}
// 发送心跳包
func (c *Client) writeHeartBeat() {
   logger.Debug("Start HeartBeating...",zap.String("addr", c.addr))
   c.Logger.Debug("Start HeartBeating...",zap.String("addr", c.addr))
   pingData, _ := json.Marshal(c.heartBeatProto)
   t := time.NewTicker(DefaultHeartbeatInterval)
   defer func() {
@@ -284,19 +293,46 @@
   }
}
// 发送业务包请求
func (c *Client) Request(receiver aiot.RECEIVER, senderId string, msgProto *aiot.MsgIdProto, data []byte) error {
   defer func() {
      if err := recover();err != nil  {
         c.Logger.Error("Write Body Error:", err)
      }
   }()
   body := &aiot.Protocol{}
   body.Receiver = receiver
   body.SenderId = senderId
   body.MsgProto = msgProto
   body.MsgType = aiot.MSG_TYPE_BUSINESS
   body.ReqType = aiot.REQ_TYPE_REQUEST
   body.Data = data
   c.Logger.Debug("Send msg...", zap.Any("msg", body), zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
   msgData, err := json.Marshal(body)
   if err != nil {
      c.Logger.Error("Fail to Marshal send data", zap.Error(err))
      return err
   }
   c.writeChan <- msgData
   return nil
}
// 消息读取通道
func (c *Client) readLoop() {
   var err error
   var length uint32
   for {
      c.tmpByte4Slice = make([]byte, 4)
      c.SetDeadline(time.Now().Add(DefaultReaderTimeOut))
      // 读取长度
      c.lock.Lock()
      _, err = io.ReadFull(c.Reader, c.tmpByte4Slice)
      c.lock.Unlock()
      if err != nil {
         if err == io.EOF {
            c.Logger.Error("Fail to read request byte4", zap.Error(err))
            err = nil
         } else {
            logger.Error("Fail to read request", zap.Error(err))
            c.Close()
            return
         }
@@ -304,31 +340,43 @@
      }
      length = binary.BigEndian.Uint32(c.tmpByte4Slice)
      if length > DefaultBufferSize {
         logger.Error("Fail to read request data from io", zap.Uint32("length",length))
         c.Logger.Error("Fail to read request data from io", zap.Uint32("length",length))
      }
      // 读取body
      bodyByte := make([]byte, length)
      c.closeLock.Lock()
      c.lock.Lock()
      _, err = io.ReadFull(c.Reader, bodyByte)
      c.closeLock.Unlock()
      c.lock.Unlock()
      if err != nil {
         if err == io.EOF {
            c.Logger.Error("Fail to read request body", zap.Error(err))
            err = nil
         } else {
            c.Close()
            return
         }
         break
      }
      body := &aiot.Protocol{}
      err = json.Unmarshal(bodyByte, body)
      if err != nil {
         logger.Error("Fail to unmarshal message", zap.Error(err), zap.String("msg", string(bodyByte)))
         c.Logger.Error("Fail to unmarshal message", zap.Error(err), zap.String("msg", string(bodyByte)))
      }
      logger.Debug("Read msg...", zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
      c.Logger.Debug("Read msg...", zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
      // 处理回调
      c.onMessage(body)
   }
   logger.Warn("ReadLoop Done...")
   c.Logger.Warn("ReadLoop Done...")
   // 关闭连接
   c.exitChan <- 1
}
// 处理回调
func (c *Client) onMessage (body *aiot.Protocol) {
   // 未封装callback,只写日志
   if c.clientCallback == nil {
      logger.Warn("ClientCallBack interface is nil...", zap.String("addr", c.addr), zap.Any("body", body))
      c.Logger.Warn("ClientCallBack interface is nil...", zap.String("addr", c.addr), zap.Any("body", body))
      return
   }
@@ -409,9 +457,9 @@
// 关闭TCP
func (c *Client) Close() {
   logger.Debug("Closing connect", zap.String("addr", c.addr))
   c.closeLock.Lock()
   defer c.closeLock.Unlock()
   c.Logger.Debug("Closing connect", zap.String("addr", c.addr))
   c.lock.Lock()
   defer c.lock.Unlock()
   // 关闭通道
   if !c.IsClosed() {