From 635f849d70ae31df42dab56dacf62727a313db28 Mon Sep 17 00:00:00 2001
From: zhangzengfei <zhangzengfei@smartai.com>
Date: 星期六, 06 七月 2024 20:17:04 +0800
Subject: [PATCH] 修改比对为并发
---
config/config.go | 17 ++-
serve/serve.go | 105 ++++++++++++++++++++++++++
main.go | 97 +++--------------------
db/db.go | 8 +-
4 files changed, 135 insertions(+), 92 deletions(-)
diff --git a/config/config.go b/config/config.go
index 5952d22..b71dc57 100644
--- a/config/config.go
+++ b/config/config.go
@@ -8,18 +8,19 @@
"github.com/spf13/viper"
)
-type database struct {
+type mainConfig struct {
+ ServePort int `mapstructure:"servePort"`
+ WorkerNum int `mapstructure:"workerNum"`
MysqlAddr string `mapstructure:"mysqlAddr"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
Database string `mapstructure:"database"`
PersonTable string `mapstructure:"personTable"`
- ServePort int `mapstructure:"servePort"`
}
-var DbPersonCompInfo = &database{}
+var MainConf = &mainConfig{}
-type LogConfig struct {
+type logConfig struct {
Path string `mapstructure:"path"` //鏃ュ織瀛樺偍璺緞
Level int `mapstructure:"level"` //鏃ュ織绛夌骇
MaxSize int `mapstructure:"maxSize"` //鏃ュ織鏂囦欢澶у皬涓婇檺
@@ -27,7 +28,7 @@
MaxAge int `mapstructure:"maxAge"` //淇濈暀鍘嬬缉鍖呭ぉ鏁�
}
-var LogConf = &LogConfig{}
+var LogConf = &logConfig{}
func Init() error {
var err error
@@ -53,8 +54,12 @@
}
func read2Conf(v *viper.Viper) {
- v.UnmarshalKey("database", DbPersonCompInfo)
+ v.UnmarshalKey("main", MainConf)
v.UnmarshalKey("log", LogConf)
logger.SetLevel(LogConf.Level)
+
+ if MainConf.WorkerNum == 0 {
+ MainConf.WorkerNum = 30
+ }
}
diff --git a/db/db.go b/db/db.go
index 2aa5dfd..875eee9 100644
--- a/db/db.go
+++ b/db/db.go
@@ -16,10 +16,10 @@
var err error
dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4",
- config.DbPersonCompInfo.Username,
- config.DbPersonCompInfo.Password,
- config.DbPersonCompInfo.MysqlAddr,
- config.DbPersonCompInfo.Database)
+ config.MainConf.Username,
+ config.MainConf.Password,
+ config.MainConf.MysqlAddr,
+ config.MainConf.Database)
db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{
// 绂佺敤澶栭敭(鎸囧畾澶栭敭鏃朵笉浼氬湪mysql鍒涘缓鐪熷疄鐨勫閿害鏉�)
diff --git a/main.go b/main.go
index 67c5a08..f3137fd 100644
--- a/main.go
+++ b/main.go
@@ -3,24 +3,20 @@
import (
"context"
"flag"
+ "os"
+ "os/signal"
"path"
"strconv"
+ "syscall"
"time"
"sdkCompare/cache"
- "sdkCompare/compare"
"sdkCompare/config"
"sdkCompare/db"
- "sdkCompare/proto/facecompare"
+ "sdkCompare/serve"
- "basic.com/pubsub/protomsg.git"
"basic.com/valib/logger.git"
- "basic.com/valib/version.git"
- "github.com/golang/protobuf/proto"
- "nanomsg.org/go-mangos"
- "nanomsg.org/go-mangos/protocol/rep"
- "nanomsg.org/go-mangos/transport/ipc"
- "nanomsg.org/go-mangos/transport/tcp"
+ vaversion "basic.com/valib/version.git"
)
const procName = "faceCompare"
@@ -42,88 +38,25 @@
logger.InitLogger(logFile, config.LogConf.Level, config.LogConf.MaxSize, config.LogConf.MaxBackups, config.LogConf.MaxAge)
logger.Info("logger init success !")
- serveUrl := "tcp://0.0.0.0:"
if err := db.ConnectDB(); err != nil {
logger.Error(err.Error())
return
}
cache.InitDbTablePersons()
- serveUrl = serveUrl + strconv.Itoa(config.DbPersonCompInfo.ServePort)
+
+ serveUrl := "tcp://0.0.0.0:"
+ serveUrl = serveUrl + strconv.Itoa(config.MainConf.ServePort)
logger.Infof("%s serve url:%s", procName, serveUrl)
- Recv(serveUrl)
-}
+ var ctx, cancel = context.WithCancel(context.Background())
-func Recv(url string) {
- var sock mangos.Socket
- var err error
- var msg []byte
- var ctx, _ = context.WithCancel(context.Background())
- if sock, err = rep.NewSocket(); err != nil {
- logger.Error("new rep socket err:", err)
- }
+ serve.Start(ctx, serveUrl, config.MainConf.WorkerNum)
- sock.AddTransport(ipc.NewTransport())
- sock.AddTransport(tcp.NewTransport())
- if err = sock.Listen(url); err != nil {
- logger.Error("listen on rep socket err:", err)
- }
-
- for {
- select {
- case <-ctx.Done():
- logger.Info("ctx done")
- return
- default:
- msg, err = sock.Recv()
- if err != nil || len(msg) <= 0 {
- continue
- }
-
- var request facecompare.CompareRequest
- err = proto.Unmarshal(msg, &request)
- if err != nil {
- logger.Warn("CompareRequest json unmarshal error")
- continue
- }
-
- var result []byte
- if request.CompareType == facecompare.CompareType_Compare {
- var compareArgInfo protomsg.CompareArgs
- if err = proto.Unmarshal(request.Payload, &compareArgInfo); err == nil {
- timeStart := time.Now()
- result = compare.GetComparePersonBaseInfo(compareArgInfo)
- logger.Debug("鐢ㄦ椂锛�", time.Since(timeStart))
- } else {
- logger.Warn("CompareArgs or EsPersonCacheChange json unmarshal error")
- continue
- }
- } else if request.CompareType == facecompare.CompareType_UpdateCache {
- var compareEvent protomsg.CompareEvent
- if err = proto.Unmarshal(request.Payload, &compareEvent); err == nil {
- if compareEvent.EventType == protomsg.CompareEventType_ReInitCache { //鍔犲叆闆嗙兢鍚庨噸鏂板垵濮嬪寲缂撳瓨
- cache.ReInitDbTablePersonsCache()
- } else if compareEvent.EventType == protomsg.CompareEventType_UpdateCache { //搴撲腑鏂板鏇存柊缂撳瓨
- id := string(compareEvent.Payload)
- cache.UpdateDbPersonsCacheById(id)
- logger.Info("--------------鏇存柊浜哄憳缂撳瓨, id: ", id)
- } else if compareEvent.EventType == protomsg.CompareEventType_DeleteCache { //搴撲腑鏂板鏇存柊缂撳瓨
- id := string(compareEvent.Payload)
- cache.DeleteDbPersonsCacheById(id)
- logger.Info("--------------鍒犻櫎浜哄憳缂撳瓨, id: ", id)
- }
- } else {
- logger.Warn("CompareEvent json unmarshal error")
- continue
- }
- }
-
- err = sock.Send(result)
- if err != nil {
- logger.Warn("send reply err:", err.Error())
- }
- }
- }
+ quit := make(chan os.Signal)
+ signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
+ <-quit
+ logger.Info("Shutting down server...")
+ cancel()
}
diff --git a/serve/serve.go b/serve/serve.go
new file mode 100644
index 0000000..933ec58
--- /dev/null
+++ b/serve/serve.go
@@ -0,0 +1,105 @@
+package serve
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "sdkCompare/cache"
+ "sdkCompare/compare"
+ "sdkCompare/proto/facecompare"
+
+ "basic.com/pubsub/protomsg.git"
+ "basic.com/valib/logger.git"
+ "github.com/golang/protobuf/proto"
+ "nanomsg.org/go-mangos"
+ "nanomsg.org/go-mangos/protocol/rep"
+ "nanomsg.org/go-mangos/transport/ipc"
+ "nanomsg.org/go-mangos/transport/tcp"
+)
+
+func serverWorker(ctx context.Context, sock mangos.Socket, id int) {
+ for {
+ select {
+ case <-ctx.Done():
+ logger.Infof("worker %d done", id)
+ return
+ default:
+ msg, err := sock.Recv()
+ if err != nil || len(msg) <= 0 {
+ continue
+ }
+
+ var request facecompare.CompareRequest
+ err = proto.Unmarshal(msg, &request)
+ if err != nil {
+ logger.Warn("CompareRequest json unmarshal error")
+ continue
+ }
+
+ var result []byte
+ if request.CompareType == facecompare.CompareType_Compare {
+ var compareArgInfo protomsg.CompareArgs
+ if err = proto.Unmarshal(request.Payload, &compareArgInfo); err == nil {
+ timeStart := time.Now()
+ result = compare.GetComparePersonBaseInfo(compareArgInfo)
+ logger.Debug("鐢ㄦ椂锛�", time.Since(timeStart))
+ } else {
+ logger.Warn("CompareArgs or EsPersonCacheChange json unmarshal error")
+ continue
+ }
+ } else if request.CompareType == facecompare.CompareType_UpdateCache {
+ var compareEvent protomsg.CompareEvent
+ if err = proto.Unmarshal(request.Payload, &compareEvent); err == nil {
+ if compareEvent.EventType == protomsg.CompareEventType_ReInitCache { //鍔犲叆闆嗙兢鍚庨噸鏂板垵濮嬪寲缂撳瓨
+ cache.ReInitDbTablePersonsCache()
+ } else if compareEvent.EventType == protomsg.CompareEventType_UpdateCache { //搴撲腑鏂板鏇存柊缂撳瓨
+ id := string(compareEvent.Payload)
+ cache.UpdateDbPersonsCacheById(id)
+ logger.Info("--------------鏇存柊浜哄憳缂撳瓨, id: ", id)
+ } else if compareEvent.EventType == protomsg.CompareEventType_DeleteCache { //搴撲腑鏂板鏇存柊缂撳瓨
+ id := string(compareEvent.Payload)
+ cache.DeleteDbPersonsCacheById(id)
+ logger.Info("--------------鍒犻櫎浜哄憳缂撳瓨, id: ", id)
+ }
+ } else {
+ logger.Warn("CompareEvent json unmarshal error")
+ continue
+ }
+ }
+
+ err = sock.Send(result)
+ if err != nil {
+ logger.Warn("send reply err:", err.Error())
+ }
+ }
+ }
+}
+
+func Start(ctx context.Context, url string, nWorkers int) {
+ var sock mangos.Socket
+ var err error
+
+ if sock, err = rep.NewSocket(); err != nil {
+ logger.Error("can't get new rep socket: %s", err)
+ return
+ }
+ if err = sock.SetOption(mangos.OptionRaw, true); err != nil {
+ logger.Error("can't set raw mode: %s", err)
+ return
+ }
+ sock.AddTransport(ipc.NewTransport())
+ sock.AddTransport(tcp.NewTransport())
+ if err = sock.Listen(url); err != nil {
+ logger.Error("can't listen on rep socket: %s", err.Error())
+ return
+ }
+
+ logger.Debugf("Starting %d workers", nWorkers)
+
+ for id := 0; id < nWorkers; id++ {
+ go func(id int) {
+ serverWorker(ctx, sock, id)
+ }(id)
+ }
+}
--
Gitblit v1.8.0