// Package cache keeps an in-memory index of shard maps holding capture records
// and db-table (library) person records, and runs feature comparisons on them.
package cache

import (
	"encoding/base64"
	"errors"
	"flag"
	"fmt"
	"strconv"
	"strings"
	"sync"
	"time"

	"sdkCompare/cache/shardmap"
	"sdkCompare/config"
	"sdkCompare/util"

	libEs "basic.com/pubsub/esutil.git"
	"basic.com/pubsub/protomsg.git"
	"basic.com/valib/logger.git"
	"github.com/golang/protobuf/proto"
)

// querynum is the per-batch record count. Init overwrites it with a value
// derived from the total record count.
var querynum = flag.Int("querynum", 3000, "the query number from database")

// threadnum is the divisor used when sizing batches and the argument passed to
// shardmap.New for every shard map created in this file.
var threadnum = flag.Int("threadnum", 32, "the number of thread to deal data.")

var (
	// thresholdLimit is the lowest score GetComparePersonBaseInfo accepts
	// unless the request sets IsCompareAll.
	thresholdLimit = float32(50)
	// captureTable is the table id stamped on every cached capture record and
	// the table id web requests use to select the capture caches.
	captureTable = "capturetable"
	cartable     = "carTable_"
	// PRE_DBTABLE is the Cmap.Cam key of the shard map that stores, per
	// db-table id, whether that table is enabled (a bool).
	PRE_DBTABLE = "dbTable_"
	// PRE_CAPTURE_SERVER prefixes the Cmap.Cam keys of per-server capture caches.
	PRE_CAPTURE_SERVER = "captureServer_"
)

// CmapItem is the cache root: a mutex-guarded index from a key (a db-table id,
// PRE_DBTABLE, or PRE_CAPTURE_SERVER+serverId) to its shard map.
type CmapItem struct {
	sync.Mutex
	Cam map[string]*shardmap.ShardMap
}

// Cmap is the package-wide cache instance, created by Init or InitDbTablePersons.
var Cmap *CmapItem

var doOnce sync.Once

// ensureCmap parses the command-line flags and allocates Cmap exactly once.
func ensureCmap() {
	doOnce.Do(func() {
		flag.Parse()
		Cmap = &CmapItem{
			Cam: make(map[string]*shardmap.ShardMap),
		}
	})
}

// cmapShard returns the shard map stored under key, creating and registering
// it when absent. The caller must hold Cmap's lock.
func cmapShard(key string) *shardmap.ShardMap {
	shard, ok := Cmap.Cam[key]
	if !ok {
		shard = shardmap.New(uint8(*threadnum))
		Cmap.Cam[key] = shard
	}
	return shard
}

// cmapSnapshot returns a shallow copy of Cmap.Cam taken under Cmap's lock, so
// readers can iterate the index while writers add or delete keys.
// The shard maps themselves are shared, not copied; presumably shardmap
// synchronizes its own access — TODO confirm.
func cmapSnapshot() map[string]*shardmap.ShardMap {
	Cmap.Lock()
	defer Cmap.Unlock()
	snapshot := make(map[string]*shardmap.ShardMap, len(Cmap.Cam))
	for key, shard := range Cmap.Cam {
		snapshot[key] = shard
	}
	return snapshot
}

// Querynum computes how many records each query should fetch: the total split
// evenly across *threadnum, capped at the current *querynum.
func Querynum(totalnum int) int {
	perThread := totalnum / *threadnum
	if *querynum < perThread {
		return *querynum
	}
	return perThread
}

// IncreVideoPersonsCache runs the incremental query every three minutes,
// each time covering the window since the previous run. It never returns.
func IncreVideoPersonsCache(lastT time.Time, targetType string) {
	ticker := time.NewTicker(time.Minute * 3)
	defer ticker.Stop()
	for range ticker.C {
		curTime := time.Now()
		Incrementquery(lastT, curTime, targetType)
		lastT = curTime
	}
}

