config/config.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
db/db.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
main.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
serve/serve.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
config/config.go
@@ -8,18 +8,19 @@ "github.com/spf13/viper" ) type database struct { type mainConfig struct { ServePort int `mapstructure:"servePort"` WorkerNum int `mapstructure:"workerNum"` MysqlAddr string `mapstructure:"mysqlAddr"` Username string `mapstructure:"username"` Password string `mapstructure:"password"` Database string `mapstructure:"database"` PersonTable string `mapstructure:"personTable"` ServePort int `mapstructure:"servePort"` } var DbPersonCompInfo = &database{} var MainConf = &mainConfig{} type LogConfig struct { type logConfig struct { Path string `mapstructure:"path"` //日志存储路径 Level int `mapstructure:"level"` //日志等级 MaxSize int `mapstructure:"maxSize"` //日志文件大小上限 @@ -27,7 +28,7 @@ MaxAge int `mapstructure:"maxAge"` //保留压缩包天数 } var LogConf = &LogConfig{} var LogConf = &logConfig{} func Init() error { var err error @@ -53,8 +54,12 @@ } func read2Conf(v *viper.Viper) { v.UnmarshalKey("database", DbPersonCompInfo) v.UnmarshalKey("main", MainConf) v.UnmarshalKey("log", LogConf) logger.SetLevel(LogConf.Level) if MainConf.WorkerNum == 0 { MainConf.WorkerNum = 30 } } db/db.go
@@ -16,10 +16,10 @@ var err error dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4", config.DbPersonCompInfo.Username, config.DbPersonCompInfo.Password, config.DbPersonCompInfo.MysqlAddr, config.DbPersonCompInfo.Database) config.MainConf.Username, config.MainConf.Password, config.MainConf.MysqlAddr, config.MainConf.Database) db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{ // 禁用外键(指定外键时不会在mysql创建真实的外键约束) main.go
@@ -3,24 +3,20 @@ import ( "context" "flag" "os" "os/signal" "path" "strconv" "syscall" "time" "sdkCompare/cache" "sdkCompare/compare" "sdkCompare/config" "sdkCompare/db" "sdkCompare/proto/facecompare" "sdkCompare/serve" "basic.com/pubsub/protomsg.git" "basic.com/valib/logger.git" "basic.com/valib/version.git" "github.com/golang/protobuf/proto" "nanomsg.org/go-mangos" "nanomsg.org/go-mangos/protocol/rep" "nanomsg.org/go-mangos/transport/ipc" "nanomsg.org/go-mangos/transport/tcp" vaversion "basic.com/valib/version.git" ) const procName = "faceCompare" @@ -42,88 +38,25 @@ logger.InitLogger(logFile, config.LogConf.Level, config.LogConf.MaxSize, config.LogConf.MaxBackups, config.LogConf.MaxAge) logger.Info("logger init success !") serveUrl := "tcp://0.0.0.0:" if err := db.ConnectDB(); err != nil { logger.Error(err.Error()) return } cache.InitDbTablePersons() serveUrl = serveUrl + strconv.Itoa(config.DbPersonCompInfo.ServePort) serveUrl := "tcp://0.0.0.0:" serveUrl = serveUrl + strconv.Itoa(config.MainConf.ServePort) logger.Infof("%s serve url:%s", procName, serveUrl) Recv(serveUrl) } var ctx, cancel = context.WithCancel(context.Background()) func Recv(url string) { var sock mangos.Socket var err error var msg []byte var ctx, _ = context.WithCancel(context.Background()) if sock, err = rep.NewSocket(); err != nil { logger.Error("new rep socket err:", err) } serve.Start(ctx, serveUrl, config.MainConf.WorkerNum) sock.AddTransport(ipc.NewTransport()) sock.AddTransport(tcp.NewTransport()) if err = sock.Listen(url); err != nil { logger.Error("listen on rep socket err:", err) } for { select { case <-ctx.Done(): logger.Info("ctx done") return default: msg, err = sock.Recv() if err != nil || len(msg) <= 0 { continue } var request facecompare.CompareRequest err = proto.Unmarshal(msg, &request) if err != nil { logger.Warn("CompareRequest json unmarshal error") continue } var result []byte if request.CompareType == facecompare.CompareType_Compare { var compareArgInfo protomsg.CompareArgs if err = proto.Unmarshal(request.Payload, &compareArgInfo); err == nil { timeStart := time.Now() result = compare.GetComparePersonBaseInfo(compareArgInfo) logger.Debug("用时:", time.Since(timeStart)) } else { logger.Warn("CompareArgs or EsPersonCacheChange json unmarshal error") continue } } else if request.CompareType == facecompare.CompareType_UpdateCache { var compareEvent protomsg.CompareEvent if err = proto.Unmarshal(request.Payload, &compareEvent); err == nil { if compareEvent.EventType == protomsg.CompareEventType_ReInitCache { //加入集群后重新初始化缓存 cache.ReInitDbTablePersonsCache() } else if compareEvent.EventType == protomsg.CompareEventType_UpdateCache { //库中新增更新缓存 id := string(compareEvent.Payload) cache.UpdateDbPersonsCacheById(id) logger.Info("--------------更新人员缓存, id: ", id) } else if compareEvent.EventType == protomsg.CompareEventType_DeleteCache { //库中新增更新缓存 id := string(compareEvent.Payload) cache.DeleteDbPersonsCacheById(id) logger.Info("--------------删除人员缓存, id: ", id) } } else { logger.Warn("CompareEvent json unmarshal error") continue } } err = sock.Send(result) if err != nil { logger.Warn("send reply err:", err.Error()) } } } quit := make(chan os.Signal) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) <-quit logger.Info("Shutting down server...") cancel() } serve/serve.go
New file @@ -0,0 +1,105 @@ package serve import ( "context" "sync" "time" "sdkCompare/cache" "sdkCompare/compare" "sdkCompare/proto/facecompare" "basic.com/pubsub/protomsg.git" "basic.com/valib/logger.git" "github.com/golang/protobuf/proto" "nanomsg.org/go-mangos" "nanomsg.org/go-mangos/protocol/rep" "nanomsg.org/go-mangos/transport/ipc" "nanomsg.org/go-mangos/transport/tcp" ) func serverWorker(ctx context.Context, sock mangos.Socket, id int) { for { select { case <-ctx.Done(): logger.Infof("worker %d done", id) return default: msg, err := sock.Recv() if err != nil || len(msg) <= 0 { continue } var request facecompare.CompareRequest err = proto.Unmarshal(msg, &request) if err != nil { logger.Warn("CompareRequest json unmarshal error") continue } var result []byte if request.CompareType == facecompare.CompareType_Compare { var compareArgInfo protomsg.CompareArgs if err = proto.Unmarshal(request.Payload, &compareArgInfo); err == nil { timeStart := time.Now() result = compare.GetComparePersonBaseInfo(compareArgInfo) logger.Debug("用时:", time.Since(timeStart)) } else { logger.Warn("CompareArgs or EsPersonCacheChange json unmarshal error") continue } } else if request.CompareType == facecompare.CompareType_UpdateCache { var compareEvent protomsg.CompareEvent if err = proto.Unmarshal(request.Payload, &compareEvent); err == nil { if compareEvent.EventType == protomsg.CompareEventType_ReInitCache { //加入集群后重新初始化缓存 cache.ReInitDbTablePersonsCache() } else if compareEvent.EventType == protomsg.CompareEventType_UpdateCache { //库中新增更新缓存 id := string(compareEvent.Payload) cache.UpdateDbPersonsCacheById(id) logger.Info("--------------更新人员缓存, id: ", id) } else if compareEvent.EventType == protomsg.CompareEventType_DeleteCache { //库中新增更新缓存 id := string(compareEvent.Payload) cache.DeleteDbPersonsCacheById(id) logger.Info("--------------删除人员缓存, id: ", id) } } else { logger.Warn("CompareEvent json unmarshal error") continue } } err = sock.Send(result) if err != nil { logger.Warn("send reply err:", err.Error()) } } } } func Start(ctx context.Context, url string, nWorkers int) { var sock mangos.Socket var err error if sock, err = rep.NewSocket(); err != nil { logger.Error("can't get new rep socket: %s", err) return } if err = sock.SetOption(mangos.OptionRaw, true); err != nil { logger.Error("can't set raw mode: %s", err) return } sock.AddTransport(ipc.NewTransport()) sock.AddTransport(tcp.NewTransport()) if err = sock.Listen(url); err != nil { logger.Error("can't listen on rep socket: %s", err.Error()) return } logger.Debugf("Starting %d workers", nWorkers) for id := 0; id < nWorkers; id++ { go func(id int) { serverWorker(ctx, sock, id) }(id) } }