zhangzengfei
2024-03-29 3cdf9117668a91f2fefcda89eb4263d7fb2f4434
cache/compare.go
@@ -2,7 +2,6 @@
import (
   "encoding/base64"
   "errors"
   "flag"
   "fmt"
   "sdkCompare/face"
@@ -11,14 +10,10 @@
   "sync"
   "time"
   "sdkCompare/cache/shardmap"
   "sdkCompare/config"
   "sdkCompare/util"
   libEs "basic.com/pubsub/esutil.git"
   "basic.com/pubsub/protomsg.git"
   "basic.com/valib/logger.git"
   "github.com/golang/protobuf/proto"
   "sdkCompare/cache/shardmap"
)
// querynum is the "querynum" command-line flag (default 3000). Querynum uses
// its value as the upper bound for a single query, and Init overwrites it with
// the value Querynum computes.
var querynum = flag.Int("querynum", 3000, "the query number from database")
@@ -27,7 +22,6 @@
// Package-level settings and key prefixes used by the comparison cache.
var (
   // NOTE(review): not referenced in the visible code; presumably a minimum
   // comparison score — confirm against its users.
   thresholdLimit     = float32(50)
   // captureTable is the table id stamped on every cached capture record
   // (Esinfo.Tableid) and the id a request uses to select the capture library.
   captureTable       = "capturetable"
   // NOTE(review): not referenced in the visible code; looks like a key prefix
   // for car tables — confirm.
   cartable           = "carTable_"
   // NOTE(review): not referenced in the visible code; looks like a key prefix
   // for base-library (db) tables — confirm.
   PRE_DBTABLE        = "dbTable_"
   // PRE_CAPTURE_SERVER is prepended to an analysis server id to build the
   // Cmap.Cam key under which that server's captures are cached.
   PRE_CAPTURE_SERVER = "captureServer_"
)
@@ -39,138 +33,6 @@
// Cmap is the package-wide cache. Init allocates it exactly once and fills
// Cmap.Cam, a map from cache key to the shard map holding that key's records.
var Cmap *CmapItem
// doOnce guards the one-time flag parsing and Cmap allocation done in Init.
var doOnce sync.Once
// 计算每次查询的数据量条数
func Querynum(totalnum int) int {
   qn := totalnum / *threadnum //qn=6551
   if *querynum < qn {         //
      return *querynum
   }
   return qn
}
// 增量查询
func IncreVideoPersonsCache(lastT time.Time, targetType string) {
   ticker := time.NewTicker(time.Minute * 3)
   for {
      select {
      case <-ticker.C:
         curTime := time.Now()
         Incrementquery(lastT, curTime, targetType)
         lastT = curTime
      }
   }
}
func Incrementquery(last time.Time, cur time.Time, targetType string) {
   laststring := last.Format("2006-01-02 15:04:05")
   curstring := cur.Format("2006-01-02 15:04:05")
   alarmIp := config.EsCompServerInfo.ESIP
   alarmPort := config.EsCompServerInfo.ESPort
   indexName := config.EsInfo.EsIndex.AiOcean.IndexName
   serverIp, _, e := util.GetLocalIP(config.Server.NetworkAdapter)
   if e != nil {
      return
   }
   shardStr, e := getShards(serverIp, alarmIp, alarmPort)
   if e != nil {
      return
   }
   // 调用增量的接口
   captures, err := libEs.GetPeriodInfos(serverIp, alarmPort, laststring, curstring, indexName, shardStr, targetType)
   if err != nil {
      logger.Error(err)
      return
   }
   for _, ei := range captures {
      if ei.EsInfo.AnalyServerId != "" {
         cKey := PRE_CAPTURE_SERVER + ei.EsInfo.AnalyServerId
         ei.EsInfo.Tableid = captureTable
         if _, ok := Cmap.Cam[cKey]; !ok {
            Cmap.Cam[cKey] = shardmap.New(uint8(*threadnum))
         }
         Cmap.Cam[cKey].Set(ei.EsInfo.Id, ei.EsInfo)
      }
   }
}
func getShards(serverIp string, alarmIp string, alarmPort string) (string, error) {
   infos, e := libEs.GetShardsByIndex(alarmIp, alarmPort, config.EsInfo.EsIndex.AiOcean.IndexName)
   if e != nil || infos == nil {
      logger.Error("get es primary ips err")
      return "", errors.New("get es primary ips err")
   }
   var shards []string
   for _, shard := range infos {
      if (shard.ShardIp == serverIp || shard.ShardIp == "127.0.0.1") && shard.ShardRole == "primary" && shard.ShardState == "STARTED" {
         shards = append(shards, strconv.Itoa(shard.ShardNum))
      }
   }
   if len(shards) == 0 {
      return "", errors.New("current shards is empty")
   }
   return strings.Join(shards, ","), nil
}
// 1. 拿到总量, 计算每个线程的查询量
// 2. 分线程查询
func Init(indexName string, targetType string) error {
   alarmIp := config.EsCompServerInfo.ESIP
   alarmPort := config.EsCompServerInfo.ESPort
   doOnce.Do(func() {
      flag.Parse()
      Cmap = &CmapItem{
         Cam: make(map[string]*shardmap.ShardMap),
      }
   })
   serverIp, _, e := util.GetLocalIP(config.Server.NetworkAdapter)
   if e != nil {
      return e
   }
   shardStr, e := getShards(serverIp, alarmIp, alarmPort)
   if e != nil {
      return e
   }
   // 这里需要提供总量的接口
   estotalnum := libEs.GetTotal(serverIp, alarmPort, indexName, shardStr, targetType)
   *querynum = Querynum(estotalnum)
   temptime := time.Now()
   captures, err := libEs.GetOceanFeatures(serverIp, alarmPort, 1000, indexName, shardStr, targetType)
   if err != nil {
      logger.Error("libEs.GetOceanFeatures err:", err)
      return err
   }
   logger.Debug("len(captures):", len(captures))
   Cmap.Lock()
   for _, ei := range captures {
      if ei.EsInfo.AnalyServerId != "" {
         cKey := PRE_CAPTURE_SERVER + ei.EsInfo.AnalyServerId
         ei.EsInfo.Tableid = captureTable
         if _, ok := Cmap.Cam[cKey]; !ok {
            Cmap.Cam[cKey] = shardmap.New(uint8(*threadnum))
         }
         Cmap.Cam[cKey].Set(ei.EsInfo.Id, ei.EsInfo)
      }
   }
   Cmap.Unlock()
   logger.Debug(indexName, "缓存完成用时:", time.Since(temptime))
   return nil
}
func ReInitDbTablePersonsCache() {
   Cmap.Lock()
@@ -447,87 +309,15 @@
         }
      } else { //web请求,比对指定的抓拍库或者底库
         for _, tid := range compareArgs.TableIds {
            if tid == captureTable { //比对抓拍库
               tStart := time.Now()
               serverIp, _, e := util.GetLocalIP(config.Server.NetworkAdapter)
               if e != nil {
                  logger.Error("util.GetLocalIP err:", e)
                  continue
               }
               alarmPort := config.EsCompServerInfo.ESPort
            shardins, ok := Cmap.Cam[tid]
            if !ok {
               logger.Error("get shard error by tableId:", tid)
               continue
            }
               scopeIds := libEs.GetAllLocalVideopersonsId(compareArgs, config.EsInfo.EsIndex.AiOcean.IndexName, serverIp, alarmPort, compareArgs.AlarmLevel)
               logger.Debug("libEs.GetAllLocalVideopersonsId len(scopeIds):", len(scopeIds), " 耗时:", time.Since(tStart))
               if scopeIds != nil {
                  tCompStart := time.Now()
                  if compareArgs.AnalyServerId != "" { //指定server
                     for cKey, cacheMap := range Cmap.Cam {
                        if cKey == PRE_CAPTURE_SERVER+compareArgs.AnalyServerId {
                           for _, sId := range scopeIds {
                              obj, ok := cacheMap.Get(sId)
                              if ok {
                                 eInfo, ok := obj.(*protomsg.Esinfo)
                                 if !ok {
                                    continue
                                 }
                                 sec := DoSdkCompare(compareArgs.FaceFeature, eInfo.FaceFeature)
                                 if sec >= baseScore {
                                    scResult.CompareResult = append(scResult.CompareResult, &protomsg.SdkCompareEach{
                                       Id:           eInfo.Id,
                                       CompareScore: sec,
                                       Tableid:      eInfo.Tableid,
                                    })
                                 }
                              }
                           }
                        }
                     }
                  } else { //管理平台请求比对所有抓拍,不指定server
                     for cKey, cacheMap := range Cmap.Cam {
                        if len(compareArgs.ServerIds) > 0 {
                           for _, termDevId := range compareArgs.ServerIds {
                              if cKey == PRE_CAPTURE_SERVER+termDevId || !strings.HasPrefix(cKey, PRE_CAPTURE_SERVER) {
                                 targets := cacheMap.Walk(DoSdkCompare, compareArgs.FaceFeature, baseScore, compareArgs.Source, compareArgs.CompareTarget)
                                 if len(targets) > 0 {
                                    scResult.CompareResult = append(scResult.CompareResult, targets...)
                                 }
                              }
                           }
                        } else {
                           if strings.HasPrefix(cKey, PRE_CAPTURE_SERVER) {
                              for _, sId := range scopeIds {
                                 if obj, ok := cacheMap.Get(sId); ok {
                                    eInfo, ok := obj.(*protomsg.Esinfo)
                                    if !ok {
                                       continue
                                    }
                                    sec := DoSdkCompare(compareArgs.FaceFeature, eInfo.FaceFeature)
                                    if sec >= baseScore {
                                       scResult.CompareResult = append(scResult.CompareResult, &protomsg.SdkCompareEach{
                                          Id:           eInfo.Id,
                                          CompareScore: sec,
                                          Tableid:      eInfo.Tableid,
                                       })
                                    }
                                 }
                              }
                           }
                        }
                     }
                  }
                  logger.Debug("根据scopeIds比对耗时:", time.Since(tCompStart))
               }
            } else {
               shardins, ok := Cmap.Cam[tid]
               if !ok {
                  logger.Error("get shard error by tableId:", tid)
                  continue
               }
               targets := shardins.Walk(DoSdkCompare, compareArgs.FaceFeature, baseScore, compareArgs.Source, compareArgs.CompareTarget)
               if len(targets) > 0 {
                  scResult.CompareResult = append(scResult.CompareResult, targets...)
               }
            targets := shardins.Walk(DoSdkCompare, compareArgs.FaceFeature, baseScore, compareArgs.Source, compareArgs.CompareTarget)
            if len(targets) > 0 {
               scResult.CompareResult = append(scResult.CompareResult, targets...)
            }
         }
      }
@@ -550,15 +340,6 @@
      logger.Error("DoSdkCompare err:", err)
      return -1
   }
   //if len(co_d) != 2560 {
   //   logger.Error("target fea.len !=2560")
   //   return -1
   //}
   //
   //if len(ci) != 2560 {
   //   logger.Error("source fea.len !=2560")
   //   return -1
   //}
   sec := face.DecCompare(ci, co_d)
   //logger.Debug("比对得分为:", sec)