| | |
| | | "bufio" |
| | | "encoding/binary" |
| | | "encoding/json" |
| | | "errors" |
| | | uuid "github.com/satori/go.uuid" |
| | | "go.uber.org/zap" |
| | | "io" |
| | |
| | | DefaultHeartbeatInterval = 15 * time.Second |
| | | // 读取数据超时时间 |
| | | DefaultReaderTimeOut = 60 * time.Second |
| | | // 连接尝试间隔 |
| | | DefaultNetRetry = 10 * time.Second |
| | | ) |
| | | |
| | | // 连接状态 |
| | |
| | | deviceRegister *aiot.DeviceRegister |
| | | // 关闭锁 |
| | | closeLock *sync.Mutex |
| | | // 消息锁 |
| | | msgLock *sync.Mutex |
| | | // 读取锁 |
| | | readLock *sync.Mutex |
| | | // 写入锁 |
| | | writeLock *sync.Mutex |
| | | // 连接地址 |
| | | addr string |
| | | // 设备ID |
| | |
| | | Writer *bufio.Writer |
| | | // 写入通道 |
| | | writeChan chan []byte |
| | | // 退出通道 |
| | | exitChan chan int8 |
| | | // 连接状态 |
| | | state State |
| | | // 报文头 |
| | |
| | | logger.Debug("New Client...") |
| | | return &Client{ |
| | | deviceRegister: deviceRegister, |
| | | readLock: new(sync.Mutex), |
| | | closeLock: new(sync.Mutex), |
| | | msgLock: new(sync.Mutex), |
| | | writeLock: new(sync.Mutex), |
| | | addr: addr, |
| | | deviceId: clientId, |
| | | writeChan: make(chan []byte), |
| | | exitChan: make(chan int8), |
| | | state: StateInit, |
| | | tmpByte4Slice: make([]byte, 4), |
| | | waitGroup: &util.WaitGroupWrapper{}, |
| | |
| | | // 初始化当前属性值 |
| | | c.Conn = nil |
| | | c.closeLock = new(sync.Mutex) |
| | | c.msgLock = new(sync.Mutex) |
| | | c.readLock = new(sync.Mutex) |
| | | c.writeLock = new(sync.Mutex) |
| | | c.writeChan = make(chan []byte) |
| | | c.exitChan = make(chan int8) |
| | | c.state = StateInit |
| | | c.tmpByte4Slice = make([]byte, 4) |
| | | c.waitGroup = &util.WaitGroupWrapper{} |
| | |
| | | byte4 := make([]byte,4) |
| | | for { |
| | | select { |
| | | case <- c.exitChan: |
| | | c.Logger.Debug("Close client", zap.String("deviceId", c.deviceId)) |
| | | c.Close() |
| | | c.Logger.Warn("writeLoop Done...") |
| | | return |
| | | case bodyByte := <- c.writeChan: |
| | | binary.BigEndian.PutUint32(byte4, uint32(len(bodyByte))) |
| | | body = append(byte4, bodyByte...) |
| | | c.closeLock.Lock() |
| | | c.msgLock.Lock() |
| | | c.writeLock.Lock() |
| | | _,err = c.Conn.Write(body) |
| | | c.closeLock.Unlock() |
| | | c.msgLock.Unlock() |
| | | c.writeLock.Unlock() |
| | | if err != nil { |
| | | c.Logger.Error("Fail to write message", zap.Error(err), zap.String("msg", string(bodyByte))) |
| | | c.Close() |
| | |
| | | } |
| | | |
| | | // 发送消息 |
| | | func (c *Client) WriteMsg(senderId string, msgType aiot.MSG_TYPE, reqType aiot.REQ_TYPE, data []byte, msgProto *aiot.MsgIdProto) (*aiot.Protocol, error) { |
| | | // 锁 |
| | | c.closeLock.Lock() |
| | | |
| | | func (c *Client) writeMsg(senderId string, msgType aiot.MSG_TYPE, reqType aiot.REQ_TYPE, data []byte, msgProto *aiot.MsgIdProto) (*aiot.Protocol, error) { |
| | | // 关闭的连接不能写入 |
| | | if c.IsClosed() { |
| | | c.Logger.Error("Can not write msg on the closed chan", zap.Any("msgType", msgType), zap.Any("reqType", reqType), zap.Any("data", string(data))) |
| | | c.closeLock.Unlock() |
| | | return nil,nil |
| | | } |
| | | c.closeLock.Unlock() |
| | | |
| | | // 拼装并发送消息 |
| | | body := &aiot.Protocol{ |
| | | Receiver: aiot.RECEIVER_TO_SAAS, |
| | | SenderId: senderId, |
| | | MsgType: msgType, |
| | | ReqType: reqType, |
| | |
| | | } |
| | | |
| | | // 发送消息 |
| | | c.WriteBody(body) |
| | | _ = c.WriteBody(body) |
| | | return body, nil |
| | | } |
| | | |
| | | func (c *Client) WriteBody(body *aiot.Protocol) error { |
| | | body.Receiver = aiot.RECEIVER_TO_SAAS |
| | | c.Logger.Debug("Write Body...", zap.Any("msg", body), zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType)) |
| | | defer func() { |
| | | if err := recover();err != nil { |
| | | c.Logger.Error("Write Body Error:", err) |
| | | } |
| | | }() |
| | | |
| | | // 通道已关闭,不能写入 |
| | | if c.IsClosed() { |
| | | errMsg := "Can not write msg into closed chain" |
| | | c.Logger.Warn(errMsg, zap.Any("msg",body)) |
| | | return errors.New(errMsg) |
| | | } |
| | | |
| | | // 消息ID默认处理 |
| | | if body.MsgProto == nil { |
| | | body.MsgProto = GetMsgProto("") |
| | | } |
| | | msgData, err := json.Marshal(body) |
| | | if err != nil { |
| | | c.Logger.Error("Fail to Marshal send data", zap.Error(err)) |
| | | return err |
| | | } |
| | | c.msgLock.Lock() |
| | | c.closeLock.Lock() |
| | | c.Logger.Debug("Write Body into writeChan...", zap.Any("msg", body), zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType)) |
| | | c.writeChan <- msgData |
| | | c.closeLock.Unlock() |
| | | c.msgLock.Unlock() |
| | | return nil |
| | | } |
| | | |
| | |
| | | c.Logger.Debug("registering...") |
| | | data := c.deviceRegister |
| | | msgData, _ := json.Marshal(data) |
| | | _, err := c.WriteMsg(c.deviceId, aiot.MSG_TYPE_REGISTER, aiot.REQ_TYPE_REQUEST, msgData, c.GetMsgProto("")) |
| | | _, err := c.writeMsg(c.deviceId, aiot.MSG_TYPE_REGISTER, aiot.REQ_TYPE_REQUEST, msgData, GetMsgProto("")) |
| | | if err != nil { |
| | | c.Logger.Error("Fail to send device register", zap.Any("msg", msgData)) |
| | | } |
| | |
| | | t.Stop() |
| | | return |
| | | } |
| | | go c.WriteMsg(c.deviceId, aiot.MSG_TYPE_HEART_BEAT, aiot.REQ_TYPE_REQUEST, pingData, c.GetMsgProto("")) |
| | | go c.writeMsg(c.deviceId, aiot.MSG_TYPE_HEART_BEAT, aiot.REQ_TYPE_REQUEST, pingData, GetMsgProto("")) |
| | | } |
| | | } |
| | | } |
| | | |
| | | // 发送业务包请求 |
| | | func (c *Client) Request(receiver aiot.RECEIVER, senderId string, msgProto *aiot.MsgIdProto, data []byte) error { |
| | | defer func() { |
| | | if err := recover();err != nil { |
| | | c.Logger.Error("Write Body Error:", err) |
| | | } |
| | | }() |
| | | body := &aiot.Protocol{} |
| | | body.Receiver = receiver |
| | | body.SenderId = senderId |
| | |
| | | c.Logger.Error("Fail to Marshal send data", zap.Error(err)) |
| | | return err |
| | | } |
| | | c.msgLock.Lock() |
| | | c.closeLock.Lock() |
| | | c.writeChan <- msgData |
| | | c.closeLock.Unlock() |
| | | c.msgLock.Unlock() |
| | | return nil |
| | | } |
| | | |
| | |
| | | var err error |
| | | var length uint32 |
| | | for { |
| | | c.SetDeadline(time.Now().Add(DefaultReaderTimeOut)) |
| | | c.tmpByte4Slice = make([]byte, 4) |
| | | _ = c.SetDeadline(time.Now().Add(DefaultReaderTimeOut)) |
| | | // 读取长度 |
| | | c.readLock.Lock() |
| | | _, err = io.ReadFull(c.Reader, c.tmpByte4Slice) |
| | | c.readLock.Unlock() |
| | | if err != nil { |
| | | if err == io.EOF { |
| | | c.Logger.Error("Fail to read request byte4", zap.Error(err)) |
| | | err = nil |
| | | } else { |
| | | c.Logger.Error("Fail to read request", zap.Error(err)) |
| | | c.Close() |
| | | return |
| | | } |
| | |
| | | } |
| | | // 读取body |
| | | bodyByte := make([]byte, length) |
| | | c.closeLock.Lock() |
| | | c.readLock.Lock() |
| | | _, err = io.ReadFull(c.Reader, bodyByte) |
| | | c.closeLock.Unlock() |
| | | c.readLock.Unlock() |
| | | if err != nil { |
| | | if err == io.EOF { |
| | | c.Logger.Error("Fail to read request body", zap.Error(err)) |
| | | err = nil |
| | | } else { |
| | | c.Close() |
| | | return |
| | | } |
| | | break |
| | | } |
| | | body := &aiot.Protocol{} |
| | | err = json.Unmarshal(bodyByte, body) |
| | | if err != nil { |
| | |
| | | |
| | | c.Logger.Warn("ReadLoop Done...") |
| | | // 关闭连接 |
| | | c.exitChan <- 1 |
| | | c.Close() |
| | | } |
| | | |
| | | // 处理回调 |
| | |
| | | switch body.MsgType { |
| | | // 心跳回复 |
| | | case aiot.MSG_TYPE_HEART_BEAT: |
| | | c.clientCallback.OnHeartBeat(c,body) |
| | | go c.clientCallback.OnHeartBeat(c, body) |
| | | return |
| | | |
| | | // 注册回复 |
| | | case aiot.MSG_TYPE_REGISTER: |
| | | c.clientCallback.OnRegister(c,body) |
| | | go c.clientCallback.OnRegister(c, body) |
| | | return |
| | | |
| | | // 设备控制 |
| | |
| | | } |
| | | |
| | | // 拼装消息ID |
| | | func (c *Client) GetMsgProto(msgId string) *aiot.MsgIdProto { |
| | | func GetMsgProto(msgId string) *aiot.MsgIdProto { |
| | | // 新消息 |
| | | if msgId == "" { |
| | | return &aiot.MsgIdProto{ |
| | |
| | | return c.deviceId |
| | | } |
| | | |
| | | // 获取连接状态 |
| | | func (c *Client) GetState() State { |
| | | return c.state |
| | | } |
| | | |
| | | // 判断连接是否关闭 |
| | | func (c *Client) IsClosed() bool { |
| | | return c.state == StateDisconnected |
| | |
| | | |
| | | // 关闭TCP |
| | | func (c *Client) Close() { |
| | | c.Logger.Debug("Closing connect", zap.String("addr", c.addr)) |
| | | c.Logger.Debug("Closing connect...", zap.String("addr", c.addr)) |
| | | c.closeLock.Lock() |
| | | defer c.closeLock.Unlock() |
| | | |
| | | // 关闭通道 |
| | | if !c.IsClosed() { |
| | | c.Conn.Close() |
| | | _ = c.Conn.Close() |
| | | if c.IsConnected() { |
| | | c.clientCallback.OnClose(c) |
| | | } |
| | | |
| | | // 设置连接属性 |
| | | c.SetState(StateDisconnected) |
| | | |
| | | // 关闭管道 |
| | | close(c.exitChan) |
| | | close(c.writeChan) |
| | | } |
| | | |
| | | // 设置连接属性 |
| | | c.SetState(StateDisconnected) |
| | | c.Logger.Debug("Connect closed...", zap.String("addr", c.addr)) |
| | | } |