saas-smartAi通信协议标准库
gongshangguo
2022-03-02 d13ed6d79af61db6333ad5cc3200d96b6410d064
server/server.go
@@ -4,6 +4,7 @@
   "basic.com/valib/go-aiot.git/aiotProto/aiot"
   "basic.com/valib/go-aiot.git/client"
   "basic.com/valib/go-aiot.git/util"
   "encoding/json"
   uuid "github.com/satori/go.uuid"
   "go.uber.org/zap"
   "net"
@@ -45,37 +46,14 @@
// 全局服务
var Srv *Server
// 通过masterId获取集群ID
func (s *Server) GetClusterIdByMasterId(masterId string) string {
   if clusterId,ok := s.ClusterMaster[masterId];ok {
      return clusterId
   }
   return ""
// 上锁
func (s *Server) Lock() {
   s.deviceLock.Lock()
}
// 通过集群ID获取masterId
func (s *Server) GetMasterIdByClusterId(clusterId string) string {
   for masterId, cId := range s.ClusterMaster{
      if cId == clusterId {
         return masterId
      }
   }
   return ""
}
// 绑定集群和master关系
func (s *Server) SetClusterIdMasterId(clusterId string, masterId string) {
   s.Lock()
   defer s.UnLock()
   s.ClusterMaster[masterId] = clusterId
}
// 集群是否注册
func (s *Server) IsMasterOnline(masterId string) bool {
   if _,ok := s.ClusterDevice[masterId];ok {
      return true
   }
   return false
// 解锁
func (s *Server) UnLock() {
   s.deviceLock.Unlock()
}
// 初始化服务
@@ -93,92 +71,6 @@
      serverCallBack: serverCallBack,
      Logger: logger,
   }
}
// 上锁
func (s *Server) Lock() {
   s.deviceLock.Lock()
}
// 解锁
func (s *Server) UnLock() {
   s.deviceLock.Unlock()
}
// 注册集群ID
func (s *Server) SetCluster(masterId string, cli *client.Client) bool {
   if masterId == "" {
      return true
   }
   // 检测黑名单
   if _,ok := s.ClusterBlackList[masterId];ok {
      return false
   }
   // 添加集群ID
   s.Clusters[masterId] = cli
   return true
}
// 删除集群
func (s *Server) RemoveCluster(masterId string) bool {
   s.Lock()
   defer s.UnLock()
   // 关闭节点连接
   if clusterDevice,ok := s.ClusterDevice[masterId];ok {
      for deviceId := range clusterDevice{
         // 移除集群中的设备
         if _, ok := s.Devices[deviceId];ok {
            delete(s.Devices, deviceId)
         }
      }
   }
   // 移除集群
   if _,ok := s.Clusters[masterId];ok {
      delete(s.Clusters, masterId)
      return true
   }
   // 加入集群黑名单
   s.ClusterBlackList[masterId] = struct{}{}
   return false
}
// 删除设备
func (s *Server) RemoveDevice(deviceId string)  {
   // 如果是主节点
   if _,ok := s.ClusterDevice[deviceId];ok {
      s.RemoveCluster(deviceId)
   }else{
      // 删除当前节点
      if _, ok := s.Devices[deviceId];ok {
         s.Lock()
         defer s.UnLock()
         delete(s.Devices, deviceId)
      }
   }
}
// 注册设备信息
func (s *Server) SetDeviceList(masterId string, nodeIds []string) bool {
   if len(nodeIds) == 0 {
      return true
   }
   // 锁
   s.Lock()
   defer s.UnLock()
   // 检测黑名单
   if _,ok := s.ClusterBlackList[masterId];ok {
      return false
   }
   // 添加设备ID
   for _, nodeId := range nodeIds{
      s.ClusterDevice[masterId][nodeId] = struct{}{}
      s.Devices[nodeId] = struct{}{}
   }
   return true
}
// 启动服务
@@ -256,14 +148,130 @@
   // wait
   cli.Wait()
   cli.Close()
}
// 集群是否注册
func (s *Server) IsMasterOnline(masterId string) bool {
   if _,ok := s.ClusterDevice[masterId];ok {
      return true
   }
   return false
}
// 通过masterId获取集群ID
func (s *Server) GetClusterIdByMasterId(masterId string) string {
   if clusterId,ok := s.ClusterMaster[masterId];ok {
      return clusterId
   }
   return ""
}
// 通过集群ID获取masterId
func (s *Server) GetMasterIdByClusterId(clusterId string) string {
   for masterId, cId := range s.ClusterMaster{
      if cId == clusterId {
         return masterId
      }
   }
   return ""
}
// 绑定集群和master关系
func (s *Server) SetClusterIdMasterId(clusterId string, masterId string) {
   s.Lock()
   defer s.UnLock()
   s.ClusterMaster[masterId] = clusterId
}
// 注册集群ID
func (s *Server) SetCluster(masterId string, cli *client.Client) bool {
   if masterId == "" {
      return true
   }
   // 检测黑名单
   if _,ok := s.ClusterBlackList[masterId];ok {
      return false
   }
   // 添加集群ID
   s.Clusters[masterId] = cli
   return true
}
// 删除集群
func (s *Server) RemoveCluster(masterId string) bool {
   s.Lock()
   defer s.UnLock()
   // 关闭节点连接
   if clusterDevice,ok := s.ClusterDevice[masterId];ok {
      for deviceId := range clusterDevice{
         // 移除集群中的设备
         if _, ok := s.Devices[deviceId];ok {
            delete(s.Devices, deviceId)
         }
      }
   }
   // 移除集群
   if _,ok := s.Clusters[masterId];ok {
      delete(s.Clusters, masterId)
      return true
   }
   // 加入集群黑名单
   s.ClusterBlackList[masterId] = struct{}{}
   return false
}
// 删除设备
func (s *Server) RemoveDevice(deviceId string)  {
   // 如果是主节点
   if _,ok := s.ClusterDevice[deviceId];ok {
      s.RemoveCluster(deviceId)
   }else{
      // 删除当前节点
      if _, ok := s.Devices[deviceId];ok {
         s.Lock()
         defer s.UnLock()
         delete(s.Devices, deviceId)
      }
   }
}
// 注册设备信息
func (s *Server) SetDeviceList(masterId string, registerData *aiot.DeviceRegister) bool {
   if len(registerData.DeviceList) == 0 {
      return true
   }
   // 锁
   s.Lock()
   defer s.UnLock()
   // 检测黑名单
   if _,ok := s.ClusterBlackList[masterId];ok {
      return false
   }
   // 添加设备ID
   if s.ClusterDevice[masterId] == nil {
      s.ClusterDevice[masterId] = make(map[string]struct{})
   }
   for _, node := range registerData.DeviceList{
      s.ClusterDevice[masterId][node.DeviceId] = struct{}{}
      s.Devices[node.DeviceId] = struct{}{}
   }
   return true
}
// 处理设备注册
func (s *Server) RegisterDevice(msg *aiot.Protocol, cli *client.Client) {
func (s *Server) RegisterDevice(cli *client.Client, msg *aiot.Protocol) {
   // 设置节点ID
   cli.SetDeviceId(msg.SenderId)
   // 添加集群ID
   s.SetCluster(msg.SenderId, cli)
   // 设置集群
   s.SetDeviceList(msg.SenderId, msg.DeviceProto.DeviceIds)
   registerData := &aiot.DeviceRegister{}
   _ = json.Unmarshal(msg.Data, registerData)
   s.SetDeviceList(msg.SenderId, registerData)
}