| | |
| | | "context" |
| | | "flag" |
| | | "os" |
| | | "os/signal" |
| | | "path" |
| | | "path/filepath" |
| | | "strconv" |
| | | "time" |
| | | "syscall" |
| | | |
| | | "sdkCompare/cache" |
| | | "sdkCompare/config" |
| | | "sdkCompare/db" |
| | | "sdkCompare/serve" |
| | | |
| | | "basic.com/pubsub/protomsg.git" |
| | | "basic.com/valib/logger.git" |
| | | "basic.com/valib/version.git" |
| | | "github.com/golang/protobuf/proto" |
| | | "nanomsg.org/go-mangos" |
| | | "nanomsg.org/go-mangos/protocol/rep" |
| | | "nanomsg.org/go-mangos/transport/ipc" |
| | | "nanomsg.org/go-mangos/transport/tcp" |
| | | vaversion "basic.com/valib/version.git" |
| | | ) |
| | | |
// Package-level settings; envirment and targetType1 are bound to
// command-line flags in init.
// NOTE(review): procName is declared below both as a string variable and as
// a constant. Go does not allow the same name to be declared twice in one
// scope, so these look like lines from two different revisions -- confirm
// which declaration is intended.
| | | var ( |
// envirment receives the value of the -e flag (default "config").
| | | envirment string |
| | | procName string |
// targetType1 receives the value of the -targetType1 flag (default "FaceDetect").
| | | targetType1 string |
| | | ) |
| | | const procName = "faceCompare" |
| | | |
// init registers the -e and -targetType1 command-line flags, parses the
// command line, and then calls vaversion.Usage().
| | | func init() { |
// -e defaults to "config" and has an empty usage string.
| | | flag.StringVar(&envirment, "e", "config", "") |
| | | |
// -targetType1 defaults to "FaceDetect"; its usage string is "cache feature".
| | | flag.StringVar(&targetType1, "targetType1", "FaceDetect", "cache feature") |
| | | |
| | | flag.Parse() |
| | | vaversion.Usage() |
| | | } |
| | | |
// NOTE(review): the call below sits outside any function body, which Go does
// not permit. It also passes an argument, while the call inside main passes
// none and checks a returned error. Presumably a leftover from another
// revision -- confirm.
| | | config.Init(envirment) |
// main loads the configuration with config.Init (returning without logging if
// it fails) and then initializes the logger from config.LogConf.
// NOTE(review): a second func main appears later in this file, and logFile is
// declared twice in this body -- confirm which lines belong to the live
// revision.
| | | func main() { |
| | | err := config.Init() |
| | | if err != nil { |
| | | return |
| | | } |
| | | |
// Derives the process name from the executable path. This assignment cannot
// target the procName constant; it only works with the variable declaration.
| | | procName = filepath.Base(os.Args[0]) |
// Two alternative log file names: "<procName>.log" and the fixed
// "faceCompare.log", both placed under config.LogConf.Path.
| | | var logFile = path.Join(config.LogConf.Path, procName+".log") |
| | | var logFile = path.Join(config.LogConf.Path, "faceCompare.log") |
| | | |
| | | // Initialize the logger. |
| | | logger.InitLogger(logFile, config.LogConf.Level, config.LogConf.MaxSize, config.LogConf.MaxBackups, config.LogConf.MaxAge) |
| | | logger.Info("logger init success !") |
| | | } |
| | | |
// main connects to the database, initializes the cache, builds the listen URL
// according to procName, and passes that URL to Recv.
// NOTE(review): this is the second func main in the file -- confirm which one
// is the live definition.
| | | func main() { |
| | | //esutil.InitLog(logger.Debug) |
| | | logger.Debug("This is a new server about sdk compare, proc name ", procName) |
| | | if err := db.ConnectDB(); err != nil { |
| | | logger.Error(err.Error()) |
| | | return |
| | | } |
| | | |
| | | cache.InitCache() |
| | | |
// The port number is appended to this prefix further down.
| | | serveUrl := "tcp://0.0.0.0:" |
| | | if procName == "dbCompare" { |
| | | if err := cache.ConnectDB(); err != nil { |
| | | logger.Error(err.Error()) |
| | | return |
| | | } |
// NOTE(review): serveUrl has a port appended here and again at the end of this
// branch (config.DbPersonCompInfo.ServePort), which would yield two port
// numbers in one URL -- confirm which append is intended.
| | | serveUrl = serveUrl + strconv.Itoa(config.MainConf.ServePort) |
| | | |
| | | cache.InitDbTablePersons() |
// The log text mentions "panic", but the code simply returns.
| | | if !cache.InitCompare() { |
| | | logger.Debug("init SDKFace return false,panic") |
| | | return |
| | | } |
| | | serveUrl = serveUrl + strconv.Itoa(config.DbPersonCompInfo.ServePort) |
| | | } else { |
| | | if err := cache.Init(config.EsInfo.EsIndex.AiOcean.IndexName, targetType1); err != nil { |
| | | logger.Info("init capture cache err:", err) |
| | | return |
| | | } |
// NOTE(review): at this point serveUrl has no port appended yet; the port is
// only added at the end of this branch.
| | | logger.Infof("%s serve url:%s", procName, serveUrl) |
| | | |
// The log text mentions "panic", but the code simply returns.
| | | if !cache.InitCompare() { |
| | | logger.Debug("init SDKFace return false,panic") |
| | | return |
| | | } |
// NOTE(review): cancel is not used anywhere in this function as shown, and
// serve.Start receives serveUrl before the port is appended -- confirm.
| | | var ctx, cancel = context.WithCancel(context.Background()) |
| | | serve.Start(ctx, serveUrl, config.MainConf.WorkerNum) |
| | | |
// Started in its own goroutine; nothing in this function waits for it or stops it.
| | | go cache.IncreVideoPersonsCache(time.Now(), targetType1) |
| | | serveUrl = serveUrl + strconv.Itoa(config.EsCompServerInfo.ServePort) |
| | | } |
| | | |
| | | logger.Debugf("%s serve url:%s", procName, serveUrl) |
| | | |
| | | Recv(serveUrl) |
| | | } |
| | | |
// Recv creates a socket with rep.NewSocket, registers the ipc and tcp
// transports, listens on url, and then loops receiving messages. Each
// non-empty message is tried as protomsg.CompareArgs, then
// protomsg.EsPersonCacheChange, then protomsg.CompareEvent; the first type
// that unmarshals without error decides how the message is handled.
| | | func Recv(url string) { |
| | | var sock mangos.Socket |
| | | var err error |
| | | var msg []byte |
// NOTE(review): the cancel function is discarded, so nothing in this function
// can cancel ctx and the ctx.Done() case below cannot fire from here.
| | | var ctx, _ = context.WithCancel(context.Background()) |
// NOTE(review): a socket-creation error is only logged; execution continues
// and sock is used on the following lines regardless.
| | | if sock, err = rep.NewSocket(); err != nil { |
| | | logger.Debug("new rep socket err:", err) |
| | | } |
| | | sock.AddTransport(ipc.NewTransport()) |
| | | sock.AddTransport(tcp.NewTransport()) |
// NOTE(review): a listen error is only logged; the receive loop still starts.
| | | if err = sock.Listen(url); err != nil { |
| | | logger.Debug("listen on rep socket err:", err) |
| | | } |
| | | |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | logger.Debug("ctx done") |
| | | return |
| | | default: |
// Receive errors are skipped without being logged.
| | | msg, err = sock.Recv() |
| | | if err != nil { |
| | | continue |
| | | } |
| | | |
| | | if len(msg) > 0 { |
| | | var compareArgInfo protomsg.CompareArgs |
| | | var cacheChangeInfo protomsg.EsPersonCacheChange |
| | | var compareEvent protomsg.CompareEvent |
// Compare request: run the comparison and send its result back as the reply.
| | | if err = proto.Unmarshal(msg, &compareArgInfo); err == nil { |
| | | timeStart := time.Now() |
| | | result := cache.GetComparePersonBaseInfo(compareArgInfo) |
// Logs the elapsed time of the comparison (the label "用时" means "time taken").
| | | logger.Debug("用时:", time.Since(timeStart)) |
| | | err = sock.Send(result) |
| | | if err != nil { |
| | | logger.Debug("send reply err:", err.Error()) |
| | | } |
| | | |
// Cache-change notification: apply it and reply with an empty payload.
| | | } else if err = proto.Unmarshal(msg, &cacheChangeInfo); err == nil { |
| | | cache.UpdateCache(&cacheChangeInfo) |
| | | err = sock.Send([]byte("")) |
| | | if err != nil { |
| | | logger.Debug("send reply err:", err.Error()) |
| | | } |
// Compare event: dispatch on the event type.
// NOTE(review): unlike the two branches above, this branch sends no reply.
| | | } else if err = proto.Unmarshal(msg, &compareEvent); err == nil { |
| | | if compareEvent.EventType == protomsg.CompareEventType_ReInitCache { // re-initialize the cache after joining the cluster |
| | | cache.ReInitDbTablePersonsCache() |
| | | } else if compareEvent.EventType == protomsg.CompareEventType_UpdateCache { // update the cache when entries are added to the library |
| | | cache.UpdateDbPersonsCache() |
| | | } |
// None of the three message types decoded. The log text says "json", but the
// decoding above uses proto.Unmarshal.
| | | } else { |
| | | logger.Debug("json unmarshal error") |
| | | continue |
| | | } |
| | | |
| | | } |
| | | } |
| | | } |
// NOTE(review): the loop above has no break, so the lines below are
// unreachable as shown; they also call cancel, which is not declared in this
// function. They wait for SIGINT/SIGTERM and then cancel a context, and
// presumably belong to main in another revision -- confirm.
| | | quit := make(chan os.Signal) |
| | | signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) |
| | | <-quit |
| | | logger.Info("Shutting down server...") |
| | | cancel() |
| | | } |