From 635f849d70ae31df42dab56dacf62727a313db28 Mon Sep 17 00:00:00 2001
From: zhangzengfei <zhangzengfei@smartai.com>
Date: 星期六, 06 七月 2024 20:17:04 +0800
Subject: [PATCH] 修改比对为并发

---
 config/config.go |   17 ++-
 serve/serve.go   |  105 ++++++++++++++++++++++++++
 main.go          |   97 +++--------------------
 db/db.go         |    8 +-
 4 files changed, 135 insertions(+), 92 deletions(-)

diff --git a/config/config.go b/config/config.go
index 5952d22..b71dc57 100644
--- a/config/config.go
+++ b/config/config.go
@@ -8,18 +8,19 @@
 	"github.com/spf13/viper"
 )
 
-type database struct {
+type mainConfig struct {
+	ServePort   int    `mapstructure:"servePort"`
+	WorkerNum   int    `mapstructure:"workerNum"`
 	MysqlAddr   string `mapstructure:"mysqlAddr"`
 	Username    string `mapstructure:"username"`
 	Password    string `mapstructure:"password"`
 	Database    string `mapstructure:"database"`
 	PersonTable string `mapstructure:"personTable"`
-	ServePort   int    `mapstructure:"servePort"`
 }
 
-var DbPersonCompInfo = &database{}
+var MainConf = &mainConfig{}
 
-type LogConfig struct {
+type logConfig struct {
 	Path       string `mapstructure:"path"`       //鏃ュ織瀛樺偍璺緞
 	Level      int    `mapstructure:"level"`      //鏃ュ織绛夌骇
 	MaxSize    int    `mapstructure:"maxSize"`    //鏃ュ織鏂囦欢澶у皬涓婇檺
@@ -27,7 +28,7 @@
 	MaxAge     int    `mapstructure:"maxAge"`     //淇濈暀鍘嬬缉鍖呭ぉ鏁�
 }
 
-var LogConf = &LogConfig{}
+var LogConf = &logConfig{}
 
 func Init() error {
 	var err error
@@ -53,8 +54,12 @@
 }
 
 func read2Conf(v *viper.Viper) {
-	v.UnmarshalKey("database", DbPersonCompInfo)
+	v.UnmarshalKey("main", MainConf)
 	v.UnmarshalKey("log", LogConf)
 
 	logger.SetLevel(LogConf.Level)
+
+	if MainConf.WorkerNum == 0 {
+		MainConf.WorkerNum = 30
+	}
 }
diff --git a/db/db.go b/db/db.go
index 2aa5dfd..875eee9 100644
--- a/db/db.go
+++ b/db/db.go
@@ -16,10 +16,10 @@
 	var err error
 
 	dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4",
-		config.DbPersonCompInfo.Username,
-		config.DbPersonCompInfo.Password,
-		config.DbPersonCompInfo.MysqlAddr,
-		config.DbPersonCompInfo.Database)
+		config.MainConf.Username,
+		config.MainConf.Password,
+		config.MainConf.MysqlAddr,
+		config.MainConf.Database)
 
 	db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{
 		// 绂佺敤澶栭敭(鎸囧畾澶栭敭鏃朵笉浼氬湪mysql鍒涘缓鐪熷疄鐨勫閿害鏉�)
diff --git a/main.go b/main.go
index 67c5a08..f3137fd 100644
--- a/main.go
+++ b/main.go
@@ -3,24 +3,20 @@
 import (
 	"context"
 	"flag"
+	"os"
+	"os/signal"
 	"path"
 	"strconv"
+	"syscall"
 	"time"
 
 	"sdkCompare/cache"
-	"sdkCompare/compare"
 	"sdkCompare/config"
 	"sdkCompare/db"
-	"sdkCompare/proto/facecompare"
+	"sdkCompare/serve"
 
-	"basic.com/pubsub/protomsg.git"
 	"basic.com/valib/logger.git"
-	"basic.com/valib/version.git"
-	"github.com/golang/protobuf/proto"
-	"nanomsg.org/go-mangos"
-	"nanomsg.org/go-mangos/protocol/rep"
-	"nanomsg.org/go-mangos/transport/ipc"
-	"nanomsg.org/go-mangos/transport/tcp"
+	vaversion "basic.com/valib/version.git"
 )
 
 const procName = "faceCompare"
@@ -42,88 +38,25 @@
 	logger.InitLogger(logFile, config.LogConf.Level, config.LogConf.MaxSize, config.LogConf.MaxBackups, config.LogConf.MaxAge)
 	logger.Info("logger init success !")
 
-	serveUrl := "tcp://0.0.0.0:"
 	if err := db.ConnectDB(); err != nil {
 		logger.Error(err.Error())
 		return
 	}
 
 	cache.InitDbTablePersons()
-	serveUrl = serveUrl + strconv.Itoa(config.DbPersonCompInfo.ServePort)
+
+	serveUrl := "tcp://0.0.0.0:"
+	serveUrl = serveUrl + strconv.Itoa(config.MainConf.ServePort)
 
 	logger.Infof("%s serve url:%s", procName, serveUrl)
 
-	Recv(serveUrl)
-}
+	var ctx, cancel = context.WithCancel(context.Background())
 
