saas-smartAi通信协议标准库
gongshangguo
2022-03-08 a32c642ffa2b9f05e3b55fc88b2c2bed3277a080
server/server.go
@@ -4,7 +4,7 @@
   "basic.com/valib/go-aiot.git/aiotProto/aiot"
   "basic.com/valib/go-aiot.git/client"
   "basic.com/valib/go-aiot.git/util"
   "basic.com/valib/logger.git"
   "encoding/json"
   uuid "github.com/satori/go.uuid"
   "go.uber.org/zap"
   "net"
@@ -39,10 +39,125 @@
   ClusterBlackList map[string]struct{}
   // 回调接口
   serverCallBack ServerCallBack
   // logger
   Logger *zap.SugaredLogger
}
// 全局服务
var Srv *Server
// 上锁
func (s *Server) Lock() {
   s.deviceLock.Lock()
}
// 解锁
func (s *Server) UnLock() {
   s.deviceLock.Unlock()
}
// 初始化服务
func NewServer(addr string, serverId string, serverCallBack ServerCallBack, logger *zap.SugaredLogger) *Server {
   logger.Debug("New server...", zap.String("addr", addr))
   return &Server{
      addr: addr,
      waitGroup: &util.WaitGroupWrapper{},
      serverId: serverId,
      deviceLock: new(sync.RWMutex),
      Clusters: make(map[string]*client.Client),
      Devices: make(map[string]struct{}),
      ClusterDevice: make(map[string]map[string]struct{}),
      ClusterBlackList: make(map[string]struct{}),
      serverCallBack: serverCallBack,
      Logger: logger,
   }
}
// 启动服务
func (s *Server) StartSrv() error {
   s.Logger.Debug("Start server...", zap.String("addr", s.addr))
   // 错误
   var err error
   // tcpAddr
   s.tcpAddr, err = net.ResolveTCPAddr("tcp", s.addr)
   if err != nil {
      s.Logger.Error("Can not build tcp server for now", zap.String("addr", s.addr), zap.Error(err))
      return err
   }
   // 监听
   s.tcpListener, err = net.ListenTCP("tcp", s.tcpAddr)
   if err != nil {
      s.Logger.Error("Can not listen tcp server for now", zap.String("addr", s.addr), zap.Error(err))
      return err
   }
   // 收到连接
   s.Logger.Debug("Starting to listen addr", zap.String("addr", s.addr))
   s.waitGroup.Wrap(func() {
      for {
         // 获取连接
         clientConn, err := s.tcpListener.Accept()
         if err != nil {
            // 让出grouting
            if netErr, ok := err.(net.Error);ok && netErr.Temporary() {
               s.Logger.Error("Continue listening...", zap.String("addr", s.tcpListener.Addr().String()), zap.Error(err))
               runtime.Gosched()
               continue
            }
            // 不能使用已关闭的连接
            if !strings.Contains(err.Error(), "use of closed network connection") {
               s.Logger.Error("Can not use a closed network connection", zap.String("addr", s.addr), zap.Error(err))
            }
            break
         }
         // 处理连接
         s.Logger.Debug("Client connected", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String()))
         go s.Handler(clientConn)
      }
   })
   // wait
   s.waitGroup.Wait()
   s.Logger.Warn("Tcp server exist", zap.String("addr", s.addr))
   return nil
}
// 处理连接
func (s *Server) Handler(clientConn net.Conn) {
   s.Logger.Debug("Client connected...", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String()))
   // 临时ID
   tplClientId := uuid.NewV4().String()
   // 注册信息
   cliRegister := &aiot.DeviceRegister{}
   // 初始化连接
   cliCon := &Clients{}
   cli := client.NewClient(s.addr, tplClientId,cliRegister, cliCon, s.Logger)
   // 设置连接状态
   cli.SetState(client.StateConnected)
   cli.Conn = clientConn
   // 启用读写句柄
   cli.SetRWBuf()
   // wait
   cli.Wait()
   cli.Close()
}
// 集群是否注册
func (s *Server) IsMasterOnline(masterId string) bool {
   if _,ok := s.ClusterDevice[masterId];ok {
      return true
   }
   return false
}
// 通过masterId获取集群ID
func (s *Server) GetClusterIdByMasterId(masterId string) string {
@@ -67,39 +182,6 @@
   s.Lock()
   defer s.UnLock()
   s.ClusterMaster[masterId] = clusterId
}
// 集群是否注册
func (s *Server) IsMasterOnline(masterId string) bool {
   if _,ok := s.ClusterDevice[masterId];ok {
      return true
   }
   return false
}
// 初始化服务
func NewServer(addr string, serverId string, serverCallBack ServerCallBack) *Server {
   return &Server{
      addr: addr,
      waitGroup: &util.WaitGroupWrapper{},
      serverId: serverId,
      deviceLock: new(sync.RWMutex),
      Clusters: make(map[string]*client.Client),
      Devices: make(map[string]struct{}),
      ClusterDevice: make(map[string]map[string]struct{}),
      ClusterBlackList: make(map[string]struct{}),
      serverCallBack: serverCallBack,
   }
}
// 上锁
func (s *Server) Lock() {
   s.deviceLock.Lock()
}
// 解锁
func (s *Server) UnLock() {
   s.deviceLock.Unlock()
}
// 注册集群ID
@@ -157,8 +239,9 @@
}
// 注册设备信息
func (s *Server) SetDeviceList(masterId string, nodeIds []string) bool {
   if len(nodeIds) == 0 {
func (s *Server) SetDeviceList(masterId string, registerData *aiot.DeviceRegister) bool {
   if len(registerData.DeviceList) == 0 {
      return true
   }
   // 锁
@@ -171,95 +254,24 @@
   }
   // 添加设备ID
   for _, nodeId := range nodeIds{
      s.ClusterDevice[masterId][nodeId] = struct{}{}
      s.Devices[nodeId] = struct{}{}
   if s.ClusterDevice[masterId] == nil {
      s.ClusterDevice[masterId] = make(map[string]struct{})
   }
   for _, node := range registerData.DeviceList{
      s.ClusterDevice[masterId][node.DeviceId] = struct{}{}
      s.Devices[node.DeviceId] = struct{}{}
   }
   return true
}
// 启动服务
func (s *Server) StartSrv() error {
   // 错误
   var err error
   // tcpAddr
   s.tcpAddr, err = net.ResolveTCPAddr("tcp", s.addr)
   if err != nil {
      logger.Error("Can not build tcp server for now", zap.String("addr", s.addr), zap.Error(err))
      return err
   }
   // 监听
   s.tcpListener, err = net.ListenTCP("tcp", s.tcpAddr)
   if err != nil {
      logger.Error("Can not listen tcp server for now", zap.String("addr", s.addr), zap.Error(err))
      return err
   }
   // 收到连接
   logger.Debug("Starting to listen addr", zap.String("addr", s.addr))
   s.waitGroup.Wrap(func() {
      for {
         // 获取连接
         clientConn, err := s.tcpListener.Accept()
         if err != nil {
            // 让出grouting
            if netErr, ok := err.(net.Error);ok && netErr.Temporary() {
               logger.Error("Continue listening...", zap.String("addr", s.tcpListener.Addr().String()), zap.Error(err))
               runtime.Gosched()
               continue
            }
            // 不能使用已关闭的连接
            if !strings.Contains(err.Error(), "use of closed network connection") {
               logger.Error("Can not use a closed network connection", zap.String("addr", s.addr), zap.Error(err))
            }
            break
         }
         // 处理连接
         logger.Debug("Client connected", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String()))
         go s.Handler(clientConn)
      }
   })
   // wait
   s.waitGroup.Wait()
   logger.Warn("Tcp server exist", zap.String("addr", s.addr))
   return nil
}
// 处理连接
func (s *Server) Handler(clientConn net.Conn) {
   logger.Debug("Client connected...", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String()))
   // 临时ID
   tplClientId := uuid.NewV4().String()
   // 注册信息
   cliRegister := &aiot.DeviceRegister{}
   // 初始化连接
   cliCon := &Clients{}
   cli := client.NewClient(s.addr, tplClientId,cliRegister, cliCon)
   // 设置连接状态
   cli.SetState(client.StateConnected)
   cli.Conn = clientConn
   // 启用读写句柄
   cli.SetRWBuf()
   // wait
   cli.Wait()
}
// 处理设备注册
func (s *Server) RegisterDevice(msg *aiot.Protocol, cli *client.Client) {
func (s *Server) RegisterDevice(cli *client.Client, msg *aiot.Protocol) {
   // 设置节点ID
   cli.SetDeviceId(msg.SenderId)
   // 添加集群ID
   s.SetCluster(msg.SenderId, cli)
   // 设置集群
   s.SetDeviceList(msg.SenderId, msg.DeviceProto.DeviceIds)
   registerData := &aiot.DeviceRegister{}
   _ = json.Unmarshal(msg.Data, registerData)
   s.SetDeviceList(msg.SenderId, registerData)
}