// Incrementquery fetches the records produced between last and cur from the
// shards returned by getShards and stores them in the per-server capture caches.
// Failures are logged and the cache is left untouched.
func Incrementquery(last time.Time, cur time.Time, targetType string) {
	const layout = "2006-01-02 15:04:05"
	laststring := last.Format(layout)
	curstring := cur.Format(layout)
	alarmIp := config.EsCompServerInfo.ESIP
	alarmPort := config.EsCompServerInfo.ESPort
	indexName := config.EsInfo.EsIndex.AiOcean.IndexName

	serverIp, _, e := util.GetLocalIP(config.Server.NetworkAdapter)
	if e != nil {
		logger.Error("util.GetLocalIP err:", e)
		return
	}
	shardStr, e := getShards(serverIp, alarmIp, alarmPort)
	if e != nil {
		logger.Error("getShards err:", e)
		return
	}

	// Call the incremental (time-window) query interface.
	captures, err := libEs.GetPeriodInfos(serverIp, alarmPort, laststring, curstring, indexName, shardStr, targetType)
	if err != nil {
		logger.Error(err)
		return
	}

	// Cmap.Cam is written here, so take the same lock every other writer takes.
	Cmap.Lock()
	defer Cmap.Unlock()
	for _, ei := range captures {
		if ei.EsInfo.AnalyServerId == "" {
			continue
		}
		ei.EsInfo.Tableid = captureTable
		cmapShard(PRE_CAPTURE_SERVER+ei.EsInfo.AnalyServerId).Set(ei.EsInfo.Id, ei.EsInfo)
	}
}

// getShards returns the comma-joined numbers of the STARTED primary shards of
// the configured index whose shard IP is serverIp or 127.0.0.1. It returns an
// error when the shard list cannot be fetched or no shard qualifies.
func getShards(serverIp string, alarmIp string, alarmPort string) (string, error) {
	infos, e := libEs.GetShardsByIndex(alarmIp, alarmPort, config.EsInfo.EsIndex.AiOcean.IndexName)
	if e != nil || infos == nil {
		logger.Error("get es primary ips err")
		return "", errors.New("get es primary ips err")
	}

	var shards []string
	for _, shard := range infos {
		onThisHost := shard.ShardIp == serverIp || shard.ShardIp == "127.0.0.1"
		if onThisHost && shard.ShardRole == "primary" && shard.ShardState == "STARTED" {
			shards = append(shards, strconv.Itoa(shard.ShardNum))
		}
	}
	if len(shards) == 0 {
		return "", errors.New("current shards is empty")
	}
	return strings.Join(shards, ","), nil
}

// Init creates the cache on first use, sets *querynum from the total record
// count of indexName, then loads records via libEs.GetOceanFeatures into the
// per-server capture caches.
func Init(indexName string, targetType string) error {
	alarmIp := config.EsCompServerInfo.ESIP
	alarmPort := config.EsCompServerInfo.ESPort

	ensureCmap()

	serverIp, _, e := util.GetLocalIP(config.Server.NetworkAdapter)
	if e != nil {
		return e
	}
	shardStr, e := getShards(serverIp, alarmIp, alarmPort)
	if e != nil {
		return e
	}

	// The total count drives the per-query batch size.
	estotalnum := libEs.GetTotal(serverIp, alarmPort, indexName, shardStr, targetType)
	*querynum = Querynum(estotalnum)

	temptime := time.Now()
	captures, err := libEs.GetOceanFeatures(serverIp, alarmPort, 1000, indexName, shardStr, targetType)
	if err != nil {
		logger.Error("libEs.GetOceanFeatures err:", err)
		return err
	}
	logger.Debug("len(captures):", len(captures))

	Cmap.Lock()
	for _, ei := range captures {
		if ei.EsInfo.AnalyServerId == "" {
			continue
		}
		ei.EsInfo.Tableid = captureTable
		cmapShard(PRE_CAPTURE_SERVER+ei.EsInfo.AnalyServerId).Set(ei.EsInfo.Id, ei.EsInfo)
	}
	Cmap.Unlock()

	logger.Debug(indexName, "缓存完成用时:", time.Since(temptime))
	return nil
}

