| | |
| | | // 心跳包 |
| | | heartBeatProto *aiot.HeartBeatProto |
| | | // logger |
| | | logger *zap.SugaredLogger |
| | | Logger *zap.SugaredLogger |
| | | } |
| | | |
| | | // 初始化客户端 |
| | |
| | | waitGroup: &util.WaitGroupWrapper{}, |
| | | clientCallback: callBack, |
| | | heartBeatProto: &aiot.HeartBeatProto{}, |
| | | logger: logger, |
| | | Logger: logger, |
| | | } |
| | | } |
| | | |
| | |
| | | // 启动服务 |
| | | func (c *Client) StartSrv() { |
| | | // 判断连接状态,避免重复连接 |
| | | c.logger.Debug("Start client service...") |
| | | c.Logger.Debug("Start client service...") |
| | | if c.IsConnected(){ |
| | | c.logger.Error("net is connected, please do not repeat connect", zap.String("addr", c.addr)) |
| | | c.Logger.Error("net is connected, please do not repeat connect", zap.String("addr", c.addr)) |
| | | return |
| | | } |
| | | |
| | | // 地址是否可用 |
| | | if c.addr == "" { |
| | | c.logger.Warn("net addr is nil, wait for retry", zap.String("deviceId", c.deviceId)) |
| | | c.Logger.Warn("net addr is nil, wait for retry", zap.String("deviceId", c.deviceId)) |
| | | return |
| | | } |
| | | |
| | | // 连接TCP |
| | | c.logger.Debug("Connecting to service", zap.String("addr", c.addr)) |
| | | c.Logger.Debug("Connecting to service", zap.String("addr", c.addr)) |
| | | tcpAddr, err := net.ResolveTCPAddr("tcp", c.addr) |
| | | if err != nil { |
| | | c.logger.Warn("Net addr can not be connect for now,waiting...", zap.String("addr", c.addr), zap.Error(err)) |
| | | c.Logger.Warn("Net addr can not be connect for now,waiting...", zap.String("addr", c.addr), zap.Error(err)) |
| | | return |
| | | } |
| | | conn, err := net.DialTCP("tcp", nil, tcpAddr) |
| | | if err != nil { |
| | | c.logger.Warn("Net addr can not be dial for now,waiting...", zap.String("addr", c.addr), zap.Error(err)) |
| | | c.Logger.Warn("Net addr can not be dial for now,waiting...", zap.String("addr", c.addr), zap.Error(err)) |
| | | return |
| | | } |
| | | c.Conn = conn |
| | | |
| | | // 设置连接状态 |
| | | c.SetState(StateConnected) |
| | | c.logger.Debug("Client service connected.", zap.String("addr", c.addr)) |
| | | c.Logger.Debug("Client service connected.", zap.String("addr", c.addr)) |
| | | |
| | | // 启用读取通道 |
| | | c.SetRWBuf() |
| | |
| | | // 启用心跳 |
| | | c.waitGroup.Wrap(c.writeHeartBeat) |
| | | c.Wait() |
| | | c.logger.Warn("Client service disconnected.Return...", zap.String("addr", c.addr)) |
| | | c.Logger.Warn("Client service disconnected.Return...", zap.String("addr", c.addr)) |
| | | } |
| | | |
| | | // 设置连接状态 |
| | |
| | | for { |
| | | select { |
| | | case <- c.exitChan: |
| | | c.logger.Debug("Close client", zap.String("deviceId", c.deviceId)) |
| | | c.Logger.Debug("Close client", zap.String("deviceId", c.deviceId)) |
| | | c.Close() |
| | | c.logger.Warn("writeLoop Done...") |
| | | c.Logger.Warn("writeLoop Done...") |
| | | return |
| | | case bodyByte := <- c.writeChan: |
| | | binary.BigEndian.PutUint32(byte4, uint32(len(bodyByte))) |
| | | body = append(byte4, bodyByte...) |
| | | _,err = c.Conn.Write(body) |
| | | if err != nil { |
| | | c.logger.Error("Fail to write message", zap.Error(err), zap.String("msg", string(bodyByte))) |
| | | c.Logger.Error("Fail to write message", zap.Error(err), zap.String("msg", string(bodyByte))) |
| | | c.Close() |
| | | c.logger.Warn("writeLoop Done...") |
| | | c.Logger.Warn("writeLoop Done...") |
| | | return |
| | | } |
| | | err = c.Writer.Flush() |
| | | if err != nil { |
| | | c.logger.Error("Fail to write flush", zap.Error(err)) |
| | | c.Logger.Error("Fail to write flush", zap.Error(err)) |
| | | } |
| | | } |
| | | } |
| | |
| | | |
| | | // 关闭的连接不能写入 |
| | | if c.IsClosed() { |
| | | c.logger.Error("Can not write msg on the closed chan", zap.Any("msgType", msgType), zap.Any("reqType", reqType), zap.Any("data", string(data))) |
| | | c.Logger.Error("Can not write msg on the closed chan", zap.Any("msgType", msgType), zap.Any("reqType", reqType), zap.Any("data", string(data))) |
| | | return nil,nil |
| | | } |
| | | |
| | |
| | | |
| | | func (c *Client) WriteBody(body *aiot.Protocol) error { |
| | | body.Receiver = aiot.RECEIVER_TO_SAAS |
| | | c.logger.Debug("Send msg...", zap.Any("msg", body), zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType)) |
| | | c.Logger.Debug("Send msg...", zap.Any("msg", body), zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType)) |
| | | msgData, err := json.Marshal(body) |
| | | if err != nil { |
| | | c.logger.Error("Fail to Marshal send data", zap.Error(err)) |
| | | c.Logger.Error("Fail to Marshal send data", zap.Error(err)) |
| | | return err |
| | | } |
| | | c.msgLock.Lock() |
| | |
| | | |
| | | // writeRegister sends the device registration packet: it JSON-encodes c.deviceRegister and passes the bytes to c.WriteMsg as an aiot.MSG_TYPE_REGISTER / aiot.REQ_TYPE_REQUEST message for c.deviceId, logging an error if WriteMsg fails. NOTE(review): the paired c.logger / c.Logger calls below look like the old and new sides of a diff rendering rather than two real calls - confirm against the actual file. |
| | | func (c *Client) writeRegister() { |
| | | c.logger.Debug("registering...") |
| | | c.Logger.Debug("registering...") |
| | | data := c.deviceRegister |
| | | msgData, _ := json.Marshal(data) | // NOTE(review): the Marshal error is discarded, so a failed encode is sent on as-is.
| | | _, err := c.WriteMsg(c.deviceId, aiot.MSG_TYPE_REGISTER, aiot.REQ_TYPE_REQUEST, msgData, c.GetMsgProto("")) |
| | | if err != nil { |
| | | c.logger.Error("Fail to send device register", zap.Any("msg", msgData)) | // NOTE(review): err itself is not included in the log fields.
| | | c.Logger.Error("Fail to send device register", zap.Any("msg", msgData)) |
| | | } |
| | | } |
| | | |
| | | // 发送心跳包 |
| | | func (c *Client) writeHeartBeat() { |
| | | c.logger.Debug("Start HeartBeating...",zap.String("addr", c.addr)) |
| | | c.Logger.Debug("Start HeartBeating...",zap.String("addr", c.addr)) |
| | | pingData, _ := json.Marshal(c.heartBeatProto) |
| | | t := time.NewTicker(DefaultHeartbeatInterval) |
| | | defer func() { |
| | |
| | | if err == io.EOF { |
| | | err = nil |
| | | } else { |
| | | c.logger.Error("Fail to read request", zap.Error(err)) |
| | | c.Logger.Error("Fail to read request", zap.Error(err)) |
| | | c.Close() |
| | | return |
| | | } |
| | |
| | | } |
| | | length = binary.BigEndian.Uint32(c.tmpByte4Slice) |
| | | if length > DefaultBufferSize { |
| | | c.logger.Error("Fail to read request data from io", zap.Uint32("length",length)) |
| | | c.Logger.Error("Fail to read request data from io", zap.Uint32("length",length)) |
| | | } |
| | | // 读取body |
| | | bodyByte := make([]byte, length) |
| | |
| | | body := &aiot.Protocol{} |
| | | err = json.Unmarshal(bodyByte, body) |
| | | if err != nil { |
| | | c.logger.Error("Fail to unmarshal message", zap.Error(err), zap.String("msg", string(bodyByte))) |
| | | c.Logger.Error("Fail to unmarshal message", zap.Error(err), zap.String("msg", string(bodyByte))) |
| | | } |
| | | c.logger.Debug("Read msg...", zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType)) |
| | | c.Logger.Debug("Read msg...", zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType)) |
| | | // 处理回调 |
| | | c.onMessage(body) |
| | | } |
| | | |
| | | c.logger.Warn("ReadLoop Done...") |
| | | c.Logger.Warn("ReadLoop Done...") |
| | | } |
| | | |
| | | // 处理回调 |
| | | func (c *Client) onMessage (body *aiot.Protocol) { |
| | | // 未封装callback,只写日志 |
| | | if c.clientCallback == nil { |
| | | c.logger.Warn("ClientCallBack interface is nil...", zap.String("addr", c.addr), zap.Any("body", body)) |
| | | c.Logger.Warn("ClientCallBack interface is nil...", zap.String("addr", c.addr), zap.Any("body", body)) |
| | | return |
| | | } |
| | | |
| | |
| | | |
| | | // 关闭TCP |
| | | func (c *Client) Close() { |
| | | c.logger.Debug("Closing connect", zap.String("addr", c.addr)) |
| | | c.Logger.Debug("Closing connect", zap.String("addr", c.addr)) |
| | | c.closeLock.Lock() |
| | | defer c.closeLock.Unlock() |
| | | |
| | |
| | | import ( |
| | | "basic.com/valib/go-aiot.git/aiotProto/aiot" |
| | | "basic.com/valib/go-aiot.git/client" |
| | | "basic.com/valib/logger.git" |
| | | "errors" |
| | | "go.uber.org/zap" |
| | | ) |
| | |
| | | // 错误信息 |
| | | errMsg := "Cluster have not registered" |
| | | // 记录日志 |
| | | logger.Error(errMsg, zap.Any("msg", msg)) |
| | | cli.Logger.Error(errMsg, zap.Any("msg", msg)) |
| | | cli.Close() |
| | | // 移除集群 |
| | | Srv.RemoveCluster(msg.SenderId) |
| | |
| | | "basic.com/valib/go-aiot.git/aiotProto/aiot" |
| | | "basic.com/valib/go-aiot.git/client" |
| | | "basic.com/valib/go-aiot.git/util" |
| | | "basic.com/valib/logger.git" |
| | | uuid "github.com/satori/go.uuid" |
| | | "go.uber.org/zap" |
| | | "net" |
| | |
| | | ClusterBlackList map[string]struct{} |
| | | // 回调接口 |
| | | serverCallBack ServerCallBack |
| | | // logger |
| | | Logger *zap.SugaredLogger |
| | | } |
| | | |
| | | // 全局服务 |
| | |
| | | } |
| | | |
| | | // 初始化服务 |
| | | func NewServer(addr string, serverId string, serverCallBack ServerCallBack) *Server { |
| | | func NewServer(addr string, serverId string, serverCallBack ServerCallBack, logger *zap.SugaredLogger) *Server { |
| | | logger.Debug("New server...", zap.String("addr", addr)) |
| | | return &Server{ |
| | | addr: addr, |
| | | waitGroup: &util.WaitGroupWrapper{}, |
| | |
| | | ClusterDevice: make(map[string]map[string]struct{}), |
| | | ClusterBlackList: make(map[string]struct{}), |
| | | serverCallBack: serverCallBack, |
| | | Logger: logger, |
| | | } |
| | | } |
| | | |
| | |
| | | |
| | | // 启动服务 |
| | | func (s *Server) StartSrv() error { |
| | | s.Logger.Debug("Start server...", zap.String("addr", s.addr)) |
| | | // 错误 |
| | | var err error |
| | | // tcpAddr |
| | | s.tcpAddr, err = net.ResolveTCPAddr("tcp", s.addr) |
| | | if err != nil { |
| | | logger.Error("Can not build tcp server for now", zap.String("addr", s.addr), zap.Error(err)) |
| | | s.Logger.Error("Can not build tcp server for now", zap.String("addr", s.addr), zap.Error(err)) |
| | | return err |
| | | } |
| | | |
| | | // 监听 |
| | | s.tcpListener, err = net.ListenTCP("tcp", s.tcpAddr) |
| | | if err != nil { |
| | | logger.Error("Can not listen tcp server for now", zap.String("addr", s.addr), zap.Error(err)) |
| | | s.Logger.Error("Can not listen tcp server for now", zap.String("addr", s.addr), zap.Error(err)) |
| | | return err |
| | | } |
| | | |
| | | // 收到连接 |
| | | logger.Debug("Starting to listen addr", zap.String("addr", s.addr)) |
| | | s.Logger.Debug("Starting to listen addr", zap.String("addr", s.addr)) |
| | | s.waitGroup.Wrap(func() { |
| | | for { |
| | | // 获取连接 |
| | |
| | | if err != nil { |
| | | // Temporary network error: yield the goroutine and retry |
| | | if netErr, ok := err.(net.Error);ok && netErr.Temporary() { |
| | | logger.Error("Continue listening...", zap.String("addr", s.tcpListener.Addr().String()), zap.Error(err)) |
| | | s.Logger.Error("Continue listening...", zap.String("addr", s.tcpListener.Addr().String()), zap.Error(err)) |
| | | runtime.Gosched() |
| | | continue |
| | | } |
| | | |
| | | // 不能使用已关闭的连接 |
| | | if !strings.Contains(err.Error(), "use of closed network connection") { |
| | | logger.Error("Can not use a closed network connection", zap.String("addr", s.addr), zap.Error(err)) |
| | | s.Logger.Error("Can not use a closed network connection", zap.String("addr", s.addr), zap.Error(err)) |
| | | } |
| | | break |
| | | } |
| | | |
| | | // 处理连接 |
| | | logger.Debug("Client connected", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String())) |
| | | s.Logger.Debug("Client connected", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String())) |
| | | go s.Handler(clientConn) |
| | | } |
| | | }) |
| | | |
| | | // wait |
| | | s.waitGroup.Wait() |
| | | logger.Warn("Tcp server exist", zap.String("addr", s.addr)) |
| | | s.Logger.Warn("Tcp server exist", zap.String("addr", s.addr)) |
| | | return nil |
| | | } |
| | | |
| | | // 处理连接 |
| | | func (s *Server) Handler(clientConn net.Conn) { |
| | | logger.Debug("Client connected...", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String())) |
| | | s.Logger.Debug("Client connected...", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String())) |
| | | |
| | | // 临时ID |
| | | tplClientId := uuid.NewV4().String() |
| | |
| | | |
| | | // 初始化连接 |
| | | cliCon := &Clients{} |
| | | cli := client.NewClient(s.addr, tplClientId,cliRegister, cliCon) |
| | | cli := client.NewClient(s.addr, tplClientId,cliRegister, cliCon, s.Logger) |
| | | |
| | | // 设置连接状态 |
| | | cli.SetState(client.StateConnected) |