saas-smartAi通信协议标准库
gongshangguo
2022-03-01 45def8688e8caee3707340ac1b52000950df769e
写入消息锁
1个文件已修改
34 ■■■■■ 已修改文件
client/client.go 34 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
client/client.go
@@ -197,7 +197,11 @@
        case bodyByte := <- c.writeChan:
            binary.BigEndian.PutUint32(byte4, uint32(len(bodyByte)))
            body = append(byte4, bodyByte...)
            c.closeLock.Lock()
            c.msgLock.Lock()
            _,err = c.Conn.Write(body)
            c.closeLock.Unlock()
            c.msgLock.Unlock()
            if err != nil {
                c.Logger.Error("Fail to write message", zap.Error(err), zap.String("msg", string(bodyByte)))
                c.Close()
@@ -216,13 +220,14 @@
func (c *Client) WriteMsg(senderId string, msgType aiot.MSG_TYPE, reqType aiot.REQ_TYPE, data []byte, msgProto *aiot.MsgIdProto) (*aiot.Protocol, error) {
    // 锁
    c.closeLock.Lock()
    defer c.closeLock.Unlock()
    // 关闭的连接不能写入
    if c.IsClosed() {
        c.Logger.Error("Can not write msg on the closed chan", zap.Any("msgType", msgType), zap.Any("reqType", reqType), zap.Any("data", string(data)))
        c.closeLock.Unlock()
        return nil,nil
    }
    c.closeLock.Unlock()
    // 拼装并发送消息
    body := &aiot.Protocol{
@@ -247,8 +252,10 @@
        return err
    }
    c.msgLock.Lock()
    defer c.msgLock.Unlock()
    c.closeLock.Lock()
    c.writeChan <- msgData
    c.closeLock.Unlock()
    c.msgLock.Unlock()
    return nil
}
@@ -286,6 +293,29 @@
    }
}
// 发送业务包请求
func (c *Client) Request(receiver aiot.RECEIVER, senderId string, msgProto *aiot.MsgIdProto, data []byte) error {
    body := &aiot.Protocol{}
    body.Receiver = receiver
    body.SenderId = senderId
    body.MsgProto = msgProto
    body.MsgType = aiot.MSG_TYPE_BUSINESS
    body.ReqType = aiot.REQ_TYPE_REQUEST
    body.Data = data
    c.Logger.Debug("Send msg...", zap.Any("msg", body), zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
    msgData, err := json.Marshal(body)
    if err != nil {
        c.Logger.Error("Fail to Marshal send data", zap.Error(err))
        return err
    }
    c.msgLock.Lock()
    c.closeLock.Lock()
    c.writeChan <- msgData
    c.closeLock.Unlock()
    c.msgLock.Unlock()
    return nil
}
// 消息读取通道
func (c *Client) readLoop() {
    var err error