package client

import (
	"bufio"
	"encoding/binary"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net"
	"sync"
	"sync/atomic"
	"time"

	"basic.com/valib/go-aiot.git/aiotProto/aiot"
	"basic.com/valib/go-aiot.git/util"
	uuid "github.com/satori/go.uuid"
	"go.uber.org/zap"
)

// Message size and timing defaults.
const (
	// DefaultBufferSize is the maximum accepted message length in bytes and
	// the size of the buffered reader/writer.
	DefaultBufferSize = 512 * 1024
	// DefaultHeartbeatInterval is the period between heartbeat messages.
	DefaultHeartbeatInterval = 15 * time.Second
	// DefaultReaderTimeOut is the connection deadline that readLoop renews
	// before every frame.
	DefaultReaderTimeOut = 60 * time.Second
)

// State is the connection state of a Client.
type State int32

// Connection states.
const (
	// StateInit is the state of a freshly created or re-initialised client.
	StateInit State = iota
	// StateConnected means the TCP connection is established.
	StateConnected
	// StateRegistered means the device has been registered.
	StateRegistered
	// StateDisconnected means the connection has been closed.
	StateDisconnected
)

// ErrClientClosed is returned when a message is written to a client whose
// connection has already been closed.
var ErrClientClosed = errors.New("Can not write msg into closed chain")

// Client is a TCP client that exchanges length-prefixed JSON messages.
type Client struct {
	net.Conn
	// deviceRegister is the registration payload sent right after connecting.
	deviceRegister *aiot.DeviceRegister
	// closeLock serialises Close.
	closeLock *sync.Mutex
	// readLock guards reads from Reader.
	readLock *sync.Mutex
	// writeLock guards writes to Writer.
	writeLock *sync.Mutex
	// addr is the server address.
	addr string
	// deviceId is the device ID used as the sender of register and heartbeat
	// messages.
	deviceId string
	// Reader is the buffered reader over the connection.
	Reader *bufio.Reader
	// Writer is the buffered writer over the connection.
	Writer *bufio.Writer
	// writeChan carries encoded messages to writeLoop.
	writeChan chan []byte
	// state is the connection state; access it through SetState/GetState.
	state State
	// tmpByte4Slice is the 4-byte scratch buffer for the frame length header.
	tmpByte4Slice []byte
	// waitGroup tracks the read, write and heartbeat goroutines.
	waitGroup *util.WaitGroupWrapper
	// clientCallback receives incoming messages and the close notification.
	clientCallback ClientCallBack
	// heartBeatProto is the heartbeat payload.
	heartBeatProto *aiot.HeartBeatProto
	// Logger is the logger used by the client.
	Logger *zap.SugaredLogger
}

// NewClient creates a client for addr in StateInit. A nil logger is replaced
// by a no-op logger so that later log calls cannot panic.
func NewClient(addr string, clientId string, deviceRegister *aiot.DeviceRegister, callBack ClientCallBack, logger *zap.SugaredLogger) *Client {
	if logger == nil {
		logger = zap.NewNop().Sugar()
	}
	logger.Debug("New Client...")
	return &Client{
		deviceRegister: deviceRegister,
		readLock:       new(sync.Mutex),
		closeLock:      new(sync.Mutex),
		writeLock:      new(sync.Mutex),
		addr:           addr,
		deviceId:       clientId,
		writeChan:      make(chan []byte),
		state:          StateInit,
		tmpByte4Slice:  make([]byte, 4),
		waitGroup:      &util.WaitGroupWrapper{},
		clientCallback: callBack,
		heartBeatProto: &aiot.HeartBeatProto{},
		Logger:         logger,
	}
}

// SetCallBack sets the callback interface.
func (c *Client) SetCallBack(callBack ClientCallBack) {
	c.clientCallback = callBack
}

// InitClient resets the connection-related fields (connection, locks, write
// channel, state, header buffer, wait group) to their initial values.
func (c *Client) InitClient() {
	c.Conn = nil
	c.closeLock = new(sync.Mutex)
	c.readLock = new(sync.Mutex)
	c.writeLock = new(sync.Mutex)
	c.writeChan = make(chan []byte)
	c.SetState(StateInit)
	c.tmpByte4Slice = make([]byte, 4)
	c.waitGroup = &util.WaitGroupWrapper{}
}

