zhangzengfei
2024-07-06 635f849d70ae31df42dab56dacf62727a313db28
修改比对为并发
1个文件已添加
3个文件已修改
227 ■■■■■ 已修改文件
config/config.go 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
db/db.go 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go 97 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
serve/serve.go 105 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config/config.go
@@ -8,18 +8,19 @@
    "github.com/spf13/viper"
)
type database struct {
type mainConfig struct {
    ServePort   int    `mapstructure:"servePort"`
    WorkerNum   int    `mapstructure:"workerNum"`
    MysqlAddr   string `mapstructure:"mysqlAddr"`
    Username    string `mapstructure:"username"`
    Password    string `mapstructure:"password"`
    Database    string `mapstructure:"database"`
    PersonTable string `mapstructure:"personTable"`
    ServePort   int    `mapstructure:"servePort"`
}
var DbPersonCompInfo = &database{}
var MainConf = &mainConfig{}
type LogConfig struct {
type logConfig struct {
    Path       string `mapstructure:"path"`       //日志存储路径
    Level      int    `mapstructure:"level"`      //日志等级
    MaxSize    int    `mapstructure:"maxSize"`    //日志文件大小上限
@@ -27,7 +28,7 @@
    MaxAge     int    `mapstructure:"maxAge"`     //保留压缩包天数
}
var LogConf = &LogConfig{}
var LogConf = &logConfig{}
func Init() error {
    var err error
@@ -53,8 +54,12 @@
}
// read2Conf copies the configuration sections held by v into the
// package-level config variables and applies defaults.
//
// NOTE(review): the values returned by the UnmarshalKey calls are discarded
// here, so a section that fails to decode is not reported by this function.
func read2Conf(v *viper.Viper) {
    // NOTE(review): this text comes from a diff view; the "database" line below
    // is presumably the pre-change form of the "main" line — confirm which of
    // the two exists in the actual file.
    v.UnmarshalKey("database", DbPersonCompInfo)
    v.UnmarshalKey("main", MainConf)
    v.UnmarshalKey("log", LogConf)
    // Apply the configured log level to the logger package.
    logger.SetLevel(LogConf.Level)
    // A WorkerNum of 0 (the int zero value) is replaced by a default of 30.
    if MainConf.WorkerNum == 0 {
        MainConf.WorkerNum = 30
    }
}
db/db.go
@@ -16,10 +16,10 @@
    var err error
    dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4",
        config.DbPersonCompInfo.Username,
        config.DbPersonCompInfo.Password,
        config.DbPersonCompInfo.MysqlAddr,
        config.DbPersonCompInfo.Database)
        config.MainConf.Username,
        config.MainConf.Password,
        config.MainConf.MysqlAddr,
        config.MainConf.Database)
    db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{
        // 禁用外键(指定外键时不会在mysql创建真实的外键约束)
main.go
@@ -3,24 +3,20 @@
import (
    "context"
    "flag"
    "os"
    "os/signal"
    "path"
    "strconv"
    "syscall"
    "time"
    "sdkCompare/cache"
    "sdkCompare/compare"
    "sdkCompare/config"
    "sdkCompare/db"
    "sdkCompare/proto/facecompare"
    "sdkCompare/serve"
    "basic.com/pubsub/protomsg.git"
    "basic.com/valib/logger.git"
    "basic.com/valib/version.git"
    "github.com/golang/protobuf/proto"
    "nanomsg.org/go-mangos"
    "nanomsg.org/go-mangos/protocol/rep"
    "nanomsg.org/go-mangos/transport/ipc"
    "nanomsg.org/go-mangos/transport/tcp"
    vaversion "basic.com/valib/version.git"
)
const procName = "faceCompare"
@@ -42,88 +38,25 @@
    logger.InitLogger(logFile, config.LogConf.Level, config.LogConf.MaxSize, config.LogConf.MaxBackups, config.LogConf.MaxAge)
    logger.Info("logger init success !")
    serveUrl := "tcp://0.0.0.0:"
    if err := db.ConnectDB(); err != nil {
        logger.Error(err.Error())
        return
    }
    cache.InitDbTablePersons()
    serveUrl = serveUrl + strconv.Itoa(config.DbPersonCompInfo.ServePort)
    serveUrl := "tcp://0.0.0.0:"
    serveUrl = serveUrl + strconv.Itoa(config.MainConf.ServePort)
    logger.Infof("%s serve url:%s", procName, serveUrl)
    Recv(serveUrl)
}
    var ctx, cancel = context.WithCancel(context.Background())