// ReInitDbTablePersonsCache empties the cache index and reloads the db tables
// and their persons.
//
// NOTE(review): every key of Cmap.Cam is deleted, including the per-server
// capture caches, and only db-table data is reloaded here.
func ReInitDbTablePersonsCache() {
	// The nil check must come before locking: Lock on a nil Cmap panics.
	if Cmap == nil {
		Cmap = &CmapItem{
			Cam: make(map[string]*shardmap.ShardMap),
		}
	}

	Cmap.Lock()
	for tableId := range Cmap.Cam {
		delete(Cmap.Cam, tableId)
	}
	// Release before reloading: initDbTablePersonsCache takes the same
	// non-reentrant mutex and would otherwise block forever.
	Cmap.Unlock()

	initDbTablePersonsCache()
}

// InitDbTablePersons creates the cache on first use and loads the db tables
// and their persons into it.
func InitDbTablePersons() {
	ensureCmap()
	initDbTablePersonsCache()
}

// initDbTablePersonsCache records the enabled flag of every db table of the
// current server under PRE_DBTABLE, then loads all persons in concurrent
// batches into the shard map of their table. Cmap's lock must not be held by
// the caller.
func initDbTablePersonsCache() {
	// Load the list of db tables.
	var dtApi DbTables
	allTables, err := dtApi.FindAllDbTablesByCurServer()
	if err != nil {
		logger.Error("FindAllDbTablesByCurServer err:", err)
	} else if len(allTables) > 0 {
		Cmap.Lock()
		tables := cmapShard(PRE_DBTABLE)
		for _, table := range allTables {
			tables.Set(table.Id, table.Enable == 1)
			logger.Debugf("初始化底库[%s][%s]到缓存中", table.Id, table.TableName)
		}
		Cmap.Unlock()
	}

	// Cache every person of every db table.
	var dbpApi DbPersons
	total, e := dbpApi.GetPersonTotal("")
	logger.Debugf("所有底库共有%d条记录", total)
	if e != nil {
		logger.Error("GetPersonTotal err:", e)
		return
	}
	if total <= 0 {
		return
	}

	// Batch size is the larger of *querynum and total / *threadnum.
	queryEachNum := *querynum
	if qn := int(total) / *threadnum; queryEachNum < qn {
		queryEachNum = qn
	}
	// Both candidates can be 0 (Init may set *querynum to 0 and total may be
	// below *threadnum); fall back to one batch instead of dividing by zero.
	if queryEachNum <= 0 {
		queryEachNum = int(total)
	}
	queryT := int(total) / queryEachNum
	if int(total)%queryEachNum > 0 {
		queryT++
	}

	temptime := time.Now()
	var wg sync.WaitGroup
	for i := 0; i < queryT; i++ {
		wg.Add(1)
		go func(offset int) {
			defer wg.Done()
			dbpersons, err := dbpApi.GetPersonsCompareCacheBase(offset, queryEachNum)
			if err != nil {
				logger.Error(err)
				return
			}
			logger.Debugf("获取%d条人员信息", len(dbpersons))
			if len(dbpersons) == 0 {
				return
			}

			Cmap.Lock()
			defer Cmap.Unlock()
			var last *shardmap.ShardMap
			for _, value := range dbpersons {
				last = cmapShard(value.Tableid)
				last.Set(value.Id, value)
			}
			// Only the shard of the batch's last record is time-stamped,
			// matching the previous behavior.
			last.Settime()
		}(i * queryEachNum)
	}
	wg.Wait()
	logger.Debug("底库人员缓存完成用时:", time.Since(temptime))
}

// UpdateDbPersonsCacheById reloads the person with the given id and stores it
// in the shard map of its table. Persons with an empty table id are ignored.
func UpdateDbPersonsCacheById(id string) {
	var dbpApi DbPersons
	info, err := dbpApi.GetPersonsCompareCacheById(id)
	if err != nil {
		logger.Error(err)
		return
	}
	if info.Tableid == "" {
		return
	}

	Cmap.Lock()
	defer Cmap.Unlock()
	shard := cmapShard(info.Tableid)
	shard.Set(info.Id, info)
	shard.Settime()
}