// StartSrv connects to the server, starts the read, write and heartbeat
// goroutines, sends the register message and then blocks until all three
// goroutines have returned. It returns early without connecting when the
// client is already connected, the address is empty, or the address cannot
// be resolved or dialled.
func (c *Client) StartSrv() {
	c.Logger.Debug("Start client service...")
	// Avoid connecting twice.
	if c.IsConnected() {
		c.Logger.Errorw("net is connected, please do not repeat connect", zap.String("addr", c.addr))
		return
	}

	if c.addr == "" {
		c.Logger.Warnw("net addr is nil, wait for retry", zap.String("deviceId", c.deviceId))
		return
	}

	c.Logger.Debugw("Connecting to service", zap.String("addr", c.addr))
	tcpAddr, err := net.ResolveTCPAddr("tcp", c.addr)
	if err != nil {
		c.Logger.Warnw("Net addr can not be connect for now,waiting...", zap.String("addr", c.addr), zap.Error(err))
		return
	}
	conn, err := net.DialTCP("tcp", nil, tcpAddr)
	if err != nil {
		c.Logger.Warnw("Net addr can not be dial for now,waiting...", zap.String("addr", c.addr), zap.Error(err))
		return
	}
	c.Conn = conn
	c.SetState(StateConnected)
	c.Logger.Debugw("Client service connected.", zap.String("addr", c.addr))

	// The read/write loops must be running before the register message is
	// queued: writeChan is unbuffered, so the send blocks until writeLoop
	// receives it.
	c.SetRWBuf()
	c.writeRegister()
	c.waitGroup.Wrap(c.writeHeartBeat)

	c.Wait()
	c.Logger.Warnw("Client service disconnected.Return...", zap.String("addr", c.addr))
}

// SetState sets the connection state. The store is atomic because the state
// is read and written from several goroutines.
func (c *Client) SetState(state State) {
	atomic.StoreInt32((*int32)(&c.state), int32(state))
}

// SetDeviceId sets the device ID.
func (c *Client) SetDeviceId(deviceId string) {
	c.deviceId = deviceId
}

// SetRWBuf wraps the connection in a buffered reader and writer of
// DefaultBufferSize bytes and starts the read and write loops.
func (c *Client) SetRWBuf() {
	c.Writer = bufio.NewWriterSize(c.Conn, DefaultBufferSize)
	c.Reader = bufio.NewReaderSize(c.Conn, DefaultBufferSize)
	c.waitGroup.Wrap(c.readLoop)
	c.waitGroup.Wrap(c.writeLoop)
}

// writeLoop takes encoded messages from writeChan and writes each one as a
// frame: a 4-byte big-endian length followed by the payload. It returns when
// writeChan is closed (done by Close) or when a write fails, in which case it
// closes the client itself.
func (c *Client) writeLoop() {
	var header [4]byte
	for payload := range c.writeChan {
		binary.BigEndian.PutUint32(header[:], uint32(len(payload)))

		c.writeLock.Lock()
		err := c.writeFrame(header[:], payload)
		c.writeLock.Unlock()
		if err != nil {
			c.Logger.Errorw("Fail to write message", zap.Error(err), zap.String("msg", string(payload)))
			c.Close()
			break
		}
		c.Logger.Debugw("Write msg success...", zap.String("msg", string(payload)))
	}
	c.Logger.Warn("writeLoop Done...")
}

// writeFrame writes header and payload through the buffered writer and
// flushes them to the connection. The caller must hold writeLock.
func (c *Client) writeFrame(header, payload []byte) error {
	if _, err := c.Writer.Write(header); err != nil {
		return err
	}
	if _, err := c.Writer.Write(payload); err != nil {
		return err
	}
	return c.Writer.Flush()
}

// WriteMsg builds a protocol message from the arguments and queues it through
// WriteBody. It returns the message that was built. When the client is closed
// it returns (nil, ErrClientClosed); when queuing fails it returns the built
// message together with the error from WriteBody.
func (c *Client) WriteMsg(senderId string, msgType aiot.MSG_TYPE, reqType aiot.REQ_TYPE, data []byte, msgProto *aiot.MsgIdProto) (*aiot.Protocol, error) {
	// A closed connection cannot be written to.
	if c.IsClosed() {
		c.Logger.Errorw("Can not write msg on the closed chan", zap.Any("msgType", msgType), zap.Any("reqType", reqType), zap.Any("data", string(data)))
		return nil, ErrClientClosed
	}

	body := &aiot.Protocol{
		SenderId: senderId,
		MsgType:  msgType,
		ReqType:  reqType,
		Data:     data,
		MsgProto: msgProto,
	}
	if err := c.WriteBody(body); err != nil {
		// The message is still returned so callers that only inspect it keep
		// working; the error tells them it was not queued.
		return body, err
	}
	return body, nil
}

