From 635f849d70ae31df42dab56dacf62727a313db28 Mon Sep 17 00:00:00 2001 From: zhangzengfei <zhangzengfei@smartai.com> Date: 星期六, 06 七月 2024 20:17:04 +0800 Subject: [PATCH] 修改比对为并发 --- config/config.go | 17 ++- serve/serve.go | 105 ++++++++++++++++++++++++++ main.go | 97 +++-------------------- db/db.go | 8 +- 4 files changed, 135 insertions(+), 92 deletions(-) diff --git a/config/config.go b/config/config.go index 5952d22..b71dc57 100644 --- a/config/config.go +++ b/config/config.go @@ -8,18 +8,19 @@ "github.com/spf13/viper" ) -type database struct { +type mainConfig struct { + ServePort int `mapstructure:"servePort"` + WorkerNum int `mapstructure:"workerNum"` MysqlAddr string `mapstructure:"mysqlAddr"` Username string `mapstructure:"username"` Password string `mapstructure:"password"` Database string `mapstructure:"database"` PersonTable string `mapstructure:"personTable"` - ServePort int `mapstructure:"servePort"` } -var DbPersonCompInfo = &database{} +var MainConf = &mainConfig{} -type LogConfig struct { +type logConfig struct { Path string `mapstructure:"path"` //鏃ュ織瀛樺偍璺緞 Level int `mapstructure:"level"` //鏃ュ織绛夌骇 MaxSize int `mapstructure:"maxSize"` //鏃ュ織鏂囦欢澶у皬涓婇檺 @@ -27,7 +28,7 @@ MaxAge int `mapstructure:"maxAge"` //淇濈暀鍘嬬缉鍖呭ぉ鏁� } -var LogConf = &LogConfig{} +var LogConf = &logConfig{} func Init() error { var err error @@ -53,8 +54,12 @@ } func read2Conf(v *viper.Viper) { - v.UnmarshalKey("database", DbPersonCompInfo) + v.UnmarshalKey("main", MainConf) v.UnmarshalKey("log", LogConf) logger.SetLevel(LogConf.Level) + + if MainConf.WorkerNum == 0 { + MainConf.WorkerNum = 30 + } } diff --git a/db/db.go b/db/db.go index 2aa5dfd..875eee9 100644 --- a/db/db.go +++ b/db/db.go @@ -16,10 +16,10 @@ var err error dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4", - config.DbPersonCompInfo.Username, - config.DbPersonCompInfo.Password, - config.DbPersonCompInfo.MysqlAddr, - config.DbPersonCompInfo.Database) + config.MainConf.Username, + config.MainConf.Password, + config.MainConf.MysqlAddr, + config.MainConf.Database) db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{ // 绂佺敤澶栭敭(鎸囧畾澶栭敭鏃朵笉浼氬湪mysql鍒涘缓鐪熷疄鐨勫閿害鏉�) diff --git a/main.go b/main.go index 67c5a08..f3137fd 100644 --- a/main.go +++ b/main.go @@ -3,24 +3,20 @@ import ( "context" "flag" + "os" + "os/signal" "path" "strconv" + "syscall" "time" "sdkCompare/cache" - "sdkCompare/compare" "sdkCompare/config" "sdkCompare/db" - "sdkCompare/proto/facecompare" + "sdkCompare/serve" - "basic.com/pubsub/protomsg.git" "basic.com/valib/logger.git" - "basic.com/valib/version.git" - "github.com/golang/protobuf/proto" - "nanomsg.org/go-mangos" - "nanomsg.org/go-mangos/protocol/rep" - "nanomsg.org/go-mangos/transport/ipc" - "nanomsg.org/go-mangos/transport/tcp" + vaversion "basic.com/valib/version.git" ) const procName = "faceCompare" @@ -42,88 +38,25 @@ logger.InitLogger(logFile, config.LogConf.Level, config.LogConf.MaxSize, config.LogConf.MaxBackups, config.LogConf.MaxAge) logger.Info("logger init success !") - serveUrl := "tcp://0.0.0.0:" if err := db.ConnectDB(); err != nil { logger.Error(err.Error()) return } cache.InitDbTablePersons() - serveUrl = serveUrl + strconv.Itoa(config.DbPersonCompInfo.ServePort) + + serveUrl := "tcp://0.0.0.0:" + serveUrl = serveUrl + strconv.Itoa(config.MainConf.ServePort) logger.Infof("%s serve url:%s", procName, serveUrl) - Recv(serveUrl) -} + var ctx, cancel = context.WithCancel(context.Background()) -func Recv(url string) { - var sock mangos.Socket - var err error - var msg []byte - var ctx, _ = context.WithCancel(context.Background()) - if sock, err = rep.NewSocket(); err != nil { - logger.Error("new rep socket err:", err) - } + serve.Start(ctx, serveUrl, config.MainConf.WorkerNum) - sock.AddTransport(ipc.NewTransport()) - sock.AddTransport(tcp.NewTransport()) - if err = sock.Listen(url); err != nil { - logger.Error("listen on rep socket err:", err) - } - - for { - select { - case <-ctx.Done(): - logger.Info("ctx done") - return - default: - msg, err = sock.Recv() - if err != nil || len(msg) <= 0 { - continue - } - - var request facecompare.CompareRequest - err = proto.Unmarshal(msg, &request) - if err != nil { - logger.Warn("CompareRequest json unmarshal error") - continue - } - - var result []byte - if request.CompareType == facecompare.CompareType_Compare { - var compareArgInfo protomsg.CompareArgs - if err = proto.Unmarshal(request.Payload, &compareArgInfo); err == nil { - timeStart := time.Now() - result = compare.GetComparePersonBaseInfo(compareArgInfo) - logger.Debug("鐢ㄦ椂锛�", time.Since(timeStart)) - } else { - logger.Warn("CompareArgs or EsPersonCacheChange json unmarshal error") - continue - } - } else if request.CompareType == facecompare.CompareType_UpdateCache { - var compareEvent protomsg.CompareEvent - if err = proto.Unmarshal(request.Payload, &compareEvent); err == nil { - if compareEvent.EventType == protomsg.CompareEventType_ReInitCache { //鍔犲叆闆嗙兢鍚庨噸鏂板垵濮嬪寲缂撳瓨 - cache.ReInitDbTablePersonsCache() - } else if compareEvent.EventType == protomsg.CompareEventType_UpdateCache { //搴撲腑鏂板鏇存柊缂撳瓨 - id := string(compareEvent.Payload) - cache.UpdateDbPersonsCacheById(id) - logger.Info("--------------鏇存柊浜哄憳缂撳瓨, id: ", id) - } else if compareEvent.EventType == protomsg.CompareEventType_DeleteCache { //搴撲腑鏂板鏇存柊缂撳瓨 - id := string(compareEvent.Payload) - cache.DeleteDbPersonsCacheById(id) - logger.Info("--------------鍒犻櫎浜哄憳缂撳瓨, id: ", id) - } - } else { - logger.Warn("CompareEvent json unmarshal error") - continue - } - } - - err = sock.Send(result) - if err != nil { - logger.Warn("send reply err:", err.Error()) - } - } - } + quit := make(chan os.Signal) + signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) + <-quit + logger.Info("Shutting down server...") + cancel() } diff --git a/serve/serve.go b/serve/serve.go new file mode 100644 index 0000000..933ec58 --- /dev/null +++ b/serve/serve.go @@ -0,0 +1,105 @@ +package serve + +import ( + "context" + "sync" + "time" + + "sdkCompare/cache" + "sdkCompare/compare" + "sdkCompare/proto/facecompare" + + "basic.com/pubsub/protomsg.git" + "basic.com/valib/logger.git" + "github.com/golang/protobuf/proto" + "nanomsg.org/go-mangos" + "nanomsg.org/go-mangos/protocol/rep" + "nanomsg.org/go-mangos/transport/ipc" + "nanomsg.org/go-mangos/transport/tcp" +) + +func serverWorker(ctx context.Context, sock mangos.Socket, id int) { + for { + select { + case <-ctx.Done(): + logger.Infof("worker %d done", id) + return + default: + msg, err := sock.Recv() + if err != nil || len(msg) <= 0 { + continue + } + + var request facecompare.CompareRequest + err = proto.Unmarshal(msg, &request) + if err != nil { + logger.Warn("CompareRequest json unmarshal error") + continue + } + + var result []byte + if request.CompareType == facecompare.CompareType_Compare { + var compareArgInfo protomsg.CompareArgs + if err = proto.Unmarshal(request.Payload, &compareArgInfo); err == nil { + timeStart := time.Now() + result = compare.GetComparePersonBaseInfo(compareArgInfo) + logger.Debug("鐢ㄦ椂锛�", time.Since(timeStart)) + } else { + logger.Warn("CompareArgs or EsPersonCacheChange json unmarshal error") + continue + } + } else if request.CompareType == facecompare.CompareType_UpdateCache { + var compareEvent protomsg.CompareEvent + if err = proto.Unmarshal(request.Payload, &compareEvent); err == nil { + if compareEvent.EventType == protomsg.CompareEventType_ReInitCache { //鍔犲叆闆嗙兢鍚庨噸鏂板垵濮嬪寲缂撳瓨 + cache.ReInitDbTablePersonsCache() + } else if compareEvent.EventType == protomsg.CompareEventType_UpdateCache { //搴撲腑鏂板鏇存柊缂撳瓨 + id := string(compareEvent.Payload) + cache.UpdateDbPersonsCacheById(id) + logger.Info("--------------鏇存柊浜哄憳缂撳瓨, id: ", id) + } else if compareEvent.EventType == protomsg.CompareEventType_DeleteCache { //搴撲腑鏂板鏇存柊缂撳瓨 + id := string(compareEvent.Payload) + cache.DeleteDbPersonsCacheById(id) + logger.Info("--------------鍒犻櫎浜哄憳缂撳瓨, id: ", id) + } + } else { + logger.Warn("CompareEvent json unmarshal error") + continue + } + } + + err = sock.Send(result) + if err != nil { + logger.Warn("send reply err:", err.Error()) + } + } + } +} + +func Start(ctx context.Context, url string, nWorkers int) { + var sock mangos.Socket + var err error + + if sock, err = rep.NewSocket(); err != nil { + logger.Error("can't get new rep socket: %s", err) + return + } + if err = sock.SetOption(mangos.OptionRaw, true); err != nil { + logger.Error("can't set raw mode: %s", err) + return + } + sock.AddTransport(ipc.NewTransport()) + sock.AddTransport(tcp.NewTransport()) + if err = sock.Listen(url); err != nil { + logger.Error("can't listen on rep socket: %s", err.Error()) + return + } + + logger.Debugf("Starting %d workers", nWorkers) + + for id := 0; id < nWorkers; id++ { + go func(id int) { + serverWorker(ctx, sock, id) + }(id) + } +} -- Gitblit v1.8.0