package server import ( "basic.com/valib/go-aiot.git/aiotProto/aiot" "basic.com/valib/go-aiot.git/client" "basic.com/valib/go-aiot.git/util" "encoding/json" uuid "github.com/satori/go.uuid" "go.uber.org/zap" "net" "runtime" "strings" "sync" ) // 服务端结构体 type Server struct { // 监听地址,格式:0.0.0.0:7081 addr string // 携程库 waitGroup *util.WaitGroupWrapper // tcpListener tcpListener *net.TCPListener // tcpAddr tcpAddr *net.TCPAddr // 服务端设备ID serverId string // 设备信息写入锁 deviceLock *sync.RWMutex // 所有的集群ID列表 key: masterId Clusters map[string]*client.Client // 集群和主节点 key:masterId value:集群ID ClusterMaster map[string]string // 所有的设备连接池列表 Devices map[string]struct{} // 以集群为单位设备列表 key0:masterId key1:nodeId ClusterDevice map[string]map[string]struct{} // 集群黑名单 key:masterId ClusterBlackList map[string]struct{} // 回调接口 serverCallBack ServerCallBack // logger Logger *zap.SugaredLogger } // 全局服务 var Srv *Server // 上锁 func (s *Server) Lock() { s.deviceLock.Lock() } // 解锁 func (s *Server) UnLock() { s.deviceLock.Unlock() } // 初始化服务 func NewServer(addr string, serverId string, serverCallBack ServerCallBack, logger *zap.SugaredLogger) *Server { logger.Debug("New server...", zap.String("addr", addr)) return &Server{ addr: addr, waitGroup: &util.WaitGroupWrapper{}, serverId: serverId, deviceLock: new(sync.RWMutex), Clusters: make(map[string]*client.Client), Devices: make(map[string]struct{}), ClusterDevice: make(map[string]map[string]struct{}), ClusterBlackList: make(map[string]struct{}), serverCallBack: serverCallBack, Logger: logger, } } // 启动服务 func (s *Server) StartSrv() error { s.Logger.Debug("Start server...", zap.String("addr", s.addr)) // 错误 var err error // tcpAddr s.tcpAddr, err = net.ResolveTCPAddr("tcp", s.addr) if err != nil { s.Logger.Error("Can not build tcp server for now", zap.String("addr", s.addr), zap.Error(err)) return err } // 监听 s.tcpListener, err = net.ListenTCP("tcp", s.tcpAddr) if err != nil { s.Logger.Error("Can not listen tcp server for now", zap.String("addr", s.addr), zap.Error(err)) return err } // 收到连接 s.Logger.Debug("Starting to listen addr", zap.String("addr", s.addr)) s.waitGroup.Wrap(func() { for { // 获取连接 clientConn, err := s.tcpListener.Accept() if err != nil { // 让出grouting if netErr, ok := err.(net.Error);ok && netErr.Temporary() { s.Logger.Error("Continue listening...", zap.String("addr", s.tcpListener.Addr().String()), zap.Error(err)) runtime.Gosched() continue } // 不能使用已关闭的连接 if !strings.Contains(err.Error(), "use of closed network connection") { s.Logger.Error("Can not use a closed network connection", zap.String("addr", s.addr), zap.Error(err)) } break } // 处理连接 s.Logger.Debug("Client connected", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String())) go s.Handler(clientConn) } }) // wait s.waitGroup.Wait() s.Logger.Warn("Tcp server exist", zap.String("addr", s.addr)) return nil } // 处理连接 func (s *Server) Handler(clientConn net.Conn) { s.Logger.Debug("Client connected...", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String())) // 临时ID tplClientId := uuid.NewV4().String() // 注册信息 cliRegister := &aiot.DeviceRegister{} // 初始化连接 cliCon := &Clients{} cli := client.NewClient(s.addr, tplClientId,cliRegister, cliCon, s.Logger) // 设置连接状态 cli.SetState(client.StateConnected) cli.Conn = clientConn // 启用读写句柄 cli.SetRWBuf() // wait cli.Wait() cli.Close() } // 集群是否注册 func (s *Server) IsMasterOnline(masterId string) bool { if _,ok := s.ClusterDevice[masterId];ok { return true } return false } // 通过masterId获取集群ID func (s *Server) GetClusterIdByMasterId(masterId string) string { if clusterId,ok := s.ClusterMaster[masterId];ok { return clusterId } return "" } // 通过集群ID获取masterId func (s *Server) GetMasterIdByClusterId(clusterId string) string { for masterId, cId := range s.ClusterMaster{ if cId == clusterId { return masterId } } return "" } // 绑定集群和master关系 func (s *Server) SetClusterIdMasterId(clusterId string, masterId string) { s.Lock() defer s.UnLock() s.ClusterMaster[masterId] = clusterId } // 注册集群ID func (s *Server) SetCluster(masterId string, cli *client.Client) bool { if masterId == "" { return true } // 检测黑名单 if _,ok := s.ClusterBlackList[masterId];ok { return false } // 添加集群ID s.Clusters[masterId] = cli return true } // 删除集群 func (s *Server) RemoveCluster(masterId string) bool { s.Lock() defer s.UnLock() // 关闭节点连接 if clusterDevice,ok := s.ClusterDevice[masterId];ok { for deviceId := range clusterDevice{ // 移除集群中的设备 if _, ok := s.Devices[deviceId];ok { delete(s.Devices, deviceId) } } } // 移除集群 if _,ok := s.Clusters[masterId];ok { delete(s.Clusters, masterId) return true } // 加入集群黑名单 s.ClusterBlackList[masterId] = struct{}{} return false } // 删除设备 func (s *Server) RemoveDevice(deviceId string) { // 如果是主节点 if _,ok := s.ClusterDevice[deviceId];ok { s.RemoveCluster(deviceId) }else{ // 删除当前节点 if _, ok := s.Devices[deviceId];ok { s.Lock() defer s.UnLock() delete(s.Devices, deviceId) } } } // 注册设备信息 func (s *Server) SetDeviceList(masterId string, registerData *aiot.DeviceRegister) bool { if len(registerData.DeviceList) == 0 { return true } // 锁 s.Lock() defer s.UnLock() // 检测黑名单 if _,ok := s.ClusterBlackList[masterId];ok { return false } // 添加设备ID if s.ClusterDevice[masterId] == nil { s.ClusterDevice[masterId] = make(map[string]struct{}) } for _, node := range registerData.DeviceList{ s.ClusterDevice[masterId][node.DeviceId] = struct{}{} s.Devices[node.DeviceId] = struct{}{} } return true } // 处理设备注册 func (s *Server) RegisterDevice(cli *client.Client, msg *aiot.Protocol) { // 设置节点ID cli.SetDeviceId(msg.SenderId) // 添加集群ID s.SetCluster(msg.SenderId, cli) // 设置集群 registerData := &aiot.DeviceRegister{} _ = json.Unmarshal(msg.Data, registerData) s.SetDeviceList(msg.SenderId, registerData) }