saas-smartAi通信协议标准库
gongshangguo
2022-02-28 2e64015f3b558533bd90cbb1edaa8faaaead7ae5
server/server.go
@@ -1,11 +1,11 @@
package server
import (
   "basic.com/valib/logger.git"
   "basic.com/valib/go-aiot.git/aiotProto/aiot"
   "basic.com/valib/go-aiot.git/client"
   "basic.com/valib/go-aiot.git/util"
   "encoding/json"
   uuid "github.com/satori/go.uuid"
   "go-aiot/aiotProto/aiot"
   "go-aiot/client"
   "go-aiot/util"
   "go.uber.org/zap"
   "net"
   "runtime"
@@ -39,6 +39,8 @@
   ClusterBlackList map[string]struct{}
   // 回调接口
   serverCallBack ServerCallBack
   // logger
   Logger *zap.SugaredLogger
}
// 全局服务
@@ -78,7 +80,8 @@
}
// 初始化服务
func NewServer(addr string, serverId string, serverCallBack ServerCallBack) *Server {
func NewServer(addr string, serverId string, serverCallBack ServerCallBack, logger *zap.SugaredLogger) *Server {
   logger.Debug("New server...", zap.String("addr", addr))
   return &Server{
      addr: addr,
      waitGroup: &util.WaitGroupWrapper{},
@@ -89,6 +92,7 @@
      ClusterDevice: make(map[string]map[string]struct{}),
      ClusterBlackList: make(map[string]struct{}),
      serverCallBack: serverCallBack,
      Logger: logger,
   }
}
@@ -157,8 +161,9 @@
}
// 注册设备信息
func (s *Server) SetDeviceList(masterId string, nodeIds []string) bool {
   if len(nodeIds) == 0 {
func (s *Server) SetDeviceList(masterId string, registerData *aiot.DeviceRegister) bool {
   if len(registerData.DeviceList) == 0 {
      return true
   }
   // 锁
@@ -171,33 +176,37 @@
   }
   // 添加设备ID
   for _, nodeId := range nodeIds{
      s.ClusterDevice[masterId][nodeId] = struct{}{}
      s.Devices[nodeId] = struct{}{}
   if s.ClusterDevice[masterId] == nil {
      s.ClusterDevice[masterId] = make(map[string]struct{})
   }
   for _, node := range registerData.DeviceList{
      s.ClusterDevice[masterId][node.DeviceId] = struct{}{}
      s.Devices[node.DeviceId] = struct{}{}
   }
   return true
}
// 启动服务
func (s *Server) StartSrv() error {
   s.Logger.Debug("Start server...", zap.String("addr", s.addr))
   // 错误
   var err error
   // tcpAddr
   s.tcpAddr, err = net.ResolveTCPAddr("tcp", s.addr)
   if err != nil {
      logger.Error("Can not build tcp server for now", zap.String("addr", s.addr), zap.Error(err))
      s.Logger.Error("Can not build tcp server for now", zap.String("addr", s.addr), zap.Error(err))
      return err
   }
   // 监听
   s.tcpListener, err = net.ListenTCP("tcp", s.tcpAddr)
   if err != nil {
      logger.Error("Can not listen tcp server for now", zap.String("addr", s.addr), zap.Error(err))
      s.Logger.Error("Can not listen tcp server for now", zap.String("addr", s.addr), zap.Error(err))
      return err
   }
   // 收到连接
   logger.Debug("Starting to listen addr", zap.String("addr", s.addr))
   s.Logger.Debug("Starting to listen addr", zap.String("addr", s.addr))
   s.waitGroup.Wrap(func() {
      for {
         // 获取连接
@@ -205,33 +214,33 @@
         if err != nil {
            // 让出grouting
            if netErr, ok := err.(net.Error);ok && netErr.Temporary() {
               logger.Error("Continue listening...", zap.String("addr", s.tcpListener.Addr().String()), zap.Error(err))
               s.Logger.Error("Continue listening...", zap.String("addr", s.tcpListener.Addr().String()), zap.Error(err))
               runtime.Gosched()
               continue
            }
            // 不能使用已关闭的连接
            if !strings.Contains(err.Error(), "use of closed network connection") {
               logger.Error("Can not use a closed network connection", zap.String("addr", s.addr), zap.Error(err))
               s.Logger.Error("Can not use a closed network connection", zap.String("addr", s.addr), zap.Error(err))
            }
            break
         }
         // 处理连接
         logger.Debug("Client connected", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String()))
         s.Logger.Debug("Client connected", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String()))
         go s.Handler(clientConn)
      }
   })
   // wait
   s.waitGroup.Wait()
   logger.Warn("Tcp server exist", zap.String("addr", s.addr))
   s.Logger.Warn("Tcp server exist", zap.String("addr", s.addr))
   return nil
}
// 处理连接
func (s *Server) Handler(clientConn net.Conn) {
   logger.Debug("Client connected...", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String()))
   s.Logger.Debug("Client connected...", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String()))
   // 临时ID
   tplClientId := uuid.NewV4().String()
@@ -241,7 +250,7 @@
   // 初始化连接
   cliCon := &Clients{}
   cli := client.NewClient(s.addr, tplClientId,cliRegister, cliCon)
   cli := client.NewClient(s.addr, tplClientId,cliRegister, cliCon, s.Logger)
   // 设置连接状态
   cli.SetState(client.StateConnected)
@@ -261,5 +270,7 @@
   // 添加集群ID
   s.SetCluster(msg.SenderId, cli)
   // 设置集群
   s.SetDeviceList(msg.SenderId, msg.DeviceProto.DeviceIds)
   registerData := &aiot.DeviceRegister{}
   json.Unmarshal(msg.Data, registerData)
   s.SetDeviceList(msg.SenderId, registerData)
}