From 635f849d70ae31df42dab56dacf62727a313db28 Mon Sep 17 00:00:00 2001 From: zhangzengfei <zhangzengfei@smartai.com> Date: 星期六, 06 七月 2024 20:17:04 +0800 Subject: [PATCH] 修改比对为并发 --- main.go | 97 +++++++----------------------------------------- 1 files changed, 15 insertions(+), 82 deletions(-) diff --git a/main.go b/main.go index 67c5a08..f3137fd 100644 --- a/main.go +++ b/main.go @@ -3,24 +3,20 @@ import ( "context" "flag" + "os" + "os/signal" "path" "strconv" + "syscall" "time" "sdkCompare/cache" - "sdkCompare/compare" "sdkCompare/config" "sdkCompare/db" - "sdkCompare/proto/facecompare" + "sdkCompare/serve" - "basic.com/pubsub/protomsg.git" "basic.com/valib/logger.git" - "basic.com/valib/version.git" - "github.com/golang/protobuf/proto" - "nanomsg.org/go-mangos" - "nanomsg.org/go-mangos/protocol/rep" - "nanomsg.org/go-mangos/transport/ipc" - "nanomsg.org/go-mangos/transport/tcp" + vaversion "basic.com/valib/version.git" ) const procName = "faceCompare" @@ -42,88 +38,25 @@ logger.InitLogger(logFile, config.LogConf.Level, config.LogConf.MaxSize, config.LogConf.MaxBackups, config.LogConf.MaxAge) logger.Info("logger init success !") - serveUrl := "tcp://0.0.0.0:" if err := db.ConnectDB(); err != nil { logger.Error(err.Error()) return } cache.InitDbTablePersons() - serveUrl = serveUrl + strconv.Itoa(config.DbPersonCompInfo.ServePort) + + serveUrl := "tcp://0.0.0.0:" + serveUrl = serveUrl + strconv.Itoa(config.MainConf.ServePort) logger.Infof("%s serve url:%s", procName, serveUrl) - Recv(serveUrl) -} + var ctx, cancel = context.WithCancel(context.Background()) -func Recv(url string) { - var sock mangos.Socket - var err error - var msg []byte - var ctx, _ = context.WithCancel(context.Background()) - if sock, err = rep.NewSocket(); err != nil { - logger.Error("new rep socket err:", err) - } + serve.Start(ctx, serveUrl, config.MainConf.WorkerNum) - sock.AddTransport(ipc.NewTransport()) - sock.AddTransport(tcp.NewTransport()) - if err = sock.Listen(url); err != nil { - logger.Error("listen on rep socket err:", err) - } - - for { - select { - case <-ctx.Done(): - logger.Info("ctx done") - return - default: - msg, err = sock.Recv() - if err != nil || len(msg) <= 0 { - continue - } - - var request facecompare.CompareRequest - err = proto.Unmarshal(msg, &request) - if err != nil { - logger.Warn("CompareRequest json unmarshal error") - continue - } - - var result []byte - if request.CompareType == facecompare.CompareType_Compare { - var compareArgInfo protomsg.CompareArgs - if err = proto.Unmarshal(request.Payload, &compareArgInfo); err == nil { - timeStart := time.Now() - result = compare.GetComparePersonBaseInfo(compareArgInfo) - logger.Debug("鐢ㄦ椂锛�", time.Since(timeStart)) - } else { - logger.Warn("CompareArgs or EsPersonCacheChange json unmarshal error") - continue - } - } else if request.CompareType == facecompare.CompareType_UpdateCache { - var compareEvent protomsg.CompareEvent - if err = proto.Unmarshal(request.Payload, &compareEvent); err == nil { - if compareEvent.EventType == protomsg.CompareEventType_ReInitCache { //鍔犲叆闆嗙兢鍚庨噸鏂板垵濮嬪寲缂撳瓨 - cache.ReInitDbTablePersonsCache() - } else if compareEvent.EventType == protomsg.CompareEventType_UpdateCache { //搴撲腑鏂板鏇存柊缂撳瓨 - id := string(compareEvent.Payload) - cache.UpdateDbPersonsCacheById(id) - logger.Info("--------------鏇存柊浜哄憳缂撳瓨, id: ", id) - } else if compareEvent.EventType == protomsg.CompareEventType_DeleteCache { //搴撲腑鏂板鏇存柊缂撳瓨 - id := string(compareEvent.Payload) - cache.DeleteDbPersonsCacheById(id) - logger.Info("--------------鍒犻櫎浜哄憳缂撳瓨, id: ", id) - } - } else { - logger.Warn("CompareEvent json unmarshal error") - continue - } - } - - err = sock.Send(result) - if err != nil { - logger.Warn("send reply err:", err.Error()) - } - } - } + quit := make(chan os.Signal) + signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) + <-quit + logger.Info("Shutting down server...") + cancel() } -- Gitblit v1.8.0