liujiandao
2024-02-20 4860c7b312cdce2d948ee417b0bb1fed60fd9dc7
main.go
@@ -6,6 +6,7 @@
   "os"
   "path"
   "path/filepath"
   "sdkCompare/proto/facecompare"
   "strconv"
   "time"
@@ -51,32 +52,44 @@
   logger.Debug("This is a new server about sdk compare, proc name ", procName)
   serveUrl := "tcp://0.0.0.0:"
   if procName == "dbCompare" {
      if err := cache.ConnectDB(); err != nil {
         logger.Error(err.Error())
         return
      }
      cache.InitDbTablePersons()
      if !cache.InitCompare() {
         logger.Debug("init SDKFace return false,panic")
         return
      }
      serveUrl = serveUrl + strconv.Itoa(config.DbPersonCompInfo.ServePort)
   } else {
      if err := cache.Init(config.EsInfo.EsIndex.AiOcean.IndexName, targetType1); err != nil {
         logger.Info("init capture cache err:", err)
         return
      }
      if !cache.InitCompare() {
         logger.Debug("init SDKFace return false,panic")
         return
      }
      go cache.IncreVideoPersonsCache(time.Now(), targetType1)
      serveUrl = serveUrl + strconv.Itoa(config.EsCompServerInfo.ServePort)
   if err := cache.ConnectDB(); err != nil {
      logger.Error(err.Error())
      return
   }
   cache.InitDbTablePersons()
   if !cache.InitCompare() {
      logger.Debug("init SDKFace return false,panic")
      return
   }
   serveUrl = serveUrl + strconv.Itoa(config.DbPersonCompInfo.ServePort)
   //if procName == "dbCompare" {
   //   if err := cache.ConnectDB(); err != nil {
   //      logger.Error(err.Error())
   //      return
   //   }
   //
   //   cache.InitDbTablePersons()
   //   if !cache.InitCompare() {
   //      logger.Debug("init SDKFace return false,panic")
   //      return
   //   }
   //   serveUrl = serveUrl + strconv.Itoa(config.DbPersonCompInfo.ServePort)
   //} else {
   //   if err := cache.Init(config.EsInfo.EsIndex.AiOcean.IndexName, targetType1); err != nil {
   //      logger.Info("init capture cache err:", err)
   //      return
   //   }
   //
   //   if !cache.InitCompare() {
   //      logger.Debug("init SDKFace return false,panic")
   //      return
   //   }
   //
   //   go cache.IncreVideoPersonsCache(time.Now(), targetType1)
   //   serveUrl = serveUrl + strconv.Itoa(config.EsCompServerInfo.ServePort)
   //}
   logger.Debugf("%s serve url:%s", procName, serveUrl)
@@ -89,12 +102,12 @@
   var msg []byte
   var ctx, _ = context.WithCancel(context.Background())
   if sock, err = rep.NewSocket(); err != nil {
      logger.Debug("new rep socket err:", err)
      logger.Error("new rep socket err:", err)
   }
   sock.AddTransport(ipc.NewTransport())
   sock.AddTransport(tcp.NewTransport())
   if err = sock.Listen(url); err != nil {
      logger.Debug("listen on rep socket err:", err)
      logger.Error("listen on rep socket err:", err)
   }
   for {
@@ -109,35 +122,45 @@
         }
         if len(msg) > 0 {
            var compareArgInfo protomsg.CompareArgs
            var cacheChangeInfo protomsg.EsPersonCacheChange
            var compareEvent protomsg.CompareEvent
            if err = proto.Unmarshal(msg, &compareArgInfo); err == nil {
               timeStart := time.Now()
               result := cache.GetComparePersonBaseInfo(compareArgInfo)
               logger.Debug("用时:", time.Since(timeStart))
               err = sock.Send(result)
               if err != nil {
                  logger.Debug("send reply err:", err.Error())
               }
            } else if err = proto.Unmarshal(msg, &cacheChangeInfo); err == nil {
               cache.UpdateCache(&cacheChangeInfo)
               err = sock.Send([]byte(""))
               if err != nil {
                  logger.Debug("send reply err:", err.Error())
               }
            } else if err = proto.Unmarshal(msg, &compareEvent); err == nil {
               if compareEvent.EventType == protomsg.CompareEventType_ReInitCache { //加入集群后重新初始化缓存
                  cache.ReInitDbTablePersonsCache()
               } else if compareEvent.EventType == protomsg.CompareEventType_UpdateCache { //库中新增更新缓存
                  cache.UpdateDbPersonsCacheById(string(compareEvent.Payload))
               }
            } else {
               logger.Debug("json unmarshal error")
            var compareType facecompare.CompareRequest
            err = proto.Unmarshal(msg, &compareType)
            if err != nil {
               logger.Error("compareType json unmarshal error")
               continue
            }
            var result []byte
            if compareType.CompareType == facecompare.CompareType_Compare {
               var compareArgInfo protomsg.CompareArgs
               var cacheChangeInfo protomsg.EsPersonCacheChange
               if err = proto.Unmarshal(compareType.Payload, &compareArgInfo); err == nil {
                  timeStart := time.Now()
                  result = cache.GetComparePersonBaseInfo(compareArgInfo)
                  logger.Debug("用时:", time.Since(timeStart))
               } else if err = proto.Unmarshal(compareType.Payload, &cacheChangeInfo); err == nil {
                  cache.UpdateCache(&cacheChangeInfo)
               } else {
                  logger.Error("CompareArgs or EsPersonCacheChange json unmarshal error")
                  continue
               }
            } else if compareType.CompareType == facecompare.CompareType_UpdateCache {
               var compareEvent protomsg.CompareEvent
               if err = proto.Unmarshal(compareType.Payload, &compareEvent); err == nil {
                  if compareEvent.EventType == protomsg.CompareEventType_ReInitCache { //加入集群后重新初始化缓存
                     cache.ReInitDbTablePersonsCache()
                  } else if compareEvent.EventType == protomsg.CompareEventType_UpdateCache { //库中新增更新缓存
                     id := string(compareEvent.Payload)
                     cache.UpdateDbPersonsCacheById(id)
                     logger.Debug("--------------更新人员缓存, id: ", id)
                  }
               } else {
                  logger.Error("CompareEvent json unmarshal error")
                  continue
               }
            }
            err = sock.Send(result)
            if err != nil {
               logger.Error("send reply err:", err.Error())
            }
         }
      }
   }