// RealTimeAddPersonInfoToCache stores (or replaces) one person record in the
// shard map of tableId, creating that shard map when needed.
func RealTimeAddPersonInfoToCache(tableId string, id string, faceFeature string, enable int32, carNo string) {
	Cmap.Lock()
	defer Cmap.Unlock()

	ei := &protomsg.Esinfo{
		Id:          id,
		Tableid:     tableId,
		FaceFeature: faceFeature,
		Enable:      enable,
		CarNo:       carNo,
	}
	cmapShard(tableId).Set(id, ei)
	logger.Debug("id:", id, ",len(faceFeature):", len(faceFeature), ",tableId:", tableId, ",enable:", enable)
}

// RealTimeDelPersonFromCache removes one person from the shard map of tableId
// and logs an error when that table is not cached.
func RealTimeDelPersonFromCache(tableId string, id string) {
	logger.Debug("DelPersonFromCache,tableId:", tableId, ",id:", id)
	Cmap.Lock()
	defer Cmap.Unlock()

	shard, ok := Cmap.Cam[tableId]
	if !ok {
		logger.Error("tableId:", tableId, " not exist")
		return
	}
	shard.Del(id)
	logger.Debug("DelPerson ok success")
}

// RealTimeDelTable removes the table's enabled flag from the PRE_DBTABLE shard
// map and drops the table's own shard map from the index.
func RealTimeDelTable(tableId string) {
	logger.Debug("RealTimeDelTable tableId:", tableId)
	Cmap.Lock()
	defer Cmap.Unlock()

	if dtM, ok := Cmap.Cam[PRE_DBTABLE]; ok {
		dtM.Del(tableId)
	}
	// delete is a no-op for a missing key, so no existence check is needed.
	delete(Cmap.Cam, tableId)
}

// RealTimeUpdateTable records whether the db table is enabled (enable == 1)
// in the PRE_DBTABLE shard map.
func RealTimeUpdateTable(tableId string, enable int32) {
	logger.Debug("RealTimeUpdateTable tableId:", tableId, ",enable:", enable)
	Cmap.Lock()
	defer Cmap.Unlock()
	cmapShard(PRE_DBTABLE).Set(tableId, enable == 1)
}

// UpdateCache applies a change notification to the cache: insert/update and
// delete of a db table or of a db-table person. Other types and actions are
// ignored. Only the first entry of changeMsg.TableId is used.
func UpdateCache(changeMsg *protomsg.EsPersonCacheChange) {
	if changeMsg == nil {
		return
	}

	isTable := changeMsg.Type == protomsg.EsCacheChanged_T_DbTable
	isPerson := changeMsg.Type == protomsg.EsCacheChanged_T_DbTablePerson
	upsert := changeMsg.Action == protomsg.DbAction_Insert || changeMsg.Action == protomsg.DbAction_Update
	remove := changeMsg.Action == protomsg.DbAction_Delete
	if !(isTable || isPerson) || !(upsert || remove) {
		return
	}

	// Guard the index below: an empty TableId used to panic here.
	if len(changeMsg.TableId) == 0 {
		logger.Error("UpdateCache: change message has no table id")
		return
	}
	tableId := changeMsg.TableId[0]

	if isTable {
		if upsert {
			RealTimeUpdateTable(tableId, changeMsg.Enable)
		} else {
			RealTimeDelTable(tableId)
		}
		return
	}
	if upsert {
		RealTimeAddPersonInfoToCache(tableId, changeMsg.PersonId, changeMsg.Feature, changeMsg.Enable, changeMsg.CarNo)
	} else {
		RealTimeDelPersonFromCache(tableId, changeMsg.PersonId)
	}
}