// WriteBody sets the receiver of body to aiot.RECEIVER_TO_SAAS, encodes it as
// JSON and hands it to writeLoop. It returns ErrClientClosed when the client
// is closed, the marshal error when encoding fails, and an error describing
// the panic when the hand-over panics (for example because Close closed
// writeChan concurrently).
func (c *Client) WriteBody(body *aiot.Protocol) (err error) {
	defer func() {
		if r := recover(); r != nil {
			c.Logger.Errorw("Write Body Error:", zap.Any("panic", r))
			err = fmt.Errorf("write body: %v", r)
		}
	}()

	if c.IsClosed() {
		c.Logger.Warnw(ErrClientClosed.Error(), zap.Any("msg", body))
		return ErrClientClosed
	}

	body.Receiver = aiot.RECEIVER_TO_SAAS
	msgData, err := json.Marshal(body)
	if err != nil {
		c.Logger.Errorw("Fail to Marshal send data", zap.Error(err))
		return err
	}

	c.Logger.Debugw("Write Body into writeChan...", zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
	c.writeChan <- msgData
	return nil
}

// writeRegister sends the device register message.
func (c *Client) writeRegister() {
	c.Logger.Debug("registering...")
	msgData, err := json.Marshal(c.deviceRegister)
	if err != nil {
		c.Logger.Errorw("Fail to Marshal send data", zap.Error(err))
		return
	}
	if _, err = c.WriteMsg(c.deviceId, aiot.MSG_TYPE_REGISTER, aiot.REQ_TYPE_REQUEST, msgData, GetMsgProto("")); err != nil {
		c.Logger.Errorw("Fail to send device register", zap.Error(err), zap.ByteString("msg", msgData))
	}
}

// writeHeartBeat sends a heartbeat message every DefaultHeartbeatInterval.
// The closed state is only checked on a tick, so the loop returns on the
// first tick after the client was closed. On return it stops the ticker and
// closes the client.
func (c *Client) writeHeartBeat() {
	c.Logger.Debugw("Start HeartBeating...", zap.String("addr", c.addr))
	pingData, err := json.Marshal(c.heartBeatProto)
	if err != nil {
		// Keep the heartbeat running; the payload is simply empty.
		c.Logger.Warnw("Fail to Marshal send data", zap.Error(err))
	}

	ticker := time.NewTicker(DefaultHeartbeatInterval)
	defer func() {
		ticker.Stop()
		c.Close()
	}()

	for range ticker.C {
		if c.IsClosed() {
			return
		}
		// Sent from its own goroutine so a blocked writeChan cannot stall the
		// ticker loop; failures are logged inside WriteMsg/WriteBody.
		go c.WriteMsg(c.deviceId, aiot.MSG_TYPE_HEART_BEAT, aiot.REQ_TYPE_REQUEST, pingData, GetMsgProto(""))
	}
}

// Request sends a business request carrying data to receiver. It returns
// ErrClientClosed when the client is closed, the marshal error when encoding
// fails, and an error describing the panic when the hand-over to writeLoop
// panics.
func (c *Client) Request(receiver aiot.RECEIVER, senderId string, msgProto *aiot.MsgIdProto, data []byte) (err error) {
	defer func() {
		if r := recover(); r != nil {
			c.Logger.Errorw("Write Body Error:", zap.Any("panic", r))
			err = fmt.Errorf("request: %v", r)
		}
	}()

	if c.IsClosed() {
		c.Logger.Warnw(ErrClientClosed.Error(), zap.String("senderId", senderId))
		return ErrClientClosed
	}

	body := &aiot.Protocol{
		Receiver: receiver,
		SenderId: senderId,
		MsgProto: msgProto,
		MsgType:  aiot.MSG_TYPE_BUSINESS,
		ReqType:  aiot.REQ_TYPE_REQUEST,
		Data:     data,
	}
	c.Logger.Debugw("Send msg...", zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))

	msgData, err := json.Marshal(body)
	if err != nil {
		c.Logger.Errorw("Fail to Marshal send data", zap.Error(err))
		return err
	}
	c.writeChan <- msgData
	return nil
}

