saas-smartAi通信协议标准库
gongshangguo
2022-02-25 6d7dfff9e1920e55dfa4a7311f8e173d0e631fed
logger
3个文件已修改
85 ■■■■ 已修改文件
client/client.go 56 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server/clienter.go 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server/server.go 26 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
client/client.go
@@ -73,7 +73,7 @@
    // 心跳包
    heartBeatProto *aiot.HeartBeatProto
    // logger
    logger *zap.SugaredLogger
    Logger *zap.SugaredLogger
}
// 初始化客户端
@@ -92,7 +92,7 @@
        waitGroup: &util.WaitGroupWrapper{},
        clientCallback: callBack,
        heartBeatProto: &aiot.HeartBeatProto{},
        logger: logger,
        Logger: logger,
    }
}
@@ -117,35 +117,35 @@
// 启动服务
func (c *Client) StartSrv() {
    // 判断连接状态,避免重复连接
    c.logger.Debug("Start client service...")
    c.Logger.Debug("Start client service...")
    if c.IsConnected(){
        c.logger.Error("net is connected, please do not repeat connect", zap.String("addr", c.addr))
        c.Logger.Error("net is connected, please do not repeat connect", zap.String("addr", c.addr))
        return
    }
    // 地址是否可用
    if c.addr == "" {
        c.logger.Warn("net addr is nil, wait for retry", zap.String("deviceId", c.deviceId))
        c.Logger.Warn("net addr is nil, wait for retry", zap.String("deviceId", c.deviceId))
        return
    }
    // 连接TCP
    c.logger.Debug("Connecting to service", zap.String("addr", c.addr))
    c.Logger.Debug("Connecting to service", zap.String("addr", c.addr))
    tcpAddr, err := net.ResolveTCPAddr("tcp", c.addr)
    if err != nil {
        c.logger.Warn("Net addr can not be connect for now,waiting...", zap.String("addr", c.addr), zap.Error(err))
        c.Logger.Warn("Net addr can not be connect for now,waiting...", zap.String("addr", c.addr), zap.Error(err))
        return
    }
    conn, err := net.DialTCP("tcp", nil, tcpAddr)
    if err != nil {
        c.logger.Warn("Net addr can not be dial for now,waiting...", zap.String("addr", c.addr), zap.Error(err))
        c.Logger.Warn("Net addr can not be dial for now,waiting...", zap.String("addr", c.addr), zap.Error(err))
        return
    }
    c.Conn = conn
    // 设置连接状态
    c.SetState(StateConnected)
    c.logger.Debug("Client service connected.", zap.String("addr", c.addr))
    c.Logger.Debug("Client service connected.", zap.String("addr", c.addr))
    // 启用读取通道
    c.SetRWBuf()
@@ -156,7 +156,7 @@
    // 启用心跳
    c.waitGroup.Wrap(c.writeHeartBeat)
    c.Wait()
    c.logger.Warn("Client service disconnected.Return...", zap.String("addr", c.addr))
    c.Logger.Warn("Client service disconnected.Return...", zap.String("addr", c.addr))
}
// 设置连接状态
@@ -190,23 +190,23 @@
    for {
        select {
        case <- c.exitChan:
            c.logger.Debug("Close client", zap.String("deviceId", c.deviceId))
            c.Logger.Debug("Close client", zap.String("deviceId", c.deviceId))
            c.Close()
            c.logger.Warn("writeLoop Done...")
            c.Logger.Warn("writeLoop Done...")
            return
        case bodyByte := <- c.writeChan:
            binary.BigEndian.PutUint32(byte4, uint32(len(bodyByte)))
            body = append(byte4, bodyByte...)
            _,err = c.Conn.Write(body)
            if err != nil {
                c.logger.Error("Fail to write message", zap.Error(err), zap.String("msg", string(bodyByte)))
                c.Logger.Error("Fail to write message", zap.Error(err), zap.String("msg", string(bodyByte)))
                c.Close()
                c.logger.Warn("writeLoop Done...")
                c.Logger.Warn("writeLoop Done...")
                return
            }
            err = c.Writer.Flush()
            if err != nil {
                c.logger.Error("Fail to write flush", zap.Error(err))
                c.Logger.Error("Fail to write flush", zap.Error(err))
            }
        }
    }
