| | |
| | | // 全局服务 |
| | | var Srv *Server |
| | | |
| | | // 上锁 |
| | | func (s *Server) Lock() { |
| | | s.deviceLock.Lock() |
| | | } |
| | | |
| | | // 解锁 |
| | | func (s *Server) UnLock() { |
| | | s.deviceLock.Unlock() |
| | | } |
| | | |
| | | // 初始化服务 |
| | | func NewServer(addr string, serverId string, serverCallBack ServerCallBack, logger *zap.SugaredLogger) *Server { |
| | | logger.Debug("New server...", zap.String("addr", addr)) |
| | | return &Server{ |
| | | addr: addr, |
| | | waitGroup: &util.WaitGroupWrapper{}, |
| | | serverId: serverId, |
| | | deviceLock: new(sync.RWMutex), |
| | | Clusters: make(map[string]*client.Client), |
| | | Devices: make(map[string]struct{}), |
| | | ClusterDevice: make(map[string]map[string]struct{}), |
| | | ClusterBlackList: make(map[string]struct{}), |
| | | serverCallBack: serverCallBack, |
| | | Logger: logger, |
| | | } |
| | | } |
| | | |
| | | // 启动服务 |
| | | func (s *Server) StartSrv() error { |
| | | s.Logger.Debug("Start server...", zap.String("addr", s.addr)) |
| | | // 错误 |
| | | var err error |
| | | // tcpAddr |
| | | s.tcpAddr, err = net.ResolveTCPAddr("tcp", s.addr) |
| | | if err != nil { |
| | | s.Logger.Error("Can not build tcp server for now", zap.String("addr", s.addr), zap.Error(err)) |
| | | return err |
| | | } |
| | | |
| | | // 监听 |
| | | s.tcpListener, err = net.ListenTCP("tcp", s.tcpAddr) |
| | | if err != nil { |
| | | s.Logger.Error("Can not listen tcp server for now", zap.String("addr", s.addr), zap.Error(err)) |
| | | return err |
| | | } |
| | | |
| | | // 收到连接 |
| | | s.Logger.Debug("Starting to listen addr", zap.String("addr", s.addr)) |
| | | s.waitGroup.Wrap(func() { |
| | | for { |
| | | // 获取连接 |
| | | clientConn, err := s.tcpListener.Accept() |
| | | if err != nil { |
| | | // 让出grouting |
| | | if netErr, ok := err.(net.Error);ok && netErr.Temporary() { |
| | | s.Logger.Error("Continue listening...", zap.String("addr", s.tcpListener.Addr().String()), zap.Error(err)) |
| | | runtime.Gosched() |
| | | continue |
| | | } |
| | | |
| | | // 不能使用已关闭的连接 |
| | | if !strings.Contains(err.Error(), "use of closed network connection") { |
| | | s.Logger.Error("Can not use a closed network connection", zap.String("addr", s.addr), zap.Error(err)) |
| | | } |
| | | break |
| | | } |
| | | |
| | | // 处理连接 |
| | | s.Logger.Debug("Client connected", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String())) |
| | | go s.Handler(clientConn) |
| | | } |
| | | }) |
| | | |
| | | // wait |
| | | s.waitGroup.Wait() |
| | | s.Logger.Warn("Tcp server exist", zap.String("addr", s.addr)) |
| | | return nil |
| | | } |
| | | |
| | | // 处理连接 |
| | | func (s *Server) Handler(clientConn net.Conn) { |
| | | s.Logger.Debug("Client connected...", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String())) |
| | | |
| | | // 临时ID |
| | | tplClientId := uuid.NewV4().String() |
| | | |
| | | // 注册信息 |
| | | cliRegister := &aiot.DeviceRegister{} |
| | | |
| | | // 初始化连接 |
| | | cliCon := &Clients{} |
| | | cli := client.NewClient(s.addr, tplClientId,cliRegister, cliCon, s.Logger) |
| | | |
| | | // 设置连接状态 |
| | | cli.SetState(client.StateConnected) |
| | | cli.Conn = clientConn |
| | | |
| | | // 启用读写句柄 |
| | | cli.SetRWBuf() |
| | | |
| | | // wait |
| | | cli.Wait() |
| | | cli.Close() |
| | | } |
| | | |
| | | // 集群是否注册 |
| | | func (s *Server) IsMasterOnline(masterId string) bool { |
| | | if _,ok := s.ClusterDevice[masterId];ok { |
| | | return true |
| | | } |
| | | return false |
| | | } |
| | | |
| | | // 通过masterId获取集群ID |
| | | func (s *Server) GetClusterIdByMasterId(masterId string) string { |
| | | if clusterId,ok := s.ClusterMaster[masterId];ok { |
| | |
| | | s.Lock() |
| | | defer s.UnLock() |
| | | s.ClusterMaster[masterId] = clusterId |
| | | } |
| | | |
| | | // 集群是否注册 |
| | | func (s *Server) IsMasterOnline(masterId string) bool { |
| | | if _,ok := s.ClusterDevice[masterId];ok { |
| | | return true |
| | | } |
| | | return false |
| | | } |
| | | |
| | | // 初始化服务 |
| | | func NewServer(addr string, serverId string, serverCallBack ServerCallBack, logger *zap.SugaredLogger) *Server { |
| | | logger.Debug("New server...", zap.String("addr", addr)) |
| | | return &Server{ |
| | | addr: addr, |
| | | waitGroup: &util.WaitGroupWrapper{}, |
| | | serverId: serverId, |
| | | deviceLock: new(sync.RWMutex), |
| | | Clusters: make(map[string]*client.Client), |
| | | Devices: make(map[string]struct{}), |
| | | ClusterDevice: make(map[string]map[string]struct{}), |
| | | ClusterBlackList: make(map[string]struct{}), |
| | | serverCallBack: serverCallBack, |
| | | Logger: logger, |
| | | } |
| | | } |
| | | |
| | | // 上锁 |
| | | func (s *Server) Lock() { |
| | | s.deviceLock.Lock() |
| | | } |
| | | |
| | | // 解锁 |
| | | func (s *Server) UnLock() { |
| | | s.deviceLock.Unlock() |
| | | } |
| | | |
| | | // 注册集群ID |
| | |
| | | return true |
| | | } |
| | | |
| | | // 启动服务 |
| | | func (s *Server) StartSrv() error { |
| | | s.Logger.Debug("Start server...", zap.String("addr", s.addr)) |
| | | // 错误 |
| | | var err error |
| | | // tcpAddr |
| | | s.tcpAddr, err = net.ResolveTCPAddr("tcp", s.addr) |
| | | if err != nil { |
| | | s.Logger.Error("Can not build tcp server for now", zap.String("addr", s.addr), zap.Error(err)) |
| | | return err |
| | | } |
| | | |
| | | // 监听 |
| | | s.tcpListener, err = net.ListenTCP("tcp", s.tcpAddr) |
| | | if err != nil { |
| | | s.Logger.Error("Can not listen tcp server for now", zap.String("addr", s.addr), zap.Error(err)) |
| | | return err |
| | | } |
| | | |
| | | // 收到连接 |
| | | s.Logger.Debug("Starting to listen addr", zap.String("addr", s.addr)) |
| | | s.waitGroup.Wrap(func() { |
| | | for { |
| | | // 获取连接 |
| | | clientConn, err := s.tcpListener.Accept() |
| | | if err != nil { |
| | | // 让出grouting |
| | | if netErr, ok := err.(net.Error);ok && netErr.Temporary() { |
| | | s.Logger.Error("Continue listening...", zap.String("addr", s.tcpListener.Addr().String()), zap.Error(err)) |
| | | runtime.Gosched() |
| | | continue |
| | | } |
| | | |
| | | // 不能使用已关闭的连接 |
| | | if !strings.Contains(err.Error(), "use of closed network connection") { |
| | | s.Logger.Error("Can not use a closed network connection", zap.String("addr", s.addr), zap.Error(err)) |
| | | } |
| | | break |
| | | } |
| | | |
| | | // 处理连接 |
| | | s.Logger.Debug("Client connected", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String())) |
| | | go s.Handler(clientConn) |
| | | } |
| | | }) |
| | | |
| | | // wait |
| | | s.waitGroup.Wait() |
| | | s.Logger.Warn("Tcp server exist", zap.String("addr", s.addr)) |
| | | return nil |
| | | } |
| | | |
| | | // 处理连接 |
| | | func (s *Server) Handler(clientConn net.Conn) { |
| | | s.Logger.Debug("Client connected...", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String())) |
| | | |
| | | // 临时ID |
| | | tplClientId := uuid.NewV4().String() |
| | | |
| | | // 注册信息 |
| | | cliRegister := &aiot.DeviceRegister{} |
| | | |
| | | // 初始化连接 |
| | | cliCon := &Clients{} |
| | | cli := client.NewClient(s.addr, tplClientId,cliRegister, cliCon, s.Logger) |
| | | |
| | | // 设置连接状态 |
| | | cli.SetState(client.StateConnected) |
| | | cli.Conn = clientConn |
| | | |
| | | // 启用读写句柄 |
| | | cli.SetRWBuf() |
| | | |
| | | // wait |
| | | cli.Wait() |
| | | } |
| | | |
| | | // 处理设备注册 |
| | | func (s *Server) RegisterDevice(msg *aiot.Protocol, cli *client.Client) { |
| | | // 设置节点ID |
| | |
| | | s.SetCluster(msg.SenderId, cli) |
| | | // 设置集群 |
| | | registerData := &aiot.DeviceRegister{} |
| | | json.Unmarshal(msg.Data, registerData) |
| | | _ = json.Unmarshal(msg.Data, registerData) |
| | | s.SetDeviceList(msg.SenderId, registerData) |
| | | } |