// GetComparePersonBaseInfo compares compareArgs.FaceFeature against the cached
// records selected by the request and returns the protobuf-encoded
// SdkCompareResult. It returns nil when no feature is supplied or encoding fails.
//
// Selection, as implemented below:
//   - Source == false (ruleprocess): only enabled db tables are compared,
//     either all of them (no TableIds) or the listed ones.
//   - Source == true (web), no TableIds: every non-capture map is compared;
//     capture maps are limited to AnalyServerId, else to ServerIds, else all.
//   - Source == true, with TableIds: captureTable selects the capture caches
//     (restricted to the ids returned by libEs.GetAllLocalVideopersonsId);
//     any other id selects that table's shard map.
//
// The minimum score is thresholdLimit, raised to CompareThreshold when that is
// higher, or 0 when IsCompareAll is set.
func GetComparePersonBaseInfo(compareArgs protomsg.CompareArgs) []byte {
	if compareArgs.FaceFeature == nil {
		return nil
	}

	// Lowest accepted score.
	baseScore := thresholdLimit
	if compareArgs.CompareThreshold > thresholdLimit {
		baseScore = compareArgs.CompareThreshold
	}
	if compareArgs.IsCompareAll {
		baseScore = 0
	}

	// Iterate a locked snapshot: Cmap.Cam is mutated by the update functions.
	cam := cmapSnapshot()
	var scResult protomsg.SdkCompareResult

	// walk compares the feature against every record of one shard map.
	walk := func(shard *shardmap.ShardMap) {
		targets := shard.Walk(DoSdkCompare, compareArgs.FaceFeature, baseScore, compareArgs.Source, compareArgs.CompareTarget)
		if len(targets) > 0 {
			scResult.CompareResult = append(scResult.CompareResult, targets...)
		}
	}

	// inScope reports whether the map under cKey belongs to a web request's
	// scope. Non-capture maps always do; capture maps must match
	// AnalyServerId when set, else one of ServerIds when given, else all match.
	// It answers once per key, so a map is never walked once per server id.
	inScope := func(cKey string) bool {
		if !strings.HasPrefix(cKey, PRE_CAPTURE_SERVER) {
			return true
		}
		if compareArgs.AnalyServerId != "" {
			return cKey == PRE_CAPTURE_SERVER+compareArgs.AnalyServerId
		}
		if len(compareArgs.ServerIds) == 0 {
			return true
		}
		for _, termDevId := range compareArgs.ServerIds {
			if cKey == PRE_CAPTURE_SERVER+termDevId {
				return true
			}
		}
		return false
	}

	if len(compareArgs.TableIds) == 0 {
		// No explicit comparison targets.
		logger.Debugf("接收到底库比对请求, 阈值:%f", compareArgs.CompareThreshold)
		if !compareArgs.Source {
			// Request from ruleprocess: compare every enabled db table.
			if tShard, hasT := cam[PRE_DBTABLE]; hasT {
				for key, val := range cam {
					tEnable, tOk := tShard.Get(key)
					if !tOk {
						continue // not a db table
					}
					if enabled, _ := tEnable.(bool); enabled {
						walk(val)
					}
				}
			}
		} else {
			// Request from web: compare captures and db tables in scope.
			for cKey, val := range cam {
				if inScope(cKey) {
					walk(val)
				}
			}
		}
	} else if !compareArgs.Source {
		// ruleProcess comparing the listed db tables.
		for _, tid := range compareArgs.TableIds {
			shardins, ok := cam[tid]
			if !ok {
				logger.Error("ruleProcess compare get shard error by tableId:", tid)
				continue
			}
			tShard, hasT := cam[PRE_DBTABLE]
			if !hasT {
				logger.Error("ruleProcess compare tables,PRE_DBTABLE tableId:", tid, " not exist")
				continue
			}
			tEnable, tOk := tShard.Get(tid)
			if !tOk {
				logger.Error("ruleProcess compare tables,tShard not exist tableId:", tid)
				continue
			}
			logger.Debug("ruleProcess compare tables,exist tableId:", tid, ",enable:", tEnable)
			if enabled, _ := tEnable.(bool); enabled {
				walk(shardins)
			}
		}
	} else {
		// Web request comparing the listed capture cache or db tables.
		for _, tid := range compareArgs.TableIds {
			if tid != captureTable {
				shardins, ok := cam[tid]
				if !ok {
					logger.Error("get shard error by tableId:", tid)
					continue
				}
				walk(shardins)
				continue
			}

			// Capture cache: first fetch the ids that are in scope.
			tStart := time.Now()
			serverIp, _, e := util.GetLocalIP(config.Server.NetworkAdapter)
			if e != nil {
				logger.Error("util.GetLocalIP err:", e)
				continue
			}
			alarmPort := config.EsCompServerInfo.ESPort
			scopeIds := libEs.GetAllLocalVideopersonsId(compareArgs, config.EsInfo.EsIndex.AiOcean.IndexName, serverIp, alarmPort, compareArgs.AlarmLevel)
			logger.Debug("libEs.GetAllLocalVideopersonsId len(scopeIds):", len(scopeIds), " 耗时:", time.Since(tStart))
			if scopeIds == nil {
				continue
			}

			tCompStart := time.Now()
			// compareScoped compares only the records of shard whose id is in scopeIds.
			compareScoped := func(shard *shardmap.ShardMap) {
				for _, sId := range scopeIds {
					obj, ok := shard.Get(sId)
					if !ok {
						continue
					}
					eInfo, ok := obj.(*protomsg.Esinfo)
					if !ok {
						continue
					}
					sec := DoSdkCompare(compareArgs.FaceFeature, eInfo.FaceFeature)
					if sec >= baseScore {
						scResult.CompareResult = append(scResult.CompareResult, &protomsg.SdkCompareEach{
							Id:           eInfo.Id,
							CompareScore: sec,
							Tableid:      eInfo.Tableid,
						})
					}
				}
			}
			for cKey, cacheMap := range cam {
				switch {
				case compareArgs.AnalyServerId != "":
					// A single server was specified.
					if cKey == PRE_CAPTURE_SERVER+compareArgs.AnalyServerId {
						compareScoped(cacheMap)
					}
				case len(compareArgs.ServerIds) > 0:
					// NOTE(review): this path walks whole maps, non-capture
					// ones included, and does not apply scopeIds — kept as
					// it was; confirm it is intended.
					if inScope(cKey) {
						walk(cacheMap)
					}
				default:
					// Management platform: all capture caches, no server filter.
					if strings.HasPrefix(cKey, PRE_CAPTURE_SERVER) {
						compareScoped(cacheMap)
					}
				}
			}
			logger.Debug("根据scopeIds比对耗时:", time.Since(tCompStart))
		}
	}

	logger.Debug("比对结果: len(scResult.CompareResult):", len(scResult.CompareResult))
	buf, err := proto.Marshal(&scResult)
	if err != nil {
		logger.Error("scResult Marshal error!", err)
		return nil
	}
	return buf
}

