| | |
| | | package server |
| | | |
| | | import ( |
| | | "basic.com/valib/logger.git" |
| | | "basic.com/valib/go-aiot.git/aiotProto/aiot" |
| | | "basic.com/valib/go-aiot.git/client" |
| | | "basic.com/valib/go-aiot.git/util" |
| | | "encoding/json" |
| | | uuid "github.com/satori/go.uuid" |
| | | "go-aiot/aiotProto/aiot" |
| | | "go-aiot/client" |
| | | "go-aiot/util" |
| | | "go.uber.org/zap" |
| | | "net" |
| | | "runtime" |
| | |
| | | ClusterBlackList map[string]struct{} |
| | | // 回调接口 |
| | | serverCallBack ServerCallBack |
| | | // logger |
| | | Logger *zap.SugaredLogger |
| | | } |
| | | |
| | | // 全局服务 |
| | | var Srv *Server |
| | | |
| | | // 上锁 |
| | | func (s *Server) Lock() { |
| | | s.deviceLock.Lock() |
| | | } |
| | | |
| | | // 解锁 |
| | | func (s *Server) UnLock() { |
| | | s.deviceLock.Unlock() |
| | | } |
| | | |
| | | // 初始化服务 |
| | | func NewServer(addr string, serverId string, serverCallBack ServerCallBack, logger *zap.SugaredLogger) *Server { |
| | | logger.Debug("New server...", zap.String("addr", addr)) |
| | | return &Server{ |
| | | addr: addr, |
| | | waitGroup: &util.WaitGroupWrapper{}, |
| | | serverId: serverId, |
| | | deviceLock: new(sync.RWMutex), |
| | | Clusters: make(map[string]*client.Client), |
| | | Devices: make(map[string]struct{}), |
| | | ClusterDevice: make(map[string]map[string]struct{}), |
| | | ClusterBlackList: make(map[string]struct{}), |
| | | serverCallBack: serverCallBack, |
| | | Logger: logger, |
| | | } |
| | | } |
| | | |
| | | // 启动服务 |
| | | func (s *Server) StartSrv() error { |
| | | s.Logger.Debug("Start server...", zap.String("addr", s.addr)) |
| | | // 错误 |
| | | var err error |
| | | // tcpAddr |
| | | s.tcpAddr, err = net.ResolveTCPAddr("tcp", s.addr) |
| | | if err != nil { |
| | | s.Logger.Error("Can not build tcp server for now", zap.String("addr", s.addr), zap.Error(err)) |
| | | return err |
| | | } |
| | | |
| | | // 监听 |
| | | s.tcpListener, err = net.ListenTCP("tcp", s.tcpAddr) |
| | | if err != nil { |
| | | s.Logger.Error("Can not listen tcp server for now", zap.String("addr", s.addr), zap.Error(err)) |
| | | return err |
| | | } |
| | | |
| | | // 收到连接 |
| | | s.Logger.Debug("Starting to listen addr", zap.String("addr", s.addr)) |
| | | s.waitGroup.Wrap(func() { |
| | | for { |
| | | // 获取连接 |
| | | clientConn, err := s.tcpListener.Accept() |
| | | if err != nil { |
| | | // 让出grouting |
| | | if netErr, ok := err.(net.Error);ok && netErr.Temporary() { |
| | | s.Logger.Error("Continue listening...", zap.String("addr", s.tcpListener.Addr().String()), zap.Error(err)) |
| | | runtime.Gosched() |
| | | continue |
| | | } |
| | | |
| | | // 不能使用已关闭的连接 |
| | | if !strings.Contains(err.Error(), "use of closed network connection") { |
| | | s.Logger.Error("Can not use a closed network connection", zap.String("addr", s.addr), zap.Error(err)) |
| | | } |
| | | break |
| | | } |
| | | |
| | | // 处理连接 |
| | | s.Logger.Debug("Client connected", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String())) |
| | | go s.Handler(clientConn) |
| | | } |
| | | }) |
| | | |
| | | // wait |
| | | s.waitGroup.Wait() |
| | | s.Logger.Warn("Tcp server exist", zap.String("addr", s.addr)) |
| | | return nil |
| | | } |
| | | |
| | | // 处理连接 |
| | | func (s *Server) Handler(clientConn net.Conn) { |
| | | s.Logger.Debug("Client connected...", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String())) |
| | | |
| | | // 临时ID |
| | | tplClientId := uuid.NewV4().String() |
| | | |
| | | // 注册信息 |
| | | cliRegister := &aiot.DeviceRegister{} |
| | | |
| | | // 初始化连接 |
| | | cliCon := &Clients{} |
| | | cli := client.NewClient(s.addr, tplClientId,cliRegister, cliCon, s.Logger) |
| | | |
| | | // 设置连接状态 |
| | | cli.SetState(client.StateConnected) |
| | | cli.Conn = clientConn |
| | | |
| | | // 启用读写句柄 |
| | | cli.SetRWBuf() |
| | | |
| | | // wait |
| | | cli.Wait() |
| | | cli.Close() |
| | | } |
| | | |
| | | // 集群是否注册 |
| | | func (s *Server) IsMasterOnline(masterId string) bool { |
| | | if _,ok := s.ClusterDevice[masterId];ok { |
| | | return true |
| | | } |
| | | return false |
| | | } |
| | | |
| | | // 通过masterId获取集群ID |
| | | func (s *Server) GetClusterIdByMasterId(masterId string) string { |
| | |
| | | s.Lock() |
| | | defer s.UnLock() |
| | | s.ClusterMaster[masterId] = clusterId |
| | | } |
| | | |
| | | // 集群是否注册 |
| | | func (s *Server) IsMasterOnline(masterId string) bool { |
| | | if _,ok := s.ClusterDevice[masterId];ok { |
| | | return true |
| | | } |
| | | return false |
| | | } |
| | | |
| | | // 初始化服务 |
| | | func NewServer(addr string, serverId string, serverCallBack ServerCallBack) *Server { |
| | | return &Server{ |
| | | addr: addr, |
| | | waitGroup: &util.WaitGroupWrapper{}, |
| | | serverId: serverId, |
| | | deviceLock: new(sync.RWMutex), |
| | | Clusters: make(map[string]*client.Client), |
| | | Devices: make(map[string]struct{}), |
| | | ClusterDevice: make(map[string]map[string]struct{}), |
| | | ClusterBlackList: make(map[string]struct{}), |
| | | serverCallBack: serverCallBack, |
| | | } |
| | | } |
| | | |
| | | // 上锁 |
| | | func (s *Server) Lock() { |
| | | s.deviceLock.Lock() |
| | | } |
| | | |
| | | // 解锁 |
| | | func (s *Server) UnLock() { |
| | | s.deviceLock.Unlock() |
| | | } |
| | | |
| | | // 注册集群ID |
| | |
| | | } |
| | | |
| | | // 注册设备信息 |
| | | func (s *Server) SetDeviceList(masterId string, nodeIds []string) bool { |
| | | if len(nodeIds) == 0 { |
| | | func (s *Server) SetDeviceList(masterId string, registerData *aiot.DeviceRegister) bool { |
| | | |
| | | if len(registerData.DeviceList) == 0 { |
| | | return true |
| | | } |
| | | // 锁 |
| | |
| | | } |
| | | |
| | | // 添加设备ID |
| | | for _, nodeId := range nodeIds{ |
| | | s.ClusterDevice[masterId][nodeId] = struct{}{} |
| | | s.Devices[nodeId] = struct{}{} |
| | | if s.ClusterDevice[masterId] == nil { |
| | | s.ClusterDevice[masterId] = make(map[string]struct{}) |
| | | } |
| | | for _, node := range registerData.DeviceList{ |
| | | s.ClusterDevice[masterId][node.DeviceId] = struct{}{} |
| | | s.Devices[node.DeviceId] = struct{}{} |
| | | } |
| | | return true |
| | | } |
| | | |
| | | // 启动服务 |
| | | func (s *Server) StartSrv() error { |
| | | // 错误 |
| | | var err error |
| | | // tcpAddr |
| | | s.tcpAddr, err = net.ResolveTCPAddr("tcp", s.addr) |
| | | if err != nil { |
| | | logger.Error("Can not build tcp server for now", zap.String("addr", s.addr), zap.Error(err)) |
| | | return err |
| | | } |
| | | |
| | | // 监听 |
| | | s.tcpListener, err = net.ListenTCP("tcp", s.tcpAddr) |
| | | if err != nil { |
| | | logger.Error("Can not listen tcp server for now", zap.String("addr", s.addr), zap.Error(err)) |
| | | return err |
| | | } |
| | | |
| | | // 收到连接 |
| | | logger.Debug("Starting to listen addr", zap.String("addr", s.addr)) |
| | | s.waitGroup.Wrap(func() { |
| | | for { |
| | | // 获取连接 |
| | | clientConn, err := s.tcpListener.Accept() |
| | | if err != nil { |
| | | // 让出grouting |
| | | if netErr, ok := err.(net.Error);ok && netErr.Temporary() { |
| | | logger.Error("Continue listening...", zap.String("addr", s.tcpListener.Addr().String()), zap.Error(err)) |
| | | runtime.Gosched() |
| | | continue |
| | | } |
| | | |
| | | // 不能使用已关闭的连接 |
| | | if !strings.Contains(err.Error(), "use of closed network connection") { |
| | | logger.Error("Can not use a closed network connection", zap.String("addr", s.addr), zap.Error(err)) |
| | | } |
| | | break |
| | | } |
| | | |
| | | // 处理连接 |
| | | logger.Debug("Client connected", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String())) |
| | | go s.Handler(clientConn) |
| | | } |
| | | }) |
| | | |
| | | // wait |
| | | s.waitGroup.Wait() |
| | | logger.Warn("Tcp server exist", zap.String("addr", s.addr)) |
| | | return nil |
| | | } |
| | | |
| | | // 处理连接 |
| | | func (s *Server) Handler(clientConn net.Conn) { |
| | | logger.Debug("Client connected...", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String())) |
| | | |
| | | // 临时ID |
| | | tplClientId := uuid.NewV4().String() |
| | | |
| | | // 注册信息 |
| | | cliRegister := &aiot.DeviceRegister{} |
| | | |
| | | // 初始化连接 |
| | | cliCon := &Clients{} |
| | | cli := client.NewClient(s.addr, tplClientId,cliRegister, cliCon) |
| | | |
| | | // 设置连接状态 |
| | | cli.SetState(client.StateConnected) |
| | | cli.Conn = clientConn |
| | | |
| | | // 启用读写句柄 |
| | | cli.SetRWBuf() |
| | | |
| | | // wait |
| | | cli.Wait() |
| | | } |
| | | |
| | | // 处理设备注册 |
| | | func (s *Server) RegisterDevice(msg *aiot.Protocol, cli *client.Client) { |
| | | func (s *Server) RegisterDevice(cli *client.Client, msg *aiot.Protocol) { |
| | | // 设置节点ID |
| | | cli.SetDeviceId(msg.SenderId) |
| | | // 添加集群ID |
| | | s.SetCluster(msg.SenderId, cli) |
| | | // 设置集群 |
| | | s.SetDeviceList(msg.SenderId, msg.DeviceProto.DeviceIds) |
| | | registerData := &aiot.DeviceRegister{} |
| | | _ = json.Unmarshal(msg.Data, registerData) |
| | | s.SetDeviceList(msg.SenderId, registerData) |
| | | } |