saas-smartAi通信协议标准库
gongshangguo
2022-02-28 160a425c85f128ed47f92aa15c7eb6f76d68610a
回复register
3个文件已修改
267 ■■■■ 已修改文件
server/callback.go 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server/clienter.go 28 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server/server.go 227 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server/callback.go
@@ -7,18 +7,18 @@
// 回调接口
type ServerCallBack interface {
    // 收到心跳
    OnHeartBeat (c *client.Client, msg *aiot.Protocol)
    // 设备注册
    OnRegister (c *client.Client, msg *aiot.Protocol)
    // 收到请求
    OnRequest (c *client.Client, msg *aiot.Protocol)
    // 收到响应
    OnResponse (c *client.Client, msg *aiot.Protocol)
    // 设备注册
    OnRegister (c *client.Client, msg *aiot.Protocol)
    // 收到心跳
    OnHeartBeat (c *client.Client, msg *aiot.Protocol)
    // 通道关闭
    OnClose (c *client.Client)
    // 数据上报
    OnDataReport (c *client.Client, msg *aiot.Protocol)
    // 设备控制
    OnDeviceControl(c *client.Client, msg *aiot.Protocol)
    // 通道关闭
    OnClose (c *client.Client)
}
server/clienter.go
@@ -39,6 +39,16 @@
func (c *Clients) OnRegister(cli *client.Client, msg *aiot.Protocol) error {
    Srv.RegisterDevice(msg, cli)
    go Srv.serverCallBack.OnRegister(cli, msg)
    msgFeedBack := &aiot.Protocol{
        Receiver: aiot.RECEIVER_TO_MASTER,
        SenderId: Srv.serverId,
        DeviceProto: msg.DeviceProto,
        MsgType: aiot.MSG_TYPE_REGISTER,
        ReqType: aiot.REQ_TYPE_RESPONSE,
        MsgProto: cli.GetMsgProto(msg.MsgProto.MsgId),
        Data: msg.Data,
    }
    _ = cli.WriteBody(msgFeedBack)
    return nil
}
@@ -54,15 +64,6 @@
    return nil
}
// 实现OnClose
func (c *Clients) OnClose(cli *client.Client) {
    if Srv.IsMasterOnline(cli.GetDeviceId()) {
        Srv.RemoveCluster(cli.GetDeviceId())
    }
    go Srv.serverCallBack.OnClose(cli)
    return
}
// 实现OnDataReport
func (c *Clients) OnDataReport(cli *client.Client, msg *aiot.Protocol) error {
    go Srv.serverCallBack.OnDataReport(cli, msg)
@@ -74,3 +75,12 @@
    go Srv.serverCallBack.OnDeviceControl(cli, msg)
    return nil
}
// 实现OnClose
func (c *Clients) OnClose(cli *client.Client) {
    if Srv.IsMasterOnline(cli.GetDeviceId()) {
        Srv.RemoveCluster(cli.GetDeviceId())
    }
    go Srv.serverCallBack.OnClose(cli)
    return
}
server/server.go
@@ -46,6 +46,119 @@
// 全局服务
var Srv *Server
// 上锁
func (s *Server) Lock() {
    s.deviceLock.Lock()
}
// 解锁
func (s *Server) UnLock() {
    s.deviceLock.Unlock()
}
// 初始化服务
func NewServer(addr string, serverId string, serverCallBack ServerCallBack, logger *zap.SugaredLogger) *Server {
    logger.Debug("New server...", zap.String("addr", addr))
    return &Server{
        addr: addr,
        waitGroup: &util.WaitGroupWrapper{},
        serverId: serverId,
        deviceLock: new(sync.RWMutex),
        Clusters: make(map[string]*client.Client),
        Devices: make(map[string]struct{}),
        ClusterDevice: make(map[string]map[string]struct{}),
        ClusterBlackList: make(map[string]struct{}),
        serverCallBack: serverCallBack,
        Logger: logger,
    }
}
// 启动服务
func (s *Server) StartSrv() error {
    s.Logger.Debug("Start server...", zap.String("addr", s.addr))
    // 错误
    var err error
    // tcpAddr
    s.tcpAddr, err = net.ResolveTCPAddr("tcp", s.addr)
    if err != nil {
        s.Logger.Error("Can not build tcp server for now", zap.String("addr", s.addr), zap.Error(err))
        return err
    }
    // 监听
    s.tcpListener, err = net.ListenTCP("tcp", s.tcpAddr)
    if err != nil {
        s.Logger.Error("Can not listen tcp server for now", zap.String("addr", s.addr), zap.Error(err))
        return err
    }
    // 收到连接
    s.Logger.Debug("Starting to listen addr", zap.String("addr", s.addr))
    s.waitGroup.Wrap(func() {
        for {
            // 获取连接
            clientConn, err := s.tcpListener.Accept()
            if err != nil {
                // 让出grouting
                if netErr, ok := err.(net.Error);ok && netErr.Temporary() {
                    s.Logger.Error("Continue listening...", zap.String("addr", s.tcpListener.Addr().String()), zap.Error(err))
                    runtime.Gosched()
                    continue
                }
                // 不能使用已关闭的连接
                if !strings.Contains(err.Error(), "use of closed network connection") {
                    s.Logger.Error("Can not use a closed network connection", zap.String("addr", s.addr), zap.Error(err))
                }
                break
            }
            // 处理连接
            s.Logger.Debug("Client connected", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String()))
            go s.Handler(clientConn)
        }
    })
    // wait
    s.waitGroup.Wait()
    s.Logger.Warn("Tcp server exist", zap.String("addr", s.addr))
    return nil
}
// 处理连接
func (s *Server) Handler(clientConn net.Conn) {
    s.Logger.Debug("Client connected...", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String()))
    // 临时ID
    tplClientId := uuid.NewV4().String()
    // 注册信息
    cliRegister := &aiot.DeviceRegister{}
    // 初始化连接
    cliCon := &Clients{}
    cli := client.NewClient(s.addr, tplClientId,cliRegister, cliCon, s.Logger)
    // 设置连接状态
    cli.SetState(client.StateConnected)
    cli.Conn = clientConn
    // 启用读写句柄
    cli.SetRWBuf()
    // wait
    cli.Wait()
    cli.Close()
}
// 集群是否注册
func (s *Server) IsMasterOnline(masterId string) bool {
    if _,ok := s.ClusterDevice[masterId];ok {
        return true
    }
    return false
}
// 通过masterId获取集群ID
func (s *Server) GetClusterIdByMasterId(masterId string) string {
    if clusterId,ok := s.ClusterMaster[masterId];ok {
@@ -69,41 +182,6 @@
    s.Lock()
    defer s.UnLock()
    s.ClusterMaster[masterId] = clusterId
}
// 集群是否注册
func (s *Server) IsMasterOnline(masterId string) bool {
    if _,ok := s.ClusterDevice[masterId];ok {
        return true
    }
    return false
}
// 初始化服务
func NewServer(addr string, serverId string, serverCallBack ServerCallBack, logger *zap.SugaredLogger) *Server {
    logger.Debug("New server...", zap.String("addr", addr))
    return &Server{
        addr: addr,
        waitGroup: &util.WaitGroupWrapper{},
        serverId: serverId,
        deviceLock: new(sync.RWMutex),
        Clusters: make(map[string]*client.Client),
        Devices: make(map[string]struct{}),
        ClusterDevice: make(map[string]map[string]struct{}),
        ClusterBlackList: make(map[string]struct{}),
        serverCallBack: serverCallBack,
        Logger: logger,
    }
}
// 上锁
func (s *Server) Lock() {
    s.deviceLock.Lock()
}
// 解锁
func (s *Server) UnLock() {
    s.deviceLock.Unlock()
}
// 注册集群ID
@@ -186,83 +264,6 @@
    return true
}
// 启动服务
func (s *Server) StartSrv() error {
    s.Logger.Debug("Start server...", zap.String("addr", s.addr))
    // 错误
    var err error
    // tcpAddr
    s.tcpAddr, err = net.ResolveTCPAddr("tcp", s.addr)
    if err != nil {
        s.Logger.Error("Can not build tcp server for now", zap.String("addr", s.addr), zap.Error(err))
        return err
    }
    // 监听
    s.tcpListener, err = net.ListenTCP("tcp", s.tcpAddr)
    if err != nil {
        s.Logger.Error("Can not listen tcp server for now", zap.String("addr", s.addr), zap.Error(err))
        return err
    }
    // 收到连接
    s.Logger.Debug("Starting to listen addr", zap.String("addr", s.addr))
    s.waitGroup.Wrap(func() {
        for {
            // 获取连接
            clientConn, err := s.tcpListener.Accept()
            if err != nil {
                // 让出grouting
                if netErr, ok := err.(net.Error);ok && netErr.Temporary() {
                    s.Logger.Error("Continue listening...", zap.String("addr", s.tcpListener.Addr().String()), zap.Error(err))
                    runtime.Gosched()
                    continue
                }
                // 不能使用已关闭的连接
                if !strings.Contains(err.Error(), "use of closed network connection") {
                    s.Logger.Error("Can not use a closed network connection", zap.String("addr", s.addr), zap.Error(err))
                }
                break
            }
            // 处理连接
            s.Logger.Debug("Client connected", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String()))
            go s.Handler(clientConn)
        }
    })
    // wait
    s.waitGroup.Wait()
    s.Logger.Warn("Tcp server exist", zap.String("addr", s.addr))
    return nil
}
// 处理连接
func (s *Server) Handler(clientConn net.Conn) {
    s.Logger.Debug("Client connected...", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String()))
    // 临时ID
    tplClientId := uuid.NewV4().String()
    // 注册信息
    cliRegister := &aiot.DeviceRegister{}
    // 初始化连接
    cliCon := &Clients{}
    cli := client.NewClient(s.addr, tplClientId,cliRegister, cliCon, s.Logger)
    // 设置连接状态
    cli.SetState(client.StateConnected)
    cli.Conn = clientConn
    // 启用读写句柄
    cli.SetRWBuf()
    // wait
    cli.Wait()
}
// 处理设备注册
func (s *Server) RegisterDevice(msg *aiot.Protocol, cli *client.Client) {
    // 设置节点ID
@@ -271,6 +272,6 @@
    s.SetCluster(msg.SenderId, cli)
    // 设置集群
    registerData := &aiot.DeviceRegister{}
    json.Unmarshal(msg.Data, registerData)
    _ = json.Unmarshal(msg.Data, registerData)
    s.SetDeviceList(msg.SenderId, registerData)
}