// readLoop reads frames (4-byte big-endian length followed by a JSON body)
// from the connection and dispatches each decoded message to the callback.
// It stops on the first read error or on a frame longer than
// DefaultBufferSize, and closes the client before returning. A body that is
// not valid JSON is logged and skipped.
func (c *Client) readLoop() {
	header := c.tmpByte4Slice
	if len(header) != 4 {
		header = make([]byte, 4)
		c.tmpByte4Slice = header
	}

	for {
		// Renew the deadline before every frame.
		if err := c.SetDeadline(time.Now().Add(DefaultReaderTimeOut)); err != nil {
			c.Logger.Errorw("Fail to set deadline", zap.Error(err))
			break
		}

		// Read the length header.
		if err := c.readFull(header); err != nil {
			c.Logger.Errorw("Fail to read request byte4", zap.Error(err))
			break
		}
		length := binary.BigEndian.Uint32(header)
		if length > DefaultBufferSize {
			// The stream cannot be trusted past this point, and allocating the
			// announced size could exhaust memory, so give up on the connection.
			c.Logger.Errorw("Fail to read request data from io", zap.Uint32("length", length))
			break
		}

		// Read the body.
		bodyByte := make([]byte, length)
		if err := c.readFull(bodyByte); err != nil {
			c.Logger.Errorw("Fail to read request body", zap.Error(err))
			break
		}

		body := &aiot.Protocol{}
		if err := json.Unmarshal(bodyByte, body); err != nil {
			c.Logger.Errorw("Fail to unmarshal message", zap.Error(err), zap.String("msg", string(bodyByte)))
			continue
		}
		c.Logger.Debugw("Read msg...", zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))

		c.onMessage(body)
	}

	c.Logger.Warn("ReadLoop Done...")
	c.Close()
}

// readFull fills buf from the buffered reader while holding readLock.
func (c *Client) readFull(buf []byte) error {
	c.readLock.Lock()
	defer c.readLock.Unlock()
	_, err := io.ReadFull(c.Reader, buf)
	return err
}

// onMessage dispatches body to the callback method matching its message type
// (and, for business messages, its request type). Each callback runs in its
// own goroutine. Messages of any other type are ignored. Without a callback
// the message is only logged.
func (c *Client) onMessage(body *aiot.Protocol) {
	if c.clientCallback == nil {
		c.Logger.Warnw("ClientCallBack interface is nil...", zap.String("addr", c.addr), zap.Any("body", body))
		return
	}

	switch body.MsgType {
	case aiot.MSG_TYPE_HEART_BEAT:
		// Heartbeat reply.
		go c.clientCallback.OnHeartBeat(c, body)
	case aiot.MSG_TYPE_REGISTER:
		// Register reply.
		go c.clientCallback.OnRegister(c, body)
	case aiot.MSG_TYPE_CONTROL:
		// Device control.
		go c.clientCallback.OnDeviceControl(c, body)
	case aiot.MSG_TYPE_DATA_REPORT:
		// Data report.
		go c.clientCallback.OnDataReport(c, body)
	case aiot.MSG_TYPE_BUSINESS:
		// Other business messages.
		switch body.ReqType {
		case aiot.REQ_TYPE_REQUEST:
			go c.clientCallback.OnRequest(c, body)
		case aiot.REQ_TYPE_RESPONSE:
			go c.clientCallback.OnResponse(c, body)
		}
	}
}

// GetMsgProto returns a message ID record with a fresh UUID as MsgId. A
// non-empty msgId marks the message as a reply and is stored as PreMsgId.
func GetMsgProto(msgId string) *aiot.MsgIdProto {
	return &aiot.MsgIdProto{
		MsgId:    uuid.NewV4().String(),
		PreMsgId: msgId,
	}
}

// GetDeviceId returns the device ID.
func (c *Client) GetDeviceId() string {
	return c.deviceId
}

// GetState returns the connection state.
func (c *Client) GetState() State {
	return State(atomic.LoadInt32((*int32)(&c.state)))
}

// IsClosed reports whether the connection has been closed.
func (c *Client) IsClosed() bool {
	return c.GetState() == StateDisconnected
}

// IsConnected reports whether the client is connected or registered.
func (c *Client) IsConnected() bool {
	state := c.GetState()
	return state == StateConnected || state == StateRegistered
}

// Wait blocks until the goroutines tracked by the wait group have returned.
func (c *Client) Wait() {
	c.waitGroup.Wait()
}

// Close closes the TCP connection and the write channel and moves the client
// to StateDisconnected. Calling it again on a closed client does nothing
// apart from logging. OnClose is invoked only when the client was connected
// or registered, and it runs while closeLock is held, so the callback must
// not call Close itself.
func (c *Client) Close() {
	c.Logger.Debugw("Closing connect...", zap.String("addr", c.addr))
	c.closeLock.Lock()
	defer c.closeLock.Unlock()

	if !c.IsClosed() {
		// Conn is nil when the client never connected.
		if c.Conn != nil {
			if err := c.Conn.Close(); err != nil {
				c.Logger.Debugw("Fail to close connection", zap.Error(err))
			}
		}
		if c.IsConnected() && c.clientCallback != nil {
			c.clientCallback.OnClose(c)
		}
		// Ends writeLoop.
		close(c.writeChan)
	}

	c.SetState(StateDisconnected)
	c.Logger.Debugw("Connect closed...", zap.String("addr", c.addr))
}