package main import ( "context" "flag" "path" "strconv" "time" "sdkCompare/cache" "sdkCompare/compare" "sdkCompare/config" "sdkCompare/db" "sdkCompare/proto/facecompare" "basic.com/pubsub/protomsg.git" "basic.com/valib/logger.git" "basic.com/valib/version.git" "github.com/golang/protobuf/proto" "nanomsg.org/go-mangos" "nanomsg.org/go-mangos/protocol/rep" "nanomsg.org/go-mangos/transport/ipc" "nanomsg.org/go-mangos/transport/tcp" ) const procName = "faceCompare" func init() { flag.Parse() vaversion.Usage() } func main() { err := config.Init() if err != nil { return } var logFile = path.Join(config.LogConf.Path, "faceCompare.log") // 日志初始化 logger.InitLogger(logFile, config.LogConf.Level, config.LogConf.MaxSize, config.LogConf.MaxBackups, config.LogConf.MaxAge) logger.Info("logger init success !") serveUrl := "tcp://0.0.0.0:" if err := db.ConnectDB(); err != nil { logger.Error(err.Error()) return } cache.InitDbTablePersons() serveUrl = serveUrl + strconv.Itoa(config.DbPersonCompInfo.ServePort) logger.Infof("%s serve url:%s", procName, serveUrl) Recv(serveUrl) } func Recv(url string) { var sock mangos.Socket var err error var msg []byte var ctx, _ = context.WithCancel(context.Background()) if sock, err = rep.NewSocket(); err != nil { logger.Error("new rep socket err:", err) } sock.AddTransport(ipc.NewTransport()) sock.AddTransport(tcp.NewTransport()) if err = sock.Listen(url); err != nil { logger.Error("listen on rep socket err:", err) } for { select { case <-ctx.Done(): logger.Info("ctx done") return default: msg, err = sock.Recv() if err != nil || len(msg) <= 0 { continue } var request facecompare.CompareRequest err = proto.Unmarshal(msg, &request) if err != nil { logger.Warn("CompareRequest json unmarshal error") continue } var result []byte if request.CompareType == facecompare.CompareType_Compare { var compareArgInfo protomsg.CompareArgs if err = proto.Unmarshal(request.Payload, &compareArgInfo); err == nil { timeStart := time.Now() result = compare.GetComparePersonBaseInfo(compareArgInfo) logger.Debug("用时:", time.Since(timeStart)) } else { logger.Warn("CompareArgs or EsPersonCacheChange json unmarshal error") continue } } else if request.CompareType == facecompare.CompareType_UpdateCache { var compareEvent protomsg.CompareEvent if err = proto.Unmarshal(request.Payload, &compareEvent); err == nil { if compareEvent.EventType == protomsg.CompareEventType_ReInitCache { //加入集群后重新初始化缓存 cache.ReInitDbTablePersonsCache() } else if compareEvent.EventType == protomsg.CompareEventType_UpdateCache { //库中新增更新缓存 id := string(compareEvent.Payload) cache.UpdateDbPersonsCacheById(id) logger.Info("--------------更新人员缓存, id: ", id) } else if compareEvent.EventType == protomsg.CompareEventType_DeleteCache { //库中新增更新缓存 id := string(compareEvent.Payload) cache.DeleteDbPersonsCacheById(id) logger.Info("--------------删除人员缓存, id: ", id) } } else { logger.Warn("CompareEvent json unmarshal error") continue } } err = sock.Send(result) if err != nil { logger.Warn("send reply err:", err.Error()) } } } }