saas-smartAi通信协议标准库
gongshangguo
2022-02-28 160a425c85f128ed47f92aa15c7eb6f76d68610a
server/server.go
@@ -46,6 +46,119 @@
// 全局服务
var Srv *Server
// 上锁
func (s *Server) Lock() {
   s.deviceLock.Lock()
}
// 解锁
func (s *Server) UnLock() {
   s.deviceLock.Unlock()
}
// 初始化服务
func NewServer(addr string, serverId string, serverCallBack ServerCallBack, logger *zap.SugaredLogger) *Server {
   logger.Debug("New server...", zap.String("addr", addr))
   return &Server{
      addr: addr,
      waitGroup: &util.WaitGroupWrapper{},
      serverId: serverId,
      deviceLock: new(sync.RWMutex),
      Clusters: make(map[string]*client.Client),
      Devices: make(map[string]struct{}),
      ClusterDevice: make(map[string]map[string]struct{}),
      ClusterBlackList: make(map[string]struct{}),
      serverCallBack: serverCallBack,
      Logger: logger,
   }
}
// 启动服务
func (s *Server) StartSrv() error {
   s.Logger.Debug("Start server...", zap.String("addr", s.addr))
   // 错误
   var err error
   // tcpAddr
   s.tcpAddr, err = net.ResolveTCPAddr("tcp", s.addr)
   if err != nil {
      s.Logger.Error("Can not build tcp server for now", zap.String("addr", s.addr), zap.Error(err))
      return err
   }
   // 监听
   s.tcpListener, err = net.ListenTCP("tcp", s.tcpAddr)
   if err != nil {
      s.Logger.Error("Can not listen tcp server for now", zap.String("addr", s.addr), zap.Error(err))
      return err
   }
   // 收到连接
   s.Logger.Debug("Starting to listen addr", zap.String("addr", s.addr))
   s.waitGroup.Wrap(func() {
      for {
         // 获取连接
         clientConn, err := s.tcpListener.Accept()
         if err != nil {
            // 让出grouting
            if netErr, ok := err.(net.Error);ok && netErr.Temporary() {
               s.Logger.Error("Continue listening...", zap.String("addr", s.tcpListener.Addr().String()), zap.Error(err))
               runtime.Gosched()
               continue
            }
            // 不能使用已关闭的连接
            if !strings.Contains(err.Error(), "use of closed network connection") {
               s.Logger.Error("Can not use a closed network connection", zap.String("addr", s.addr), zap.Error(err))
            }
            break
         }
         // 处理连接
         s.Logger.Debug("Client connected", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String()))
         go s.Handler(clientConn)
      }
   })
   // wait
   s.waitGroup.Wait()
   s.Logger.Warn("Tcp server exist", zap.String("addr", s.addr))
   return nil
}
// 处理连接
func (s *Server) Handler(clientConn net.Conn) {
   s.Logger.Debug("Client connected...", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String()))
   // 临时ID
   tplClientId := uuid.NewV4().String()
   // 注册信息
   cliRegister := &aiot.DeviceRegister{}
   // 初始化连接
   cliCon := &Clients{}
   cli := client.NewClient(s.addr, tplClientId,cliRegister, cliCon, s.Logger)
   // 设置连接状态
   cli.SetState(client.StateConnected)
   cli.Conn = clientConn
   // 启用读写句柄
   cli.SetRWBuf()
   // wait
   cli.Wait()
   cli.Close()
}
// 集群是否注册
func (s *Server) IsMasterOnline(masterId string) bool {
   if _,ok := s.ClusterDevice[masterId];ok {
      return true
   }
   return false
}
// 通过masterId获取集群ID
func (s *Server) GetClusterIdByMasterId(masterId string) string {
   if clusterId,ok := s.ClusterMaster[masterId];ok {
@@ -69,41 +182,6 @@
   s.Lock()
   defer s.UnLock()
   s.ClusterMaster[masterId] = clusterId
}
// 集群是否注册
func (s *Server) IsMasterOnline(masterId string) bool {
   if _,ok := s.ClusterDevice[masterId];ok {
      return true
   }
   return false
}
// 初始化服务
func NewServer(addr string, serverId string, serverCallBack ServerCallBack, logger *zap.SugaredLogger) *Server {
   logger.Debug("New server...", zap.String("addr", addr))
   return &Server{
      addr: addr,
      waitGroup: &util.WaitGroupWrapper{},
      serverId: serverId,
      deviceLock: new(sync.RWMutex),
      Clusters: make(map[string]*client.Client),
      Devices: make(map[string]struct{}),
      ClusterDevice: make(map[string]map[string]struct{}),
      ClusterBlackList: make(map[string]struct{}),
      serverCallBack: serverCallBack,
      Logger: logger,
   }
}
// 上锁
func (s *Server) Lock() {
   s.deviceLock.Lock()
}
// 解锁
func (s *Server) UnLock() {
   s.deviceLock.Unlock()
}
// 注册集群ID
@@ -186,83 +264,6 @@
   return true
}
// 启动服务
func (s *Server) StartSrv() error {
   s.Logger.Debug("Start server...", zap.String("addr", s.addr))
   // 错误
   var err error
   // tcpAddr
   s.tcpAddr, err = net.ResolveTCPAddr("tcp", s.addr)
   if err != nil {
      s.Logger.Error("Can not build tcp server for now", zap.String("addr", s.addr), zap.Error(err))
      return err
   }
   // 监听
   s.tcpListener, err = net.ListenTCP("tcp", s.tcpAddr)
   if err != nil {
      s.Logger.Error("Can not listen tcp server for now", zap.String("addr", s.addr), zap.Error(err))
      return err
   }
   // 收到连接
   s.Logger.Debug("Starting to listen addr", zap.String("addr", s.addr))
   s.waitGroup.Wrap(func() {
      for {
         // 获取连接
         clientConn, err := s.tcpListener.Accept()
         if err != nil {
            // 让出grouting
            if netErr, ok := err.(net.Error);ok && netErr.Temporary() {
               s.Logger.Error("Continue listening...", zap.String("addr", s.tcpListener.Addr().String()), zap.Error(err))
               runtime.Gosched()
               continue
            }
            // 不能使用已关闭的连接
            if !strings.Contains(err.Error(), "use of closed network connection") {
               s.Logger.Error("Can not use a closed network connection", zap.String("addr", s.addr), zap.Error(err))
            }
            break
         }
         // 处理连接
         s.Logger.Debug("Client connected", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String()))
         go s.Handler(clientConn)
      }
   })
   // wait
   s.waitGroup.Wait()
   s.Logger.Warn("Tcp server exist", zap.String("addr", s.addr))
   return nil
}
// 处理连接
func (s *Server) Handler(clientConn net.Conn) {
   s.Logger.Debug("Client connected...", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String()))
   // 临时ID
   tplClientId := uuid.NewV4().String()
   // 注册信息
   cliRegister := &aiot.DeviceRegister{}
   // 初始化连接
   cliCon := &Clients{}
   cli := client.NewClient(s.addr, tplClientId,cliRegister, cliCon, s.Logger)
   // 设置连接状态
   cli.SetState(client.StateConnected)
   cli.Conn = clientConn
   // 启用读写句柄
   cli.SetRWBuf()
   // wait
   cli.Wait()
}
// 处理设备注册
func (s *Server) RegisterDevice(msg *aiot.Protocol, cli *client.Client) {
   // 设置节点ID
@@ -271,6 +272,6 @@
   s.SetCluster(msg.SenderId, cli)
   // 设置集群
   registerData := &aiot.DeviceRegister{}
   json.Unmarshal(msg.Data, registerData)
   _ = json.Unmarshal(msg.Data, registerData)
   s.SetDeviceList(msg.SenderId, registerData)
}