func Recv(url string) {
    var sock mangos.Socket
    var err error
    var msg []byte
    var ctx, _ = context.WithCancel(context.Background())
    if sock, err = rep.NewSocket(); err != nil {
        logger.Error("new rep socket err:", err)
    }
    serve.Start(ctx, serveUrl, config.MainConf.WorkerNum)
    sock.AddTransport(ipc.NewTransport())
    sock.AddTransport(tcp.NewTransport())
    if err = sock.Listen(url); err != nil {
        logger.Error("listen on rep socket err:", err)
    }
    for {
        select {
        case <-ctx.Done():
            logger.Info("ctx done")
            return
        default:
            msg, err = sock.Recv()
            if err != nil || len(msg) <= 0 {
                continue
            }
            var request facecompare.CompareRequest
            err = proto.Unmarshal(msg, &request)
            if err != nil {
                logger.Warn("CompareRequest json unmarshal error")
                continue
            }
            var result []byte
            if request.CompareType == facecompare.CompareType_Compare {
                var compareArgInfo protomsg.CompareArgs
                if err = proto.Unmarshal(request.Payload, &compareArgInfo); err == nil {
                    timeStart := time.Now()
                    result = compare.GetComparePersonBaseInfo(compareArgInfo)
                    logger.Debug("用时:", time.Since(timeStart))
                } else {
                    logger.Warn("CompareArgs or EsPersonCacheChange json unmarshal error")
                    continue
                }
            } else if request.CompareType == facecompare.CompareType_UpdateCache {
                var compareEvent protomsg.CompareEvent
                if err = proto.Unmarshal(request.Payload, &compareEvent); err == nil {
                    if compareEvent.EventType == protomsg.CompareEventType_ReInitCache { //加入集群后重新初始化缓存
                        cache.ReInitDbTablePersonsCache()
                    } else if compareEvent.EventType == protomsg.CompareEventType_UpdateCache { //库中新增更新缓存
                        id := string(compareEvent.Payload)
                        cache.UpdateDbPersonsCacheById(id)
                        logger.Info("--------------更新人员缓存, id: ", id)
                    } else if compareEvent.EventType == protomsg.CompareEventType_DeleteCache { //库中新增更新缓存
                        id := string(compareEvent.Payload)
                        cache.DeleteDbPersonsCacheById(id)
                        logger.Info("--------------删除人员缓存, id: ", id)
                    }
                } else {
                    logger.Warn("CompareEvent json unmarshal error")
                    continue
                }
            }
            err = sock.Send(result)
            if err != nil {
                logger.Warn("send reply err:", err.Error())
            }
        }
    }
    quit := make(chan os.Signal)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    <-quit
    logger.Info("Shutting down server...")
    cancel()
}
serve/serve.go
New file
@@ -0,0 +1,105 @@
package serve
import (
    "context"
    "sync"
    "time"
    "sdkCompare/cache"
    "sdkCompare/compare"
    "sdkCompare/proto/facecompare"
    "basic.com/pubsub/protomsg.git"
    "basic.com/valib/logger.git"
    "github.com/golang/protobuf/proto"
    "nanomsg.org/go-mangos"
    "nanomsg.org/go-mangos/protocol/rep"
    "nanomsg.org/go-mangos/transport/ipc"
    "nanomsg.org/go-mangos/transport/tcp"
)
func serverWorker(ctx context.Context, sock mangos.Socket, id int) {
    for {
        select {
        case <-ctx.Done():
            logger.Infof("worker %d done", id)
            return
        default:
            msg, err := sock.Recv()
            if err != nil || len(msg) <= 0 {
                continue
            }
            var request facecompare.CompareRequest
            err = proto.Unmarshal(msg, &request)
            if err != nil {
                logger.Warn("CompareRequest json unmarshal error")
                continue
            }
            var result []byte
            if request.CompareType == facecompare.CompareType_Compare {
                var compareArgInfo protomsg.CompareArgs
                if err = proto.Unmarshal(request.Payload, &compareArgInfo); err == nil {
                    timeStart := time.Now()
                    result = compare.GetComparePersonBaseInfo(compareArgInfo)
                    logger.Debug("用时:", time.Since(timeStart))
                } else {
                    logger.Warn("CompareArgs or EsPersonCacheChange json unmarshal error")
                    continue
                }
            } else if request.CompareType == facecompare.CompareType_UpdateCache {
                var compareEvent protomsg.CompareEvent
                if err = proto.Unmarshal(request.Payload, &compareEvent); err == nil {
                    if compareEvent.EventType == protomsg.CompareEventType_ReInitCache { //加入集群后重新初始化缓存
                        cache.ReInitDbTablePersonsCache()
                    } else if compareEvent.EventType == protomsg.CompareEventType_UpdateCache { //库中新增更新缓存
                        id := string(compareEvent.Payload)
                        cache.UpdateDbPersonsCacheById(id)
                        logger.Info("--------------更新人员缓存, id: ", id)
                    } else if compareEvent.EventType == protomsg.CompareEventType_DeleteCache { //库中新增更新缓存
                        id := string(compareEvent.Payload)
                        cache.DeleteDbPersonsCacheById(id)
                        logger.Info("--------------删除人员缓存, id: ", id)
                    }
                } else {
                    logger.Warn("CompareEvent json unmarshal error")
                    continue
                }
            }
            err = sock.Send(result)
            if err != nil {
                logger.Warn("send reply err:", err.Error())
            }
        }
    }
}
func Start(ctx context.Context, url string, nWorkers int) {
    var sock mangos.Socket
    var err error
    if sock, err = rep.NewSocket(); err != nil {
        logger.Error("can't get new rep socket: %s", err)
        return
    }
    if err = sock.SetOption(mangos.OptionRaw, true); err != nil {
        logger.Error("can't set raw mode: %s", err)
        return
    }
    sock.AddTransport(ipc.NewTransport())
    sock.AddTransport(tcp.NewTransport())
    if err = sock.Listen(url); err != nil {
        logger.Error("can't listen on rep socket: %s", err.Error())
        return
    }
    logger.Debugf("Starting %d workers", nWorkers)
    for id := 0; id < nWorkers; id++ {
        go func(id int) {
            serverWorker(ctx, sock, id)
        }(id)
    }
}