-func Recv(url string) {
-	var sock mangos.Socket
-	var err error
-	var msg []byte
-	var ctx, _ = context.WithCancel(context.Background())
-	if sock, err = rep.NewSocket(); err != nil {
-		logger.Error("new rep socket err:", err)
-	}
+	serve.Start(ctx, serveUrl, config.MainConf.WorkerNum)
 
-	sock.AddTransport(ipc.NewTransport())
-	sock.AddTransport(tcp.NewTransport())
-	if err = sock.Listen(url); err != nil {
-		logger.Error("listen on rep socket err:", err)
-	}
-
-	for {
-		select {
-		case <-ctx.Done():
-			logger.Info("ctx done")
-			return
-		default:
-			msg, err = sock.Recv()
-			if err != nil || len(msg) <= 0 {
-				continue
-			}
-
-			var request facecompare.CompareRequest
-			err = proto.Unmarshal(msg, &request)
-			if err != nil {
-				logger.Warn("CompareRequest json unmarshal error")
-				continue
-			}
-
-			var result []byte
-			if request.CompareType == facecompare.CompareType_Compare {
-				var compareArgInfo protomsg.CompareArgs
-				if err = proto.Unmarshal(request.Payload, &compareArgInfo); err == nil {
-					timeStart := time.Now()
-					result = compare.GetComparePersonBaseInfo(compareArgInfo)
-					logger.Debug("鐢ㄦ椂锛�", time.Since(timeStart))
-				} else {
-					logger.Warn("CompareArgs or EsPersonCacheChange json unmarshal error")
-					continue
-				}
-			} else if request.CompareType == facecompare.CompareType_UpdateCache {
-				var compareEvent protomsg.CompareEvent
-				if err = proto.Unmarshal(request.Payload, &compareEvent); err == nil {
-					if compareEvent.EventType == protomsg.CompareEventType_ReInitCache { //鍔犲叆闆嗙兢鍚庨噸鏂板垵濮嬪寲缂撳瓨
-						cache.ReInitDbTablePersonsCache()
-					} else if compareEvent.EventType == protomsg.CompareEventType_UpdateCache { //搴撲腑鏂板鏇存柊缂撳瓨
-						id := string(compareEvent.Payload)
-						cache.UpdateDbPersonsCacheById(id)
-						logger.Info("--------------鏇存柊浜哄憳缂撳瓨, id: ", id)
-					} else if compareEvent.EventType == protomsg.CompareEventType_DeleteCache { //搴撲腑鏂板鏇存柊缂撳瓨
-						id := string(compareEvent.Payload)
-						cache.DeleteDbPersonsCacheById(id)
-						logger.Info("--------------鍒犻櫎浜哄憳缂撳瓨, id: ", id)
-					}
-				} else {
-					logger.Warn("CompareEvent json unmarshal error")
-					continue
-				}
-			}
-
-			err = sock.Send(result)
-			if err != nil {
-				logger.Warn("send reply err:", err.Error())
-			}
-		}
-	}
+	quit := make(chan os.Signal)
+	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
+	<-quit
+	logger.Info("Shutting down server...")
+	cancel()
 }
