liujiandao
2024-02-20 4860c7b312cdce2d948ee417b0bb1fed60fd9dc7
cache/compare.go
@@ -82,7 +82,7 @@
   // 调用增量的接口
   captures, err := libEs.GetPeriodInfos(serverIp, alarmPort, laststring, curstring, indexName, shardStr, targetType)
   if err != nil {
      logger.Debug(err)
      logger.Error(err)
      return
   }
   for _, ei := range captures {
@@ -102,7 +102,7 @@
func getShards(serverIp string, alarmIp string, alarmPort string) (string, error) {
   infos, e := libEs.GetShardsByIndex(alarmIp, alarmPort, config.EsInfo.EsIndex.AiOcean.IndexName)
   if e != nil || infos == nil {
      logger.Debug("get es primary ips err")
      logger.Error("get es primary ips err")
      return "", errors.New("get es primary ips err")
   }
@@ -148,7 +148,7 @@
   captures, err := libEs.GetOceanFeatures(serverIp, alarmPort, 1000, indexName, shardStr, targetType)
   if err != nil {
      logger.Debug("libEs.GetOceanFeatures err:", err)
      logger.Error("libEs.GetOceanFeatures err:", err)
      return err
   }
   logger.Debug("len(captures):", len(captures))
@@ -238,7 +238,7 @@
            defer wg.Done()
            dbpersons, err := dbpApi.GetPersonsCompareCacheBase(j, queryEachNum)
            if err != nil {
               logger.Debug(err)
               logger.Error(err)
               return
            }
            logger.Debugf("获取%d条人员信息", len(dbpersons))
@@ -266,56 +266,22 @@
   }
}
// UpdateDbPersonsCache 更新缓存中的全部人员信息
func UpdateDbPersonsCache() {
// UpdateDbPersonsCacheById 更新缓存中的全部人员信息
func UpdateDbPersonsCacheById(id string) {
   var dbpApi DbPersons
   total, e := dbpApi.GetPersonTotal("")
   logger.Debugf("所有底库共有%d条记录", total)
   if e == nil && total > 0 {
      queryEachNum := *querynum
      qn := int(total) / *threadnum
      if *querynum < qn {
         queryEachNum = qn
   info, err := dbpApi.GetPersonsCompareCacheById(id)
   if err != nil {
      logger.Error(err)
      return
   }
   if info.Tableid != "" {
      Cmap.Lock()
      defer Cmap.Unlock()
      if _, ok := Cmap.Cam[info.Tableid]; !ok {
         Cmap.Cam[info.Tableid] = shardmap.New(uint8(*threadnum))
      }
      queryT := int(total) / queryEachNum
      if int(total)%queryEachNum > 0 {
         queryT++
      }
      temptime := time.Now()
      var wg sync.WaitGroup
      for i := 0; i < queryT; i++ {
         j := i * queryEachNum
         wg.Add(1)
         go func(qs int) {
            defer wg.Done()
            dbpersons, err := dbpApi.GetPersonsCompareCacheBase(j, queryEachNum)
            if err != nil {
               logger.Debug(err)
               return
            }
            logger.Debugf("获取%d条人员信息", len(dbpersons))
            Cmap.Lock()
            tableId := ""
            for _, value := range dbpersons {
               tableId = value.Tableid
               if _, ok := Cmap.Cam[value.Tableid]; !ok {
                  Cmap.Cam[value.Tableid] = shardmap.New(uint8(*threadnum))
               }
               Cmap.Cam[value.Tableid].Set(value.Id, value)
            }
            if len(dbpersons) != 0 {
               Cmap.Cam[tableId].Settime()
            }
            Cmap.Unlock()
         }(j)
      }
      wg.Wait()
      logger.Debug("底库人员缓存完成用时:", time.Since(temptime))
      Cmap.Cam[info.Tableid].Set(info.Id, info)
      Cmap.Cam[info.Tableid].Settime()
   }
}
@@ -344,7 +310,7 @@
      Cmap.Cam[tableId].Del(id)
      logger.Debug("DelPerson ok success")
   } else {
      logger.Debug("tableId:", tableId, " not exist")
      logger.Error("tableId:", tableId, " not exist")
   }
}
@@ -459,7 +425,7 @@
         for _, tid := range compareArgs.TableIds { //ruleProcess比对指定底库
            shardins, ok := Cmap.Cam[tid]
            if !ok {
               logger.Debug("ruleProcess compare get shard error by tableId:", tid)
               logger.Error("ruleProcess compare get shard error by tableId:", tid)
               continue
            }
            if tShard, hasT := Cmap.Cam[PRE_DBTABLE]; hasT {
@@ -472,10 +438,10 @@
                     }
                  }
               } else {
                  logger.Debug("ruleProcess compare tables,tShard not exist tableId:", tid)
                  logger.Error("ruleProcess compare tables,tShard not exist tableId:", tid)
               }
            } else {
               logger.Debug("ruleProcess compare tables,PRE_DBTABLE tableId:", tid, " not exist")
               logger.Error("ruleProcess compare tables,PRE_DBTABLE tableId:", tid, " not exist")
            }
         }
      } else { //web请求,比对指定的抓拍库或者底库
@@ -484,7 +450,7 @@
               tStart := time.Now()
               serverIp, _, e := util.GetLocalIP(config.Server.NetworkAdapter)
               if e != nil {
                  logger.Debug("util.GetLocalIP err:", e)
                  logger.Error("util.GetLocalIP err:", e)
                  continue
               }
               alarmPort := config.EsCompServerInfo.ESPort
@@ -553,7 +519,7 @@
            } else {
               shardins, ok := Cmap.Cam[tid]
               if !ok {
                  logger.Debug("get shard error by tableId:", tid)
                  logger.Error("get shard error by tableId:", tid)
                  continue
               }
@@ -570,7 +536,7 @@
   buf, err := proto.Marshal(&scResult)
   if err != nil {
      logger.Debug("scResult Marshal error!", err)
      logger.Error("scResult Marshal error!", err)
      return nil
   }
@@ -581,16 +547,16 @@
   co_d, err := base64.StdEncoding.DecodeString(co)
   if err != nil {
      logger.Debug("DoSdkCompare err:", err)
      logger.Error("DoSdkCompare err:", err)
      return -1
   }
   if len(co_d) != 2560 {
      logger.Debug("target fea.len !=2560")
      logger.Error("target fea.len !=2560")
      return -1
   }
   if len(ci) != 2560 {
      logger.Debug("source fea.len !=2560")
      logger.Error("source fea.len !=2560")
      return -1
   }
   sec := DecCompare(ci, co_d)