// DoSdkCompare base64-decodes the target feature co, compares it with the raw
// source feature ci via DecCompare and returns the score normalized by
// ParseScore. It returns -1 when co is not valid base64.
func DoSdkCompare(ci []byte, co string) float32 {
	decoded, err := base64.StdEncoding.DecodeString(co)
	if err != nil {
		logger.Error("DoSdkCompare err:", err)
		return -1
	}

	// Feature-length validation is currently disabled:
	//if len(co_d) != 2560 {
	//	logger.Error("target fea.len !=2560")
	//	return -1
	//}
	//
	//if len(ci) != 2560 {
	//	logger.Error("source fea.len !=2560")
	//	return -1
	//}

	return ParseScore(DecCompare(ci, decoded))
}

// ParseScore normalizes a comparison score: values <= 1 are multiplied by 100,
// an exact 100 is returned unchanged, and anything else is rounded to two
// decimals by formatting and re-parsing.
func ParseScore(compareScore float32) float32 {
	if compareScore <= 1 {
		compareScore = compareScore * 100
	}
	if compareScore == 100 {
		return 100
	}
	// The string comes from Sprintf on a float, so the parse error is ignored.
	f, _ := strconv.ParseFloat(fmt.Sprintf("%2.2f", compareScore), 32)
	return float32(f)
}