diff --git a/serve/serve.go b/serve/serve.go
new file mode 100644
index 0000000..933ec58
--- /dev/null
+++ b/serve/serve.go
@@ -0,0 +1,105 @@
+package serve
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"sdkCompare/cache"
+	"sdkCompare/compare"
+	"sdkCompare/proto/facecompare"
+
+	"basic.com/pubsub/protomsg.git"
+	"basic.com/valib/logger.git"
+	"github.com/golang/protobuf/proto"
+	"nanomsg.org/go-mangos"
+	"nanomsg.org/go-mangos/protocol/rep"
+	"nanomsg.org/go-mangos/transport/ipc"
+	"nanomsg.org/go-mangos/transport/tcp"
+)
+
+func serverWorker(ctx context.Context, sock mangos.Socket, id int) {
+	for {
+		select {
+		case <-ctx.Done():
+			logger.Infof("worker %d done", id)
+			return
+		default:
+			msg, err := sock.Recv()
+			if err != nil || len(msg) <= 0 {
+				continue
+			}
+
+			var request facecompare.CompareRequest
+			err = proto.Unmarshal(msg, &request)
+			if err != nil {
+				logger.Warn("CompareRequest json unmarshal error")
+				continue
+			}
+
+			var result []byte
+			if request.CompareType == facecompare.CompareType_Compare {
+				var compareArgInfo protomsg.CompareArgs
+				if err = proto.Unmarshal(request.Payload, &compareArgInfo); err == nil {
+					timeStart := time.Now()
+					result = compare.GetComparePersonBaseInfo(compareArgInfo)
+					logger.Debug("鐢ㄦ椂锛�", time.Since(timeStart))
+				} else {
+					logger.Warn("CompareArgs or EsPersonCacheChange json unmarshal error")
+					continue
+				}
+			} else if request.CompareType == facecompare.CompareType_UpdateCache {
+				var compareEvent protomsg.CompareEvent
+				if err = proto.Unmarshal(request.Payload, &compareEvent); err == nil {
+					if compareEvent.EventType == protomsg.CompareEventType_ReInitCache { //鍔犲叆闆嗙兢鍚庨噸鏂板垵濮嬪寲缂撳瓨
+						cache.ReInitDbTablePersonsCache()
+					} else if compareEvent.EventType == protomsg.CompareEventType_UpdateCache { //搴撲腑鏂板鏇存柊缂撳瓨
+						id := string(compareEvent.Payload)
+						cache.UpdateDbPersonsCacheById(id)
+						logger.Info("--------------鏇存柊浜哄憳缂撳瓨, id: ", id)
+					} else if compareEvent.EventType == protomsg.CompareEventType_DeleteCache { //搴撲腑鏂板鏇存柊缂撳瓨
+						id := string(compareEvent.Payload)
+						cache.DeleteDbPersonsCacheById(id)
+						logger.Info("--------------鍒犻櫎浜哄憳缂撳瓨, id: ", id)
+					}
+				} else {
+					logger.Warn("CompareEvent json unmarshal error")
+					continue
+				}
+			}
+
+			err = sock.Send(result)
+			if err != nil {
+				logger.Warn("send reply err:", err.Error())
+			}
+		}
+	}
+}
+
+func Start(ctx context.Context, url string, nWorkers int) {
+	var sock mangos.Socket
+	var err error
+
+	if sock, err = rep.NewSocket(); err != nil {
+		logger.Error("can't get new rep socket: %s", err)
+		return
+	}
+	if err = sock.SetOption(mangos.OptionRaw, true); err != nil {
+		logger.Error("can't set raw mode: %s", err)
+		return
+	}
+	sock.AddTransport(ipc.NewTransport())
+	sock.AddTransport(tcp.NewTransport())
+	if err = sock.Listen(url); err != nil {
+		logger.Error("can't listen on rep socket: %s", err.Error())
+		return
+	}
+
+	logger.Debugf("Starting %d workers", nWorkers)
+
+	for id := 0; id < nWorkers; id++ {
+		go func(id int) {
+			serverWorker(ctx, sock, id)
+		}(id)
+	}
+}

--
Gitblit v1.8.0