package server
|
|
import (
|
"basic.com/valib/go-aiot.git/aiotProto/aiot"
|
"basic.com/valib/go-aiot.git/client"
|
"basic.com/valib/go-aiot.git/util"
|
"encoding/json"
|
uuid "github.com/satori/go.uuid"
|
"go.uber.org/zap"
|
"net"
|
"runtime"
|
"strings"
|
"sync"
|
)
|
|
// 服务端结构体
|
type Server struct {
|
// 监听地址,格式:0.0.0.0:7081
|
addr string
|
// 携程库
|
waitGroup *util.WaitGroupWrapper
|
// tcpListener
|
tcpListener *net.TCPListener
|
// tcpAddr
|
tcpAddr *net.TCPAddr
|
// 服务端设备ID
|
serverId string
|
// 设备信息写入锁
|
deviceLock *sync.RWMutex
|
// 所有的集群ID列表 key: masterId
|
Clusters map[string]*client.Client
|
// 集群和主节点 key:masterId value:集群ID
|
ClusterMaster map[string]string
|
// 所有的设备连接池列表
|
Devices map[string]struct{}
|
// 以集群为单位设备列表 key0:masterId key1:nodeId
|
ClusterDevice map[string]map[string]struct{}
|
// 集群黑名单 key:masterId
|
ClusterBlackList map[string]struct{}
|
// 回调接口
|
serverCallBack ServerCallBack
|
// logger
|
Logger *zap.SugaredLogger
|
}
|
|
// 全局服务
|
// Srv is the package-level server instance. Nothing in the visible code
// assigns it, so it is nil until a caller stores a *Server here.
var Srv *Server
|
|
// 上锁
|
func (s *Server) Lock() {
|
s.deviceLock.Lock()
|
}
|
|
// 解锁
|
func (s *Server) UnLock() {
|
s.deviceLock.Unlock()
|
}
|
|
// 初始化服务
|
func NewServer(addr string, serverId string, serverCallBack ServerCallBack, logger *zap.SugaredLogger) *Server {
|
logger.Debug("New server...", zap.String("addr", addr))
|
return &Server{
|
addr: addr,
|
waitGroup: &util.WaitGroupWrapper{},
|
serverId: serverId,
|
deviceLock: new(sync.RWMutex),
|
Clusters: make(map[string]*client.Client),
|
Devices: make(map[string]struct{}),
|
ClusterDevice: make(map[string]map[string]struct{}),
|
ClusterBlackList: make(map[string]struct{}),
|
serverCallBack: serverCallBack,
|
Logger: logger,
|
}
|
}
|
|
// 启动服务
|
func (s *Server) StartSrv() error {
|
s.Logger.Debug("Start server...", zap.String("addr", s.addr))
|
// 错误
|
var err error
|
// tcpAddr
|
s.tcpAddr, err = net.ResolveTCPAddr("tcp", s.addr)
|
if err != nil {
|
s.Logger.Error("Can not build tcp server for now", zap.String("addr", s.addr), zap.Error(err))
|
return err
|
}
|
|
// 监听
|
s.tcpListener, err = net.ListenTCP("tcp", s.tcpAddr)
|
if err != nil {
|
s.Logger.Error("Can not listen tcp server for now", zap.String("addr", s.addr), zap.Error(err))
|
return err
|
}
|
|
// 收到连接
|
s.Logger.Debug("Starting to listen addr", zap.String("addr", s.addr))
|
s.waitGroup.Wrap(func() {
|
for {
|
// 获取连接
|
clientConn, err := s.tcpListener.Accept()
|
if err != nil {
|
// 让出grouting
|
if netErr, ok := err.(net.Error);ok && netErr.Temporary() {
|
s.Logger.Error("Continue listening...", zap.String("addr", s.tcpListener.Addr().String()), zap.Error(err))
|
runtime.Gosched()
|
continue
|
}
|
|
// 不能使用已关闭的连接
|
if !strings.Contains(err.Error(), "use of closed network connection") {
|
s.Logger.Error("Can not use a closed network connection", zap.String("addr", s.addr), zap.Error(err))
|
}
|
break
|
}
|
|
// 处理连接
|
s.Logger.Debug("Client connected", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String()))
|
go s.Handler(clientConn)
|
}
|
})
|
|
// wait
|
s.waitGroup.Wait()
|
s.Logger.Warn("Tcp server exist", zap.String("addr", s.addr))
|
return nil
|
}
|
|
// 处理连接
|
func (s *Server) Handler(clientConn net.Conn) {
|
s.Logger.Debug("Client connected...", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String()))
|
|
// 临时ID
|
tplClientId := uuid.NewV4().String()
|
|
// 注册信息
|
cliRegister := &aiot.DeviceRegister{}
|
|
// 初始化连接
|
cliCon := &Clients{}
|
cli := client.NewClient(s.addr, tplClientId,cliRegister, cliCon, s.Logger)
|
|
// 设置连接状态
|
cli.SetState(client.StateConnected)
|
cli.Conn = clientConn
|
|
// 启用读写句柄
|
cli.SetRWBuf()
|
|
// wait
|
cli.Wait()
|
cli.Close()
|
}
|
|
// 集群是否注册
|
func (s *Server) IsMasterOnline(masterId string) bool {
|
if _,ok := s.ClusterDevice[masterId];ok {
|
return true
|
}
|
return false
|
}
|
|
// 通过masterId获取集群ID
|
func (s *Server) GetClusterIdByMasterId(masterId string) string {
|
if clusterId,ok := s.ClusterMaster[masterId];ok {
|
return clusterId
|
}
|
return ""
|
}
|
|
// 通过集群ID获取masterId
|
func (s *Server) GetMasterIdByClusterId(clusterId string) string {
|
for masterId, cId := range s.ClusterMaster{
|
if cId == clusterId {
|
return masterId
|
}
|
}
|
return ""
|
}
|
|
// 绑定集群和master关系
|
func (s *Server) SetClusterIdMasterId(clusterId string, masterId string) {
|
s.Lock()
|
defer s.UnLock()
|
s.ClusterMaster[masterId] = clusterId
|
}
|
|
// 注册集群ID
|
func (s *Server) SetCluster(masterId string, cli *client.Client) bool {
|
if masterId == "" {
|
return true
|
}
|
|
// 检测黑名单
|
if _,ok := s.ClusterBlackList[masterId];ok {
|
return false
|
}
|
|
// 添加集群ID
|
s.Clusters[masterId] = cli
|
return true
|
}
|
|
// 删除集群
|
func (s *Server) RemoveCluster(masterId string) bool {
|
s.Lock()
|
defer s.UnLock()
|
// 关闭节点连接
|
if clusterDevice,ok := s.ClusterDevice[masterId];ok {
|
for deviceId := range clusterDevice{
|
// 移除集群中的设备
|
if _, ok := s.Devices[deviceId];ok {
|
delete(s.Devices, deviceId)
|
}
|
}
|
}
|
// 移除集群
|
if _,ok := s.Clusters[masterId];ok {
|
delete(s.Clusters, masterId)
|
return true
|
}
|
// 加入集群黑名单
|
s.ClusterBlackList[masterId] = struct{}{}
|
return false
|
}
|
|
// 删除设备
|
func (s *Server) RemoveDevice(deviceId string) {
|
// 如果是主节点
|
if _,ok := s.ClusterDevice[deviceId];ok {
|
s.RemoveCluster(deviceId)
|
}else{
|
// 删除当前节点
|
if _, ok := s.Devices[deviceId];ok {
|
s.Lock()
|
defer s.UnLock()
|
delete(s.Devices, deviceId)
|
}
|
}
|
}
|
|
// 注册设备信息
|
func (s *Server) SetDeviceList(masterId string, registerData *aiot.DeviceRegister) bool {
|
|
if len(registerData.DeviceList) == 0 {
|
return true
|
}
|
// 锁
|
s.Lock()
|
defer s.UnLock()
|
|
// 检测黑名单
|
if _,ok := s.ClusterBlackList[masterId];ok {
|
return false
|
}
|
|
// 添加设备ID
|
if s.ClusterDevice[masterId] == nil {
|
s.ClusterDevice[masterId] = make(map[string]struct{})
|
}
|
for _, node := range registerData.DeviceList{
|
s.ClusterDevice[masterId][node.DeviceId] = struct{}{}
|
s.Devices[node.DeviceId] = struct{}{}
|
}
|
return true
|
}
|
|
// 处理设备注册
|
func (s *Server) RegisterDevice(cli *client.Client, msg *aiot.Protocol) {
|
// 设置节点ID
|
cli.SetDeviceId(msg.SenderId)
|
// 添加集群ID
|
s.SetCluster(msg.SenderId, cli)
|
// 设置集群
|
registerData := &aiot.DeviceRegister{}
|
_ = json.Unmarshal(msg.Data, registerData)
|
s.SetDeviceList(msg.SenderId, registerData)
|
}
|