| | |
| | | |
| | | import ( |
| | | "encoding/base64" |
| | | "errors" |
| | | "flag" |
| | | "fmt" |
| | | "sdkCompare/face" |
| | |
| | | "sync" |
| | | "time" |
| | | |
| | | "sdkCompare/cache/shardmap" |
| | | "sdkCompare/config" |
| | | "sdkCompare/util" |
| | | |
| | | libEs "basic.com/pubsub/esutil.git" |
| | | "basic.com/pubsub/protomsg.git" |
| | | "basic.com/valib/logger.git" |
| | | "github.com/golang/protobuf/proto" |
| | | "sdkCompare/cache/shardmap" |
| | | ) |
| | | |
| | | var querynum = flag.Int("querynum", 3000, "the query number from database") |
| | |
| | | var ( |
| | | thresholdLimit = float32(50) |
| | | captureTable = "capturetable" |
| | | cartable = "carTable_" |
| | | PRE_DBTABLE = "dbTable_" |
| | | PRE_CAPTURE_SERVER = "captureServer_" |
| | | ) |
| | |
| | | |
| | | var Cmap *CmapItem |
| | | var doOnce sync.Once |
| | | |
| | | // 计算每次查询的数据量条数 |
| | | func Querynum(totalnum int) int { |
| | | qn := totalnum / *threadnum //qn=6551 |
| | | if *querynum < qn { // |
| | | |
| | | return *querynum |
| | | } |
| | | return qn |
| | | } |
| | | |
| | | // 增量查询 |
| | | func IncreVideoPersonsCache(lastT time.Time, targetType string) { |
| | | ticker := time.NewTicker(time.Minute * 3) |
| | | for { |
| | | select { |
| | | case <-ticker.C: |
| | | curTime := time.Now() |
| | | Incrementquery(lastT, curTime, targetType) |
| | | lastT = curTime |
| | | } |
| | | } |
| | | } |
| | | |
| | | func Incrementquery(last time.Time, cur time.Time, targetType string) { |
| | | laststring := last.Format("2006-01-02 15:04:05") |
| | | curstring := cur.Format("2006-01-02 15:04:05") |
| | | alarmIp := config.EsCompServerInfo.ESIP |
| | | alarmPort := config.EsCompServerInfo.ESPort |
| | | indexName := config.EsInfo.EsIndex.AiOcean.IndexName |
| | | |
| | | serverIp, _, e := util.GetLocalIP(config.Server.NetworkAdapter) |
| | | if e != nil { |
| | | return |
| | | } |
| | | |
| | | shardStr, e := getShards(serverIp, alarmIp, alarmPort) |
| | | if e != nil { |
| | | return |
| | | } |
| | | |
| | | // 调用增量的接口 |
| | | captures, err := libEs.GetPeriodInfos(serverIp, alarmPort, laststring, curstring, indexName, shardStr, targetType) |
| | | if err != nil { |
| | | logger.Error(err) |
| | | return |
| | | } |
| | | for _, ei := range captures { |
| | | if ei.EsInfo.AnalyServerId != "" { |
| | | cKey := PRE_CAPTURE_SERVER + ei.EsInfo.AnalyServerId |
| | | ei.EsInfo.Tableid = captureTable |
| | | |
| | | if _, ok := Cmap.Cam[cKey]; !ok { |
| | | Cmap.Cam[cKey] = shardmap.New(uint8(*threadnum)) |
| | | } |
| | | |
| | | Cmap.Cam[cKey].Set(ei.EsInfo.Id, ei.EsInfo) |
| | | } |
| | | } |
| | | } |
| | | |
| | | func getShards(serverIp string, alarmIp string, alarmPort string) (string, error) { |
| | | infos, e := libEs.GetShardsByIndex(alarmIp, alarmPort, config.EsInfo.EsIndex.AiOcean.IndexName) |
| | | if e != nil || infos == nil { |
| | | logger.Error("get es primary ips err") |
| | | return "", errors.New("get es primary ips err") |
| | | } |
| | | |
| | | var shards []string |
| | | for _, shard := range infos { |
| | | if (shard.ShardIp == serverIp || shard.ShardIp == "127.0.0.1") && shard.ShardRole == "primary" && shard.ShardState == "STARTED" { |
| | | shards = append(shards, strconv.Itoa(shard.ShardNum)) |
| | | } |
| | | } |
| | | if len(shards) == 0 { |
| | | return "", errors.New("current shards is empty") |
| | | } |
| | | return strings.Join(shards, ","), nil |
| | | } |
| | | |
| | | // 1. 拿到总量, 计算每个线程的查询量 |
| | | // 2. 分线程查询 |
| | | func Init(indexName string, targetType string) error { |
| | | alarmIp := config.EsCompServerInfo.ESIP |
| | | alarmPort := config.EsCompServerInfo.ESPort |
| | | |
| | | doOnce.Do(func() { |
| | | flag.Parse() |
| | | |
| | | Cmap = &CmapItem{ |
| | | Cam: make(map[string]*shardmap.ShardMap), |
| | | } |
| | | }) |
| | | |
| | | serverIp, _, e := util.GetLocalIP(config.Server.NetworkAdapter) |
| | | if e != nil { |
| | | return e |
| | | } |
| | | |
| | | shardStr, e := getShards(serverIp, alarmIp, alarmPort) |
| | | if e != nil { |
| | | return e |
| | | } |
| | | // 这里需要提供总量的接口 |
| | | estotalnum := libEs.GetTotal(serverIp, alarmPort, indexName, shardStr, targetType) |
| | | *querynum = Querynum(estotalnum) |
| | | temptime := time.Now() |
| | | |
| | | captures, err := libEs.GetOceanFeatures(serverIp, alarmPort, 1000, indexName, shardStr, targetType) |
| | | if err != nil { |
| | | logger.Error("libEs.GetOceanFeatures err:", err) |
| | | return err |
| | | } |
| | | logger.Debug("len(captures):", len(captures)) |
| | | Cmap.Lock() |
| | | for _, ei := range captures { |
| | | if ei.EsInfo.AnalyServerId != "" { |
| | | cKey := PRE_CAPTURE_SERVER + ei.EsInfo.AnalyServerId |
| | | ei.EsInfo.Tableid = captureTable |
| | | if _, ok := Cmap.Cam[cKey]; !ok { |
| | | Cmap.Cam[cKey] = shardmap.New(uint8(*threadnum)) |
| | | } |
| | | |
| | | Cmap.Cam[cKey].Set(ei.EsInfo.Id, ei.EsInfo) |
| | | } |
| | | } |
| | | |
| | | Cmap.Unlock() |
| | | |
| | | logger.Debug(indexName, "缓存完成用时:", time.Since(temptime)) |
| | | return nil |
| | | } |
| | | |
| | | func ReInitDbTablePersonsCache() { |
| | | Cmap.Lock() |
| | |
| | | } |
| | | } else { //web请求,比对指定的抓拍库或者底库 |
| | | for _, tid := range compareArgs.TableIds { |
| | | if tid == captureTable { //比对抓拍库 |
| | | tStart := time.Now() |
| | | serverIp, _, e := util.GetLocalIP(config.Server.NetworkAdapter) |
| | | if e != nil { |
| | | logger.Error("util.GetLocalIP err:", e) |
| | | continue |
| | | } |
| | | alarmPort := config.EsCompServerInfo.ESPort |
| | | shardins, ok := Cmap.Cam[tid] |
| | | if !ok { |
| | | logger.Error("get shard error by tableId:", tid) |
| | | continue |
| | | } |
| | | |
| | | scopeIds := libEs.GetAllLocalVideopersonsId(compareArgs, config.EsInfo.EsIndex.AiOcean.IndexName, serverIp, alarmPort, compareArgs.AlarmLevel) |
| | | logger.Debug("libEs.GetAllLocalVideopersonsId len(scopeIds):", len(scopeIds), " 耗时:", time.Since(tStart)) |
| | | if scopeIds != nil { |
| | | tCompStart := time.Now() |
| | | if compareArgs.AnalyServerId != "" { //指定server |
| | | for cKey, cacheMap := range Cmap.Cam { |
| | | if cKey == PRE_CAPTURE_SERVER+compareArgs.AnalyServerId { |
| | | for _, sId := range scopeIds { |
| | | obj, ok := cacheMap.Get(sId) |
| | | if ok { |
| | | eInfo, ok := obj.(*protomsg.Esinfo) |
| | | if !ok { |
| | | continue |
| | | } |
| | | sec := DoSdkCompare(compareArgs.FaceFeature, eInfo.FaceFeature) |
| | | if sec >= baseScore { |
| | | scResult.CompareResult = append(scResult.CompareResult, &protomsg.SdkCompareEach{ |
| | | Id: eInfo.Id, |
| | | CompareScore: sec, |
| | | Tableid: eInfo.Tableid, |
| | | }) |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } else { //管理平台请求比对所有抓拍,不指定server |
| | | for cKey, cacheMap := range Cmap.Cam { |
| | | if len(compareArgs.ServerIds) > 0 { |
| | | for _, termDevId := range compareArgs.ServerIds { |
| | | if cKey == PRE_CAPTURE_SERVER+termDevId || !strings.HasPrefix(cKey, PRE_CAPTURE_SERVER) { |
| | | targets := cacheMap.Walk(DoSdkCompare, compareArgs.FaceFeature, baseScore, compareArgs.Source, compareArgs.CompareTarget) |
| | | if len(targets) > 0 { |
| | | scResult.CompareResult = append(scResult.CompareResult, targets...) |
| | | } |
| | | } |
| | | } |
| | | } else { |
| | | if strings.HasPrefix(cKey, PRE_CAPTURE_SERVER) { |
| | | for _, sId := range scopeIds { |
| | | if obj, ok := cacheMap.Get(sId); ok { |
| | | eInfo, ok := obj.(*protomsg.Esinfo) |
| | | if !ok { |
| | | continue |
| | | } |
| | | sec := DoSdkCompare(compareArgs.FaceFeature, eInfo.FaceFeature) |
| | | if sec >= baseScore { |
| | | scResult.CompareResult = append(scResult.CompareResult, &protomsg.SdkCompareEach{ |
| | | Id: eInfo.Id, |
| | | CompareScore: sec, |
| | | Tableid: eInfo.Tableid, |
| | | }) |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | logger.Debug("根据scopeIds比对耗时:", time.Since(tCompStart)) |
| | | } |
| | | } else { |
| | | shardins, ok := Cmap.Cam[tid] |
| | | if !ok { |
| | | logger.Error("get shard error by tableId:", tid) |
| | | continue |
| | | } |
| | | |
| | | targets := shardins.Walk(DoSdkCompare, compareArgs.FaceFeature, baseScore, compareArgs.Source, compareArgs.CompareTarget) |
| | | if len(targets) > 0 { |
| | | scResult.CompareResult = append(scResult.CompareResult, targets...) |
| | | } |
| | | targets := shardins.Walk(DoSdkCompare, compareArgs.FaceFeature, baseScore, compareArgs.Source, compareArgs.CompareTarget) |
| | | if len(targets) > 0 { |
| | | scResult.CompareResult = append(scResult.CompareResult, targets...) |
| | | } |
| | | } |
| | | } |
| | |
| | | logger.Error("DoSdkCompare err:", err) |
| | | return -1 |
| | | } |
| | | //if len(co_d) != 2560 { |
| | | // logger.Error("target fea.len !=2560") |
| | | // return -1 |
| | | //} |
| | | // |
| | | //if len(ci) != 2560 { |
| | | // logger.Error("source fea.len !=2560") |
| | | // return -1 |
| | | //} |
| | | sec := face.DecCompare(ci, co_d) |
| | | //logger.Debug("比对得分为:", sec) |
| | | |