// Package main starts a compare server: it loads a person cache (from the
// database when the executable is named "dbCompare", otherwise from the
// configured index) and then answers protobuf requests on a mangos REP socket.
package main

import (
	"context"
	"flag"
	"os"
	"path"
	"path/filepath"
	"strconv"
	"time"

	"sdkCompare/cache"
	"sdkCompare/config"

	"basic.com/pubsub/protomsg.git"
	"basic.com/valib/logger.git"
	"basic.com/valib/version.git"
	"github.com/golang/protobuf/proto"
	"nanomsg.org/go-mangos"
	"nanomsg.org/go-mangos/protocol/rep"
	"nanomsg.org/go-mangos/transport/ipc"
	"nanomsg.org/go-mangos/transport/tcp"
)

var (
	// envirment is the value of the -e flag; it is handed to config.Init.
	envirment string
	// procName is the base name of the running executable. It names the log
	// file and selects the server mode in main.
	procName string
	// targetType1 is the value of the -targetType1 flag; it is handed to the
	// cache initialisation and incremental refresh in main.
	targetType1 string
)

// init parses the command-line flags, loads the configuration and sets up the
// logger, writing to "<procName>.log" under the configured log directory.
func init() {
	flag.StringVar(&envirment, "e", "config", "")
	flag.StringVar(&targetType1, "targetType1", "FaceDetect", "cache feature")
	flag.Parse()

	vaversion.Usage()

	config.Init(envirment)

	procName = filepath.Base(os.Args[0])
	var logFile = path.Join(config.LogConf.Path, procName+".log")

	// Initialize logging.
	logger.InitLogger(logFile, config.LogConf.Level, config.LogConf.MaxSize, config.LogConf.MaxBackups, config.LogConf.MaxAge)
	logger.Info("logger init success !")
}

// main prepares the cache for the mode selected by the executable name and
// then serves requests on the matching TCP port until Recv returns.
//
// When the executable is named "dbCompare" the cache is filled from the
// database and the server listens on config.DbPersonCompInfo.ServePort.
// Otherwise the cache is filled from the configured index, a background
// goroutine keeps it updated, and the server listens on
// config.EsCompServerInfo.ServePort. Any initialisation failure is logged and
// the process returns without serving.
func main() {
	//esutil.InitLog(logger.Debug)
	logger.Debug("This is a new server about sdk compare, proc name ", procName)

	serveURL := "tcp://0.0.0.0:"

	if procName == "dbCompare" {
		if err := cache.ConnectDB(); err != nil {
			logger.Error(err.Error())
			return
		}

		cache.InitDbTablePersons()

		if !cache.InitCompare() {
			logger.Debug("init SDKFace return false,panic")
			return
		}

		serveURL = serveURL + strconv.Itoa(config.DbPersonCompInfo.ServePort)
	} else {
		if err := cache.Init(config.EsInfo.EsIndex.AiOcean.IndexName, targetType1); err != nil {
			logger.Info("init capture cache err:", err)
			return
		}

		if !cache.InitCompare() {
			logger.Debug("init SDKFace return false,panic")
			return
		}

		// Runs for the lifetime of the process; nothing here stops it.
		go cache.IncreVideoPersonsCache(time.Now(), targetType1)

		serveURL = serveURL + strconv.Itoa(config.EsCompServerInfo.ServePort)
	}

	logger.Debugf("%s serve url:%s", procName, serveURL)
	Recv(serveURL)
}

// Recv opens a mangos REP socket, listens on url and serves requests until
// the socket is closed.
//
// Each request payload is decoded as protobuf, trying the message types in
// this order:
//
//  1. protomsg.CompareArgs: runs the comparison and replies with its result.
//  2. protomsg.EsPersonCacheChange: applies the change to the cache and
//     replies with an empty message.
//  3. protomsg.CompareEvent: on a ReInitCache event the cache is rebuilt;
//     the reply is an empty message.
//
// Requests that are empty or cannot be decoded are answered with an empty
// message as well, so that every request receives exactly one reply and the
// requesting peer is never left waiting on (or re-sending) an unanswered
// request.
//
// Recv returns immediately, after logging the error, if the socket cannot be
// created or cannot listen on url.
//
// NOTE(review): protobuf decoding is permissive, so a payload intended for a
// later type in the list may decode successfully as an earlier one - confirm
// that the three message schemas cannot be mistaken for each other.
func Recv(url string) {
	// Nothing currently cancels this context; cancel is kept (and deferred) so
	// the context is released on return and a shutdown hook can be wired in.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var sock mangos.Socket
	var err error

	if sock, err = rep.NewSocket(); err != nil {
		// Without a socket there is nothing to serve; continuing would
		// dereference a nil socket below.
		logger.Error("new rep socket err: " + err.Error())
		return
	}
	defer sock.Close()

	sock.AddTransport(ipc.NewTransport())
	sock.AddTransport(tcp.NewTransport())

	if err = sock.Listen(url); err != nil {
		// A socket that is not listening can never receive a request.
		logger.Error("listen on rep socket err: " + err.Error())
		return
	}

	for {
		select {
		case <-ctx.Done():
			logger.Debug("ctx done")
			return
		default:
		}

		msg, err := sock.Recv()
		if err != nil {
			if err == mangos.ErrClosed {
				// The socket is gone for good; retrying would spin forever.
				logger.Debug("recv on closed socket, stop serving")
				return
			}
			logger.Debug("recv err:", err)
			continue
		}

		// Fresh message values per request so no state leaks between requests.
		var compareArgInfo protomsg.CompareArgs
		var cacheChangeInfo protomsg.EsPersonCacheChange
		var compareEvent protomsg.CompareEvent

		// Default reply: an empty, non-nil payload.
		reply := []byte{}

		// Cases are evaluated top to bottom and stop at the first match, so
		// the decode attempts happen in the documented order.
		switch {
		case len(msg) == 0:
			// Nothing to decode; acknowledge with the empty reply.
		case proto.Unmarshal(msg, &compareArgInfo) == nil:
			timeStart := time.Now()
			reply = cache.GetComparePersonBaseInfo(compareArgInfo)
			logger.Debug("用时:", time.Since(timeStart))
		case proto.Unmarshal(msg, &cacheChangeInfo) == nil:
			cache.UpdateCache(&cacheChangeInfo)
		case proto.Unmarshal(msg, &compareEvent) == nil:
			if compareEvent.EventType == protomsg.CompareEventType_ReInitCache {
				// Re-initialize the cache after joining the cluster.
				cache.ReInitDbTablePersonsCache()
			}
		default:
			// The payload is protobuf; the log text is kept unchanged for
			// anything that matches on it.
			logger.Debug("json unmarshal error")
		}

		if err = sock.Send(reply); err != nil {
			logger.Debug("send reply err:", err.Error())
		}
	}
}