saas-smartAi通信协议标准库
gongshangguo
2022-03-01 3831dcb5a7ade90b1d2620719c6cd2c66a6dc765

1个文件已修改
58 ■■■■■ 已修改文件
client/client.go 58 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
client/client.go
@@ -48,10 +48,12 @@
    net.Conn
    // 注册包
    deviceRegister *aiot.DeviceRegister
    // 客户端锁
    lock *sync.Mutex
    // 消息锁
    msgLock *sync.Mutex
    // 关闭锁
    closeLock *sync.Mutex
    // 读取锁
    readLock *sync.Mutex
    // 写入锁
    writeLock *sync.Mutex
    // 连接地址
    addr string
    // 设备ID
@@ -83,11 +85,12 @@
    logger.Debug("New Client...")
    return &Client{
        deviceRegister: deviceRegister,
        lock: new(sync.Mutex),
        readLock: new(sync.Mutex),
        closeLock: new(sync.Mutex),
        writeLock: new(sync.Mutex),
        addr: addr,
        deviceId: clientId,
        writeChan: make(chan []byte),
        exitChan: make(chan int8),
        state: StateInit,
        tmpByte4Slice: make([]byte, 4),
        waitGroup: &util.WaitGroupWrapper{},
@@ -106,7 +109,9 @@
func (c *Client) InitClient() {
    // 初始化当前属性值
    c.Conn = nil
    c.lock = new(sync.Mutex)
    c.closeLock = new(sync.Mutex)
    c.readLock = new(sync.Mutex)
    c.writeLock = new(sync.Mutex)
    c.writeChan = make(chan []byte)
    c.exitChan = make(chan int8)
    c.state = StateInit
@@ -189,17 +194,12 @@
    byte4 := make([]byte,4)
    for {
        select {
        case <- c.exitChan:
            c.Logger.Debug("Close client", zap.String("deviceId", c.deviceId))
            c.Close()
            c.Logger.Warn("writeLoop Done...")
            return
        case bodyByte := <- c.writeChan:
            binary.BigEndian.PutUint32(byte4, uint32(len(bodyByte)))
            body = append(byte4, bodyByte...)
            c.lock.Lock()
            c.writeLock.Lock()
            _,err = c.Conn.Write(body)
            c.lock.Unlock()
            c.writeLock.Unlock()
            if err != nil {
                c.Logger.Error("Fail to write message", zap.Error(err), zap.String("msg", string(bodyByte)))
                c.Close()
@@ -217,16 +217,11 @@
// 发送消息
func (c *Client) WriteMsg(senderId string, msgType aiot.MSG_TYPE, reqType aiot.REQ_TYPE, data []byte, msgProto *aiot.MsgIdProto) (*aiot.Protocol, error) {
    // 锁
    c.lock.Lock()
    // 关闭的连接不能写入
    if c.IsClosed() {
        c.Logger.Error("Can not write msg on the closed chan", zap.Any("msgType", msgType), zap.Any("reqType", reqType), zap.Any("data", string(data)))
        c.lock.Unlock()
        return nil,nil
    }
    c.lock.Unlock()
    // 拼装并发送消息
    body := &aiot.Protocol{
@@ -323,11 +318,11 @@
    var length uint32
    for {
        c.tmpByte4Slice = make([]byte, 4)
        c.SetDeadline(time.Now().Add(DefaultReaderTimeOut))
        _ = c.SetDeadline(time.Now().Add(DefaultReaderTimeOut))
        // 读取长度
        c.lock.Lock()
        c.readLock.Lock()
        _, err = io.ReadFull(c.Reader, c.tmpByte4Slice)
        c.lock.Unlock()
        c.readLock.Unlock()
        if err != nil {
            if err == io.EOF {
                c.Logger.Error("Fail to read request byte4", zap.Error(err))
@@ -344,9 +339,9 @@
        }
        // 读取body
        bodyByte := make([]byte, length)
        c.lock.Lock()
        c.readLock.Lock()
        _, err = io.ReadFull(c.Reader, bodyByte)
        c.lock.Unlock()
        c.readLock.Unlock()
        if err != nil {
            if err == io.EOF {
                c.Logger.Error("Fail to read request body", zap.Error(err))
@@ -369,7 +364,7 @@
    c.Logger.Warn("ReadLoop Done...")
    // 关闭连接
    c.exitChan <- 1
    c.Close()
}
// 处理回调
@@ -384,12 +379,12 @@
    switch body.MsgType {
    // 心跳回复
    case aiot.MSG_TYPE_HEART_BEAT:
        c.clientCallback.OnHeartBeat(c,body)
        go c.clientCallback.OnHeartBeat(c, body)
        return
    // 注册回复
    case aiot.MSG_TYPE_REGISTER:
        c.clientCallback.OnRegister(c,body)
        go c.clientCallback.OnRegister(c, body)
        return
    // 设备控制
@@ -458,9 +453,8 @@
// 关闭TCP
func (c *Client) Close() {
    c.Logger.Debug("Closing connect", zap.String("addr", c.addr))
    c.lock.Lock()
    defer c.lock.Unlock()
    c.closeLock.Lock()
    defer c.closeLock.Unlock()
    // 关闭通道
    if !c.IsClosed() {
        c.Conn.Close()
@@ -470,9 +464,5 @@
        // 设置连接属性
        c.SetState(StateDisconnected)
        // 关闭管道
        close(c.exitChan)
        close(c.writeChan)
    }
}