@@ -220,7 +220,7 @@
    // 关闭的连接不能写入
    if c.IsClosed() {
        c.logger.Error("Can not write msg on the closed chan", zap.Any("msgType", msgType), zap.Any("reqType", reqType), zap.Any("data", string(data)))
        c.Logger.Error("Can not write msg on the closed chan", zap.Any("msgType", msgType), zap.Any("reqType", reqType), zap.Any("data", string(data)))
        return nil,nil
    }
@@ -240,10 +240,10 @@
func (c *Client) WriteBody(body *aiot.Protocol) error {
    body.Receiver = aiot.RECEIVER_TO_SAAS
    c.logger.Debug("Send msg...", zap.Any("msg", body), zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
    c.Logger.Debug("Send msg...", zap.Any("msg", body), zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
    msgData, err := json.Marshal(body)
    if err != nil {
        c.logger.Error("Fail to Marshal send data", zap.Error(err))
        c.Logger.Error("Fail to Marshal send data", zap.Error(err))
        return err
    }
    c.msgLock.Lock()
@@ -254,18 +254,18 @@
// 发送注册包
func (c *Client) writeRegister() {
    c.logger.Debug("registering...")
    c.Logger.Debug("registering...")
    data := c.deviceRegister
    msgData, _ := json.Marshal(data)
    _, err := c.WriteMsg(c.deviceId, aiot.MSG_TYPE_REGISTER, aiot.REQ_TYPE_REQUEST, msgData, c.GetMsgProto(""))
    if err != nil {
        c.logger.Error("Fail to send device register", zap.Any("msg", msgData))
        c.Logger.Error("Fail to send device register", zap.Any("msg", msgData))
    }
}
// 发送心跳包
func (c *Client) writeHeartBeat() {
    c.logger.Debug("Start HeartBeating...",zap.String("addr", c.addr))
    c.Logger.Debug("Start HeartBeating...",zap.String("addr", c.addr))
    pingData, _ := json.Marshal(c.heartBeatProto)
    t := time.NewTicker(DefaultHeartbeatInterval)
    defer func() {
@@ -298,7 +298,7 @@
            if err == io.EOF {
                err = nil
            } else {
                c.logger.Error("Fail to read request", zap.Error(err))
                c.Logger.Error("Fail to read request", zap.Error(err))
                c.Close()
                return
            }
@@ -306,7 +306,7 @@
        }
        length = binary.BigEndian.Uint32(c.tmpByte4Slice)
        if length > DefaultBufferSize {
            c.logger.Error("Fail to read request data from io", zap.Uint32("length",length))
            c.Logger.Error("Fail to read request data from io", zap.Uint32("length",length))
        }
        // 读取body
        bodyByte := make([]byte, length)
@@ -316,21 +316,21 @@
        body := &aiot.Protocol{}
        err = json.Unmarshal(bodyByte, body)
        if err != nil {
            c.logger.Error("Fail to unmarshal message", zap.Error(err), zap.String("msg", string(bodyByte)))
            c.Logger.Error("Fail to unmarshal message", zap.Error(err), zap.String("msg", string(bodyByte)))
        }
        c.logger.Debug("Read msg...", zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
        c.Logger.Debug("Read msg...", zap.Any("msg", body), zap.Any("reqType", body.ReqType), zap.Any("msgType", body.MsgType))
        // 处理回调
        c.onMessage(body)
    }
    c.logger.Warn("ReadLoop Done...")
    c.Logger.Warn("ReadLoop Done...")
}
// 处理回调
func (c *Client) onMessage (body *aiot.Protocol) {
    // 未封装callback,只写日志
    if c.clientCallback == nil {
        c.logger.Warn("ClientCallBack interface is nil...", zap.String("addr", c.addr), zap.Any("body", body))
        c.Logger.Warn("ClientCallBack interface is nil...", zap.String("addr", c.addr), zap.Any("body", body))
        return
    }
@@ -411,7 +411,7 @@
// 关闭TCP
func (c *Client) Close() {
    c.logger.Debug("Closing connect", zap.String("addr", c.addr))
    c.Logger.Debug("Closing connect", zap.String("addr", c.addr))
    c.closeLock.Lock()
    defer c.closeLock.Unlock()
server/clienter.go
@@ -3,7 +3,6 @@
import (
    "basic.com/valib/go-aiot.git/aiotProto/aiot"
    "basic.com/valib/go-aiot.git/client"
    "basic.com/valib/logger.git"
    "errors"
    "go.uber.org/zap"
)
@@ -19,7 +18,7 @@
        // 错误信息
        errMsg := "Cluster have not registered"
        // 记录日志
        logger.Error(errMsg, zap.Any("msg", msg))
        cli.Logger.Error(errMsg, zap.Any("msg", msg))
        cli.Close()
        // 移除集群
        Srv.RemoveCluster(msg.SenderId)
server/server.go
@@ -4,7 +4,6 @@
    "basic.com/valib/go-aiot.git/aiotProto/aiot"
    "basic.com/valib/go-aiot.git/client"
    "basic.com/valib/go-aiot.git/util"
    "basic.com/valib/logger.git"
    uuid "github.com/satori/go.uuid"
    "go.uber.org/zap"
    "net"
@@ -39,6 +38,8 @@
    ClusterBlackList map[string]struct{}
    // 回调接口
    serverCallBack ServerCallBack
    // logger
    Logger *zap.SugaredLogger
}
// 全局服务
@@ -78,7 +79,8 @@
}
// 初始化服务
func NewServer(addr string, serverId string, serverCallBack ServerCallBack) *Server {
func NewServer(addr string, serverId string, serverCallBack ServerCallBack, logger *zap.SugaredLogger) *Server {
    logger.Debug("New server...", zap.String("addr", addr))
    return &Server{
        addr: addr,
        waitGroup: &util.WaitGroupWrapper{},
@@ -89,6 +91,7 @@
        ClusterDevice: make(map[string]map[string]struct{}),
        ClusterBlackList: make(map[string]struct{}),
        serverCallBack: serverCallBack,
        Logger: logger,
    }
}
@@ -180,24 +183,25 @@
// 启动服务
func (s *Server) StartSrv() error {
    s.Logger.Debug("Start server...", zap.String("addr", s.addr))
    // 错误
    var err error
    // tcpAddr
    s.tcpAddr, err = net.ResolveTCPAddr("tcp", s.addr)
    if err != nil {
        logger.Error("Can not build tcp server for now", zap.String("addr", s.addr), zap.Error(err))
        s.Logger.Error("Can not build tcp server for now", zap.String("addr", s.addr), zap.Error(err))
        return err
    }
    // 监听
    s.tcpListener, err = net.ListenTCP("tcp", s.tcpAddr)
    if err != nil {
        logger.Error("Can not listen tcp server for now", zap.String("addr", s.addr), zap.Error(err))
        s.Logger.Error("Can not listen tcp server for now", zap.String("addr", s.addr), zap.Error(err))
        return err
    }
    // 收到连接
    logger.Debug("Starting to listen addr", zap.String("addr", s.addr))
    s.Logger.Debug("Starting to listen addr", zap.String("addr", s.addr))
    s.waitGroup.Wrap(func() {
        for {
            // 获取连接
@@ -205,33 +209,33 @@
            if err != nil {
                // 让出grouting
                if netErr, ok := err.(net.Error);ok && netErr.Temporary() {
                    logger.Error("Continue listening...", zap.String("addr", s.tcpListener.Addr().String()), zap.Error(err))
                    s.Logger.Error("Continue listening...", zap.String("addr", s.tcpListener.Addr().String()), zap.Error(err))
                    runtime.Gosched()
                    continue
                }
                // 不能使用已关闭的连接
                if !strings.Contains(err.Error(), "use of closed network connection") {
                    logger.Error("Can not use a closed network connection", zap.String("addr", s.addr), zap.Error(err))
                    s.Logger.Error("Can not use a closed network connection", zap.String("addr", s.addr), zap.Error(err))
                }
                break
            }
            // 处理连接
            logger.Debug("Client connected", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String()))
            s.Logger.Debug("Client connected", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String()))
            go s.Handler(clientConn)
        }
    })
    // wait
    s.waitGroup.Wait()
    logger.Warn("Tcp server exist", zap.String("addr", s.addr))
    s.Logger.Warn("Tcp server exist", zap.String("addr", s.addr))
    return nil
}
// 处理连接
func (s *Server) Handler(clientConn net.Conn) {
    logger.Debug("Client connected...", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String()))
    s.Logger.Debug("Client connected...", zap.String("RemoteAddr", clientConn.RemoteAddr().String()), zap.String("LocalAddr", clientConn.LocalAddr().String()))
    // 临时ID
    tplClientId := uuid.NewV4().String()
@@ -241,7 +245,7 @@
    // 初始化连接
    cliCon := &Clients{}
    cli := client.NewClient(s.addr, tplClientId,cliRegister, cliCon)
    cli := client.NewClient(s.addr, tplClientId,cliRegister, cliCon, s.Logger)
    // 设置连接状态
    cli.SetState(client.StateConnected)