| | |
| | | import ( |
| | | "basic.com/valib/go-aiot.git/aiotProto/aiot" |
| | | "basic.com/valib/go-aiot.git/util" |
| | | "basic.com/valib/logger.git" |
| | | "bufio" |
| | | "encoding/binary" |
| | | "encoding/json" |
| | |
| | | clientCallback ClientCallBack |
| | | // 心跳包 |
| | | heartBeatProto *aiot.HeartBeatProto |
| | | // logger |
| | | logger *zap.SugaredLogger |
| | | } |
| | | |
| | | // NewClient logs "New Client..." and returns a *Client holding the given device registration and callback, a fresh WaitGroupWrapper and HeartBeatProto, and the logger. NOTE(review): two signature rows follow (without and with the logger parameter); this looks like the before/after of a diff - confirm which one is live. |
| | | func NewClient(addr string, clientId string, deviceRegister *aiot.DeviceRegister, callBack ClientCallBack) *Client { |
| | | func NewClient(addr string, clientId string, deviceRegister *aiot.DeviceRegister, callBack ClientCallBack, logger *zap.SugaredLogger) *Client { |
| | | logger.Debug("New Client...") |
| | | return &Client{ |
| | | deviceRegister: deviceRegister, |
| | |
| | | waitGroup: &util.WaitGroupWrapper{}, |
| | | clientCallback: callBack, |
| | | heartBeatProto: &aiot.HeartBeatProto{}, |
| | | logger: logger, |
| | | } |
| | | } |
| | | |
| | |
| | | // StartSrv resolves and dials c.addr over TCP, stores the connection, sets the state to StateConnected, calls SetRWBuf, hands writeHeartBeat to the wait group and then calls c.Wait() before logging the disconnect. It returns early (logging only) if already connected, if the address is empty, or if resolve/dial fails. |
| | | func (c *Client) StartSrv() { |
| | | // Check the connection state first so we never connect twice. |
| | | logger.Debug("Start client service...") |
| | | c.logger.Debug("Start client service...") |
| | | if c.IsConnected(){ |
| | | logger.Error("net is connected, please do not repeat connect", zap.String("addr", c.addr)) |
| | | c.logger.Error("net is connected, please do not repeat connect", zap.String("addr", c.addr)) |
| | | return |
| | | } |
| | | |
| | | // Bail out when no address is configured. |
| | | if c.addr == "" { |
| | | logger.Warn("net addr is nil, wait for retry", zap.String("deviceId", c.deviceId)) |
| | | c.logger.Warn("net addr is nil, wait for retry", zap.String("deviceId", c.deviceId)) |
| | | return |
| | | } |
| | | |
| | | // Open the TCP connection. |
| | | logger.Debug("Connecting to service", zap.String("addr", c.addr)) |
| | | c.logger.Debug("Connecting to service", zap.String("addr", c.addr)) |
| | | tcpAddr, err := net.ResolveTCPAddr("tcp", c.addr) |
| | | if err != nil { |
| | | logger.Warn("Net addr can not be connect for now,waiting...", zap.String("addr", c.addr), zap.Error(err)) |
| | | c.logger.Warn("Net addr can not be connect for now,waiting...", zap.String("addr", c.addr), zap.Error(err)) |
| | | return |
| | | } |
| | | conn, err := net.DialTCP("tcp", nil, tcpAddr) |
| | | if err != nil { |
| | | logger.Warn("Net addr can not be dial for now,waiting...", zap.String("addr", c.addr), zap.Error(err)) |
| | | c.logger.Warn("Net addr can not be dial for now,waiting...", zap.String("addr", c.addr), zap.Error(err)) |
| | | return |
| | | } |
| | | c.Conn = conn |
| | | |
| | | // Record the connected state. |
| | | c.SetState(StateConnected) |
| | | logger.Debug("Client service connected.", zap.String("addr", c.addr)) |
| | | c.logger.Debug("Client service connected.", zap.String("addr", c.addr)) |
| | | |
| | | // Enable the read path (SetRWBuf). |
| | | c.SetRWBuf() |
| | |
| | | // Start the heartbeat. |
| | | c.waitGroup.Wrap(c.writeHeartBeat) |
| | | c.Wait() |
| | | logger.Warn("Client service disconnected.Return...", zap.String("addr", c.addr)) |
| | | c.logger.Warn("Client service disconnected.Return...", zap.String("addr", c.addr)) |
| | | } |
| | | |
| | | // 设置连接状态 |
| | |
| | | for { |
| | | select { |
| | | case <- c.exitChan: |
| | | logger.Debug("Close client", zap.String("deviceId", c.deviceId)) |
| | | c.logger.Debug("Close client", zap.String("deviceId", c.deviceId)) |
| | | c.Close() |
| | | logger.Warn("writeLoop Done...") |
| | | c.logger.Warn("writeLoop Done...") |
| | | return |
| | | case bodyByte := <- c.writeChan: |
| | | binary.BigEndian.PutUint32(byte4, uint32(len(bodyByte))) |
| | | body = append(byte4, bodyByte...) |
| | | _,err = c.Conn.Write(body) |
| | | if err != nil { |
| | | logger.Error("Fail to write message", zap.Error(err), zap.String("msg", string(bodyByte))) |
| | | c.logger.Error("Fail to write message", zap.Error(err), zap.String("msg", string(bodyByte))) |
| | | c.Close() |
| | | logger.Warn("writeLoop Done...") |
| | | c.logger.Warn("writeLoop Done...") |
| | | return |
| | | } |
| | | err = c.Writer.Flush() |
| | | if err != nil { |
| | | logger.Error("Fail to write flush", zap.Error(err)) |
| | | c.logger.Error("Fail to write flush", zap.Error(err)) |
| | | } |
| | | } |
| | | } |
| | |
| | | |
| | | // 关闭的连接不能写入 |
| | | if c.IsClosed() { |
| | | logger.Error("Can not write msg on the closed chan", zap.Any("msgType", msgType), zap.Any("reqType", reqType), zap.Any("data", string(data))) |
| | | c.logger.Error("Can not write msg on the closed chan", zap.Any("msgType", msgType), zap.Any("reqType", reqType), zap.Any("data", string(data))) |
| | | return nil,nil |
| | | } |
| | | |
| | |
| | | |
| | | func (c *Client) WriteBody(body *aiot.Protocol) error { |
| | | body.Receiver = aiot.RECEIVER_TO_SAAS |
| | | logger.Debug("Send msg...", zap.Any("msg", body), zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType)) |
| | | c.logger.Debug("Send msg...", zap.Any("msg", body), zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType)) |
| | | msgData, err := json.Marshal(body) |
| | | if err != nil { |
| | | logger.Error("Fail to Marshal send data", zap.Error(err)) |
| | | c.logger.Error("Fail to Marshal send data", zap.Error(err)) |
| | | return err |
| | | } |
| | | c.msgLock.Lock() |
| | |
| | | |
| | | // writeRegister JSON-encodes c.deviceRegister and sends it through WriteMsg as a MSG_TYPE_REGISTER / REQ_TYPE_REQUEST message. The Marshal error is discarded, and a WriteMsg failure is only logged (the error value itself is not included in the log fields). |
| | | func (c *Client) writeRegister() { |
| | | logger.Debug("registering...") |
| | | c.logger.Debug("registering...") |
| | | data := c.deviceRegister |
| | | msgData, _ := json.Marshal(data) |
| | | _, err := c.WriteMsg(c.deviceId, aiot.MSG_TYPE_REGISTER, aiot.REQ_TYPE_REQUEST, msgData, c.GetMsgProto("")) |
| | | if err != nil { |
| | | logger.Error("Fail to send device register", zap.Any("msg", msgData)) |
| | | c.logger.Error("Fail to send device register", zap.Any("msg", msgData)) |
| | | } |
| | | } |
| | | |
| | | // 发送心跳包 |
| | | func (c *Client) writeHeartBeat() { |
| | | logger.Debug("Start HeartBeating...",zap.String("addr", c.addr)) |
| | | c.logger.Debug("Start HeartBeating...",zap.String("addr", c.addr)) |
| | | pingData, _ := json.Marshal(c.heartBeatProto) |
| | | t := time.NewTicker(DefaultHeartbeatInterval) |
| | | defer func() { |
| | |
| | | if err == io.EOF { |
| | | err = nil |
| | | } else { |
| | | logger.Error("Fail to read request", zap.Error(err)) |
| | | c.logger.Error("Fail to read request", zap.Error(err)) |
| | | c.Close() |
| | | return |
| | | } |
| | |
| | | } |
| | | length = binary.BigEndian.Uint32(c.tmpByte4Slice) |
| | | if length > DefaultBufferSize { |
| | | logger.Error("Fail to read request data from io", zap.Uint32("length",length)) |
| | | c.logger.Error("Fail to read request data from io", zap.Uint32("length",length)) |
| | | } |
| | | // 读取body |
| | | bodyByte := make([]byte, length) |
| | |
| | | body := &aiot.Protocol{} |
| | | err = json.Unmarshal(bodyByte, body) |
| | | if err != nil { |
| | | logger.Error("Fail to unmarshal message", zap.Error(err), zap.String("msg", string(bodyByte))) |
| | | c.logger.Error("Fail to unmarshal message", zap.Error(err), zap.String("msg", string(bodyByte))) |
| | | } |
| | | logger.Debug("Read msg...", zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType)) |
| | | c.logger.Debug("Read msg...", zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType)) |
| | | // 处理回调 |
| | | c.onMessage(body) |
| | | } |
| | | |
| | | logger.Warn("ReadLoop Done...") |
| | | c.logger.Warn("ReadLoop Done...") |
| | | } |
| | | |
| | | // 处理回调 |
| | | func (c *Client) onMessage (body *aiot.Protocol) { |
| | | // 未封装callback,只写日志 |
| | | if c.clientCallback == nil { |
| | | logger.Warn("ClientCallBack interface is nil...", zap.String("addr", c.addr), zap.Any("body", body)) |
| | | c.logger.Warn("ClientCallBack interface is nil...", zap.String("addr", c.addr), zap.Any("body", body)) |
| | | return |
| | | } |
| | | |
| | |
| | | |
| | | // 关闭TCP |
| | | func (c *Client) Close() { |
| | | logger.Debug("Closing connect", zap.String("addr", c.addr)) |
| | | c.logger.Debug("Closing connect", zap.String("addr", c.addr)) |
| | | c.closeLock.Lock() |
| | | defer c.closeLock.Unlock() |
| | | |