| | |
| | | net.Conn |
| | | // 注册包 |
| | | deviceRegister *aiot.DeviceRegister |
| | | // 客户端锁 |
| | | lock *sync.Mutex |
| | | // 消息锁 |
| | | msgLock *sync.Mutex |
| | | // 关闭锁 |
| | | closeLock *sync.Mutex |
| | | // 读取锁 |
| | | readLock *sync.Mutex |
| | | // 写入锁 |
| | | writeLock *sync.Mutex |
| | | // 连接地址 |
| | | addr string |
| | | // 设备ID |
| | |
| | | logger.Debug("New Client...") |
| | | return &Client{ |
| | | deviceRegister: deviceRegister, |
| | | lock: new(sync.Mutex), |
| | | readLock: new(sync.Mutex), |
| | | closeLock: new(sync.Mutex), |
| | | writeLock: new(sync.Mutex), |
| | | addr: addr, |
| | | deviceId: clientId, |
| | | writeChan: make(chan []byte), |
| | | exitChan: make(chan int8), |
| | | state: StateInit, |
| | | tmpByte4Slice: make([]byte, 4), |
| | | waitGroup: &util.WaitGroupWrapper{}, |
| | |
| | | func (c *Client) InitClient() { |
| | | // 初始化当前属性值 |
| | | c.Conn = nil |
| | | c.lock = new(sync.Mutex) |
| | | c.closeLock = new(sync.Mutex) |
| | | c.readLock = new(sync.Mutex) |
| | | c.writeLock = new(sync.Mutex) |
| | | c.writeChan = make(chan []byte) |
| | | c.exitChan = make(chan int8) |
| | | c.state = StateInit |
| | |
| | | byte4 := make([]byte,4) |
| | | for { |
| | | select { |
| | | case <- c.exitChan: |
| | | c.Logger.Debug("Close client", zap.String("deviceId", c.deviceId)) |
| | | c.Close() |
| | | c.Logger.Warn("writeLoop Done...") |
| | | return |
| | | case bodyByte := <- c.writeChan: |
| | | binary.BigEndian.PutUint32(byte4, uint32(len(bodyByte))) |
| | | body = append(byte4, bodyByte...) |
| | | c.lock.Lock() |
| | | c.writeLock.Lock() |
| | | _,err = c.Conn.Write(body) |
| | | c.lock.Unlock() |
| | | c.writeLock.Unlock() |
| | | if err != nil { |
| | | c.Logger.Error("Fail to write message", zap.Error(err), zap.String("msg", string(bodyByte))) |
| | | c.Close() |
| | |
| | | |
| | | // 发送消息 |
| | | func (c *Client) WriteMsg(senderId string, msgType aiot.MSG_TYPE, reqType aiot.REQ_TYPE, data []byte, msgProto *aiot.MsgIdProto) (*aiot.Protocol, error) { |
| | | // 锁 |
| | | c.lock.Lock() |
| | | |
| | | // 关闭的连接不能写入 |
| | | if c.IsClosed() { |
| | | c.Logger.Error("Can not write msg on the closed chan", zap.Any("msgType", msgType), zap.Any("reqType", reqType), zap.Any("data", string(data))) |
| | | c.lock.Unlock() |
| | | return nil,nil |
| | | } |
| | | c.lock.Unlock() |
| | | |
| | | // 拼装并发送消息 |
| | | body := &aiot.Protocol{ |
| | |
| | | var length uint32 |
| | | for { |
| | | c.tmpByte4Slice = make([]byte, 4) |
| | | c.SetDeadline(time.Now().Add(DefaultReaderTimeOut)) |
| | | _ = c.SetDeadline(time.Now().Add(DefaultReaderTimeOut)) |
| | | // 读取长度 |
| | | c.lock.Lock() |
| | | c.readLock.Lock() |
| | | _, err = io.ReadFull(c.Reader, c.tmpByte4Slice) |
| | | c.lock.Unlock() |
| | | c.readLock.Unlock() |
| | | if err != nil { |
| | | if err == io.EOF { |
| | | c.Logger.Error("Fail to read request byte4", zap.Error(err)) |
| | |
| | | } |
| | | // 读取body |
| | | bodyByte := make([]byte, length) |
| | | c.lock.Lock() |
| | | c.readLock.Lock() |
| | | _, err = io.ReadFull(c.Reader, bodyByte) |
| | | c.lock.Unlock() |
| | | c.readLock.Unlock() |
| | | if err != nil { |
| | | if err == io.EOF { |
| | | c.Logger.Error("Fail to read request body", zap.Error(err)) |
| | |
| | | |
| | | c.Logger.Warn("ReadLoop Done...") |
| | | // 关闭连接 |
| | | c.exitChan <- 1 |
| | | c.Close() |
| | | } |
| | | |
| | | // 处理回调 |
| | |
| | | switch body.MsgType { |
| | | // 心跳回复 |
| | | case aiot.MSG_TYPE_HEART_BEAT: |
| | | c.clientCallback.OnHeartBeat(c,body) |
| | | go c.clientCallback.OnHeartBeat(c, body) |
| | | return |
| | | |
| | | // 注册回复 |
| | | case aiot.MSG_TYPE_REGISTER: |
| | | c.clientCallback.OnRegister(c,body) |
| | | go c.clientCallback.OnRegister(c, body) |
| | | return |
| | | |
| | | // 设备控制 |
| | |
| | | // 关闭TCP |
| | | func (c *Client) Close() { |
| | | c.Logger.Debug("Closing connect", zap.String("addr", c.addr)) |
| | | c.lock.Lock() |
| | | defer c.lock.Unlock() |
| | | |
| | | c.closeLock.Lock() |
| | | defer c.closeLock.Unlock() |
| | | // 关闭通道 |
| | | if !c.IsClosed() { |
| | | c.Conn.Close() |
| | |
| | | |
| | | // 设置连接属性 |
| | | c.SetState(StateDisconnected) |
| | | |
| | | // 关闭管道 |
| | | close(c.exitChan) |
| | | close(c.writeChan) |
| | | } |
| | | } |