package cache
|
|
import (
|
"encoding/base64"
|
"errors"
|
"flag"
|
"fmt"
|
"strconv"
|
"strings"
|
"sync"
|
"time"
|
|
"sdkCompare/cache/shardmap"
|
"sdkCompare/config"
|
"sdkCompare/util"
|
|
libEs "basic.com/pubsub/esutil.git"
|
"basic.com/pubsub/protomsg.git"
|
"basic.com/valib/logger.git"
|
"github.com/golang/protobuf/proto"
|
)
|
|
var querynum = flag.Int("querynum", 3000, "the query number from database")
|
var threadnum = flag.Int("threadnum", 32, "the number of thread to deal data.")
|
|
var (
|
thresholdLimit = float32(50)
|
captureTable = "capturetable"
|
cartable = "carTable_"
|
PRE_DBTABLE = "dbTable_"
|
PRE_CAPTURE_SERVER = "captureServer_"
|
)
|
|
type CmapItem struct {
|
sync.Mutex
|
Cam map[string]*shardmap.ShardMap
|
}
|
|
var Cmap *CmapItem
|
var doOnce sync.Once
|
|
// 计算每次查询的数据量条数
|
func Querynum(totalnum int) int {
|
qn := totalnum / *threadnum //qn=6551
|
if *querynum < qn { //
|
|
return *querynum
|
}
|
return qn
|
}
|
|
// 增量查询
|
func IncreVideoPersonsCache(lastT time.Time, targetType string) {
|
ticker := time.NewTicker(time.Minute * 3)
|
for {
|
select {
|
case <-ticker.C:
|
curTime := time.Now()
|
Incrementquery(lastT, curTime, targetType)
|
lastT = curTime
|
}
|
}
|
}
|
|
func Incrementquery(last time.Time, cur time.Time, targetType string) {
|
laststring := last.Format("2006-01-02 15:04:05")
|
curstring := cur.Format("2006-01-02 15:04:05")
|
alarmIp := config.EsCompServerInfo.ESIP
|
alarmPort := config.EsCompServerInfo.ESPort
|
indexName := config.EsInfo.EsIndex.AiOcean.IndexName
|
|
serverIp, _, e := util.GetLocalIP(config.Server.NetworkAdapter)
|
if e != nil {
|
return
|
}
|
|
shardStr, e := getShards(serverIp, alarmIp, alarmPort)
|
if e != nil {
|
return
|
}
|
|
// 调用增量的接口
|
captures, err := libEs.GetPeriodInfos(serverIp, alarmPort, laststring, curstring, indexName, shardStr, targetType)
|
if err != nil {
|
logger.Error(err)
|
return
|
}
|
for _, ei := range captures {
|
if ei.EsInfo.AnalyServerId != "" {
|
cKey := PRE_CAPTURE_SERVER + ei.EsInfo.AnalyServerId
|
ei.EsInfo.Tableid = captureTable
|
|
if _, ok := Cmap.Cam[cKey]; !ok {
|
Cmap.Cam[cKey] = shardmap.New(uint8(*threadnum))
|
}
|
|
Cmap.Cam[cKey].Set(ei.EsInfo.Id, ei.EsInfo)
|
}
|
}
|
}
|
|
func getShards(serverIp string, alarmIp string, alarmPort string) (string, error) {
|
infos, e := libEs.GetShardsByIndex(alarmIp, alarmPort, config.EsInfo.EsIndex.AiOcean.IndexName)
|
if e != nil || infos == nil {
|
logger.Error("get es primary ips err")
|
return "", errors.New("get es primary ips err")
|
}
|
|
var shards []string
|
for _, shard := range infos {
|
if (shard.ShardIp == serverIp || shard.ShardIp == "127.0.0.1") && shard.ShardRole == "primary" && shard.ShardState == "STARTED" {
|
shards = append(shards, strconv.Itoa(shard.ShardNum))
|
}
|
}
|
if len(shards) == 0 {
|
return "", errors.New("current shards is empty")
|
}
|
return strings.Join(shards, ","), nil
|
}
|
|
// 1. 拿到总量, 计算每个线程的查询量
|
// 2. 分线程查询
|
func Init(indexName string, targetType string) error {
|
alarmIp := config.EsCompServerInfo.ESIP
|
alarmPort := config.EsCompServerInfo.ESPort
|
|
doOnce.Do(func() {
|
flag.Parse()
|
|
Cmap = &CmapItem{
|
Cam: make(map[string]*shardmap.ShardMap),
|
}
|
})
|
|
serverIp, _, e := util.GetLocalIP(config.Server.NetworkAdapter)
|
if e != nil {
|
return e
|
}
|
|
shardStr, e := getShards(serverIp, alarmIp, alarmPort)
|
if e != nil {
|
return e
|
}
|
// 这里需要提供总量的接口
|
estotalnum := libEs.GetTotal(serverIp, alarmPort, indexName, shardStr, targetType)
|
*querynum = Querynum(estotalnum)
|
temptime := time.Now()
|
|
captures, err := libEs.GetOceanFeatures(serverIp, alarmPort, 1000, indexName, shardStr, targetType)
|
if err != nil {
|
logger.Error("libEs.GetOceanFeatures err:", err)
|
return err
|
}
|
logger.Debug("len(captures):", len(captures))
|
Cmap.Lock()
|
for _, ei := range captures {
|
if ei.EsInfo.AnalyServerId != "" {
|
cKey := PRE_CAPTURE_SERVER + ei.EsInfo.AnalyServerId
|
ei.EsInfo.Tableid = captureTable
|
if _, ok := Cmap.Cam[cKey]; !ok {
|
Cmap.Cam[cKey] = shardmap.New(uint8(*threadnum))
|
}
|
|
Cmap.Cam[cKey].Set(ei.EsInfo.Id, ei.EsInfo)
|
}
|
}
|
|
Cmap.Unlock()
|
|
logger.Debug(indexName, "缓存完成用时:", time.Since(temptime))
|
return nil
|
}
|
|
func ReInitDbTablePersonsCache() {
|
Cmap.Lock()
|
defer Cmap.Unlock()
|
if Cmap == nil {
|
Cmap = &CmapItem{
|
Cam: make(map[string]*shardmap.ShardMap),
|
}
|
}
|
for tableId, _ := range Cmap.Cam {
|
delete(Cmap.Cam, tableId)
|
}
|
|
initDbTablePersonsCache()
|
}
|
|
func InitDbTablePersons() {
|
doOnce.Do(func() {
|
flag.Parse()
|
|
Cmap = &CmapItem{
|
Cam: make(map[string]*shardmap.ShardMap),
|
}
|
})
|
|
initDbTablePersonsCache()
|
}
|
|
func initDbTablePersonsCache() {
|
// 查询所有的底库列表
|
var dtApi DbTables
|
allTables, err := dtApi.FindAllDbTablesByCurServer()
|
if err == nil && allTables != nil && len(allTables) > 0 { //初始化底库缓存信息
|
Cmap.Lock()
|
for _, table := range allTables {
|
if _, ok := Cmap.Cam[PRE_DBTABLE]; !ok {
|
Cmap.Cam[PRE_DBTABLE] = shardmap.New(uint8(*threadnum))
|
}
|
Cmap.Cam[PRE_DBTABLE].Set(table.Id, table.Enable == 1)
|
logger.Debugf("初始化底库[%s][%s]到缓存中", table.Id, table.TableName)
|
}
|
Cmap.Unlock()
|
}
|
|
// 缓存底库中的全部人员信息
|
var dbpApi DbPersons
|
total, e := dbpApi.GetPersonTotal("")
|
logger.Debugf("所有底库共有%d条记录", total)
|
if e == nil && total > 0 {
|
queryEachNum := *querynum
|
qn := int(total) / *threadnum
|
if *querynum < qn {
|
queryEachNum = qn
|
}
|
queryT := int(total) / queryEachNum
|
if int(total)%queryEachNum > 0 {
|
queryT++
|
}
|
temptime := time.Now()
|
var wg sync.WaitGroup
|
|
for i := 0; i < queryT; i++ {
|
j := i * queryEachNum
|
wg.Add(1)
|
go func(qs int) {
|
defer wg.Done()
|
dbpersons, err := dbpApi.GetPersonsCompareCacheBase(j, queryEachNum)
|
if err != nil {
|
logger.Error(err)
|
return
|
}
|
logger.Debugf("获取%d条人员信息", len(dbpersons))
|
Cmap.Lock()
|
tableId := ""
|
for _, value := range dbpersons {
|
tableId = value.Tableid
|
if _, ok := Cmap.Cam[value.Tableid]; !ok {
|
Cmap.Cam[value.Tableid] = shardmap.New(uint8(*threadnum))
|
}
|
|
Cmap.Cam[value.Tableid].Set(value.Id, value)
|
}
|
|
if len(dbpersons) != 0 {
|
Cmap.Cam[tableId].Settime()
|
}
|
|
Cmap.Unlock()
|
|
}(j)
|
}
|
wg.Wait()
|
logger.Debug("底库人员缓存完成用时:", time.Since(temptime))
|
}
|
}
|
|
// UpdateDbPersonsCacheById 更新缓存中的全部人员信息
|
func UpdateDbPersonsCacheById(id string) {
|
var dbpApi DbPersons
|
info, err := dbpApi.GetPersonsCompareCacheById(id)
|
if err != nil {
|
logger.Error(err)
|
return
|
}
|
if info.Tableid != "" {
|
Cmap.Lock()
|
defer Cmap.Unlock()
|
if _, ok := Cmap.Cam[info.Tableid]; !ok {
|
Cmap.Cam[info.Tableid] = shardmap.New(uint8(*threadnum))
|
}
|
Cmap.Cam[info.Tableid].Set(info.Id, info)
|
Cmap.Cam[info.Tableid].Settime()
|
}
|
}
|
|
func RealTimeAddPersonInfoToCache(tableId string, id string, faceFeature string, enable int32, carNo string) {
|
Cmap.Lock()
|
defer Cmap.Unlock()
|
if _, ok := Cmap.Cam[tableId]; !ok {
|
Cmap.Cam[tableId] = shardmap.New(uint8(*threadnum))
|
}
|
var ei = protomsg.Esinfo{
|
Id: id,
|
Tableid: tableId,
|
FaceFeature: faceFeature,
|
Enable: enable,
|
CarNo: carNo,
|
}
|
Cmap.Cam[tableId].Set(id, &ei)
|
logger.Debug("id:", id, ",tableId:", ",len(faceFeature):", len(faceFeature), ",tableId:", tableId, ",enable:", enable)
|
}
|
|
func RealTimeDelPersonFromCache(tableId string, id string) {
|
logger.Debug("DelPersonFromCache,tableId:", tableId, ",id:", id)
|
Cmap.Lock()
|
defer Cmap.Unlock()
|
if _, ok := Cmap.Cam[tableId]; ok {
|
Cmap.Cam[tableId].Del(id)
|
logger.Debug("DelPerson ok success")
|
} else {
|
logger.Error("tableId:", tableId, " not exist")
|
}
|
}
|
|
func RealTimeDelTable(tableId string) {
|
logger.Debug("RealTimeDelTable tableId:", tableId)
|
Cmap.Lock()
|
defer Cmap.Unlock()
|
|
if dtM, ok := Cmap.Cam[PRE_DBTABLE]; ok {
|
dtM.Del(tableId)
|
}
|
if _, ok := Cmap.Cam[tableId]; ok {
|
delete(Cmap.Cam, tableId)
|
}
|
}
|
|
// 使底库生效,将底库中的所有生效状态的人特征添加到缓存
|
func RealTimeUpdateTable(tableId string, enable int32) {
|
logger.Debug("RealTimeUpdateTable tableId:", tableId, ",enable:", enable)
|
Cmap.Lock()
|
defer Cmap.Unlock()
|
|
if _, ok := Cmap.Cam[PRE_DBTABLE]; !ok {
|
Cmap.Cam[PRE_DBTABLE] = shardmap.New(uint8(*threadnum))
|
}
|
Cmap.Cam[PRE_DBTABLE].Set(tableId, enable == 1)
|
}
|
|
func UpdateCache(changeMsg *protomsg.EsPersonCacheChange) {
|
if changeMsg.Type == protomsg.EsCacheChanged_T_DbTable {
|
if changeMsg.Action == protomsg.DbAction_Insert || changeMsg.Action == protomsg.DbAction_Update {
|
RealTimeUpdateTable(changeMsg.TableId[0], changeMsg.Enable)
|
} else if changeMsg.Action == protomsg.DbAction_Delete {
|
RealTimeDelTable(changeMsg.TableId[0])
|
}
|
} else if changeMsg.Type == protomsg.EsCacheChanged_T_DbTablePerson {
|
if changeMsg.Action == protomsg.DbAction_Insert || changeMsg.Action == protomsg.DbAction_Update {
|
RealTimeAddPersonInfoToCache(changeMsg.TableId[0], changeMsg.PersonId, changeMsg.Feature, changeMsg.Enable, changeMsg.CarNo)
|
} else if changeMsg.Action == protomsg.DbAction_Delete {
|
RealTimeDelPersonFromCache(changeMsg.TableId[0], changeMsg.PersonId)
|
}
|
}
|
}
|
|
func GetComparePersonBaseInfo(compareArgs protomsg.CompareArgs) []byte {
|
if compareArgs.FaceFeature == nil {
|
return nil
|
}
|
|
//指定最低分
|
baseScore := thresholdLimit
|
if compareArgs.CompareThreshold > thresholdLimit {
|
baseScore = compareArgs.CompareThreshold
|
}
|
|
if compareArgs.IsCompareAll {
|
baseScore = 0
|
}
|
|
var scResult protomsg.SdkCompareResult
|
|
//未指定比对目标map
|
if compareArgs.TableIds == nil || len(compareArgs.TableIds) == 0 {
|
logger.Debugf("接收到底库比对请求, 阈值:%f", compareArgs.CompareThreshold)
|
|
// 比对来源是ruleprocess,传空比全部底库
|
if !compareArgs.Source {
|
for key, val := range Cmap.Cam {
|
// 判断是否属于底库
|
if tShard, hasT := Cmap.Cam[PRE_DBTABLE]; hasT {
|
if tEnable, tOk := tShard.Get(key); tOk { //存在此底库
|
//底库有效
|
if tEnable.(bool) {
|
targets := val.Walk(DoSdkCompare, compareArgs.FaceFeature, baseScore, compareArgs.Source, compareArgs.CompareTarget)
|
if len(targets) > 0 {
|
scResult.CompareResult = append(scResult.CompareResult, targets...)
|
}
|
}
|
}
|
}
|
}
|
} else { //来源是web,比对所有抓拍和底库
|
for cKey, val := range Cmap.Cam {
|
if compareArgs.AnalyServerId != "" { //比对指定server产生的抓拍数据和底库
|
if cKey == PRE_CAPTURE_SERVER+compareArgs.AnalyServerId || !strings.HasPrefix(cKey, PRE_CAPTURE_SERVER) {
|
targets := val.Walk(DoSdkCompare, compareArgs.FaceFeature, baseScore, compareArgs.Source, compareArgs.CompareTarget)
|
if len(targets) > 0 {
|
scResult.CompareResult = append(scResult.CompareResult, targets...)
|
}
|
}
|
} else { //管理平台请求,比对所有数据
|
if len(compareArgs.ServerIds) > 0 {
|
for _, termDevId := range compareArgs.ServerIds {
|
if cKey == PRE_CAPTURE_SERVER+termDevId || !strings.HasPrefix(cKey, PRE_CAPTURE_SERVER) {
|
targets := val.Walk(DoSdkCompare, compareArgs.FaceFeature, baseScore, compareArgs.Source, compareArgs.CompareTarget)
|
if len(targets) > 0 {
|
scResult.CompareResult = append(scResult.CompareResult, targets...)
|
}
|
}
|
}
|
} else {
|
targets := val.Walk(DoSdkCompare, compareArgs.FaceFeature, baseScore, compareArgs.Source, compareArgs.CompareTarget)
|
if len(targets) > 0 {
|
scResult.CompareResult = append(scResult.CompareResult, targets...)
|
}
|
}
|
}
|
}
|
}
|
} else { //指定比对目标map
|
if !compareArgs.Source {
|
for _, tid := range compareArgs.TableIds { //ruleProcess比对指定底库
|
shardins, ok := Cmap.Cam[tid]
|
if !ok {
|
logger.Error("ruleProcess compare get shard error by tableId:", tid)
|
continue
|
}
|
if tShard, hasT := Cmap.Cam[PRE_DBTABLE]; hasT {
|
if tEnable, tOk := tShard.Get(tid); tOk { //存在此底库
|
logger.Debug("ruleProcess compare tables,exist tableId:", tid, ",enable:", tEnable)
|
if tEnable.(bool) { //底库有效
|
targets := shardins.Walk(DoSdkCompare, compareArgs.FaceFeature, baseScore, compareArgs.Source, compareArgs.CompareTarget)
|
if len(targets) > 0 {
|
scResult.CompareResult = append(scResult.CompareResult, targets...)
|
}
|
}
|
} else {
|
logger.Error("ruleProcess compare tables,tShard not exist tableId:", tid)
|
}
|
} else {
|
logger.Error("ruleProcess compare tables,PRE_DBTABLE tableId:", tid, " not exist")
|
}
|
}
|
} else { //web请求,比对指定的抓拍库或者底库
|
for _, tid := range compareArgs.TableIds {
|
if tid == captureTable { //比对抓拍库
|
tStart := time.Now()
|
serverIp, _, e := util.GetLocalIP(config.Server.NetworkAdapter)
|
if e != nil {
|
logger.Error("util.GetLocalIP err:", e)
|
continue
|
}
|
alarmPort := config.EsCompServerInfo.ESPort
|
|
scopeIds := libEs.GetAllLocalVideopersonsId(compareArgs, config.EsInfo.EsIndex.AiOcean.IndexName, serverIp, alarmPort, compareArgs.AlarmLevel)
|
logger.Debug("libEs.GetAllLocalVideopersonsId len(scopeIds):", len(scopeIds), " 耗时:", time.Since(tStart))
|
if scopeIds != nil {
|
tCompStart := time.Now()
|
if compareArgs.AnalyServerId != "" { //指定server
|
for cKey, cacheMap := range Cmap.Cam {
|
if cKey == PRE_CAPTURE_SERVER+compareArgs.AnalyServerId {
|
for _, sId := range scopeIds {
|
obj, ok := cacheMap.Get(sId)
|
if ok {
|
eInfo, ok := obj.(*protomsg.Esinfo)
|
if !ok {
|
continue
|
}
|
sec := DoSdkCompare(compareArgs.FaceFeature, eInfo.FaceFeature)
|
if sec >= baseScore {
|
scResult.CompareResult = append(scResult.CompareResult, &protomsg.SdkCompareEach{
|
Id: eInfo.Id,
|
CompareScore: sec,
|
Tableid: eInfo.Tableid,
|
})
|
}
|
}
|
}
|
}
|
}
|
} else { //管理平台请求比对所有抓拍,不指定server
|
for cKey, cacheMap := range Cmap.Cam {
|
if len(compareArgs.ServerIds) > 0 {
|
for _, termDevId := range compareArgs.ServerIds {
|
if cKey == PRE_CAPTURE_SERVER+termDevId || !strings.HasPrefix(cKey, PRE_CAPTURE_SERVER) {
|
targets := cacheMap.Walk(DoSdkCompare, compareArgs.FaceFeature, baseScore, compareArgs.Source, compareArgs.CompareTarget)
|
if len(targets) > 0 {
|
scResult.CompareResult = append(scResult.CompareResult, targets...)
|
}
|
}
|
}
|
} else {
|
if strings.HasPrefix(cKey, PRE_CAPTURE_SERVER) {
|
for _, sId := range scopeIds {
|
if obj, ok := cacheMap.Get(sId); ok {
|
eInfo, ok := obj.(*protomsg.Esinfo)
|
if !ok {
|
continue
|
}
|
sec := DoSdkCompare(compareArgs.FaceFeature, eInfo.FaceFeature)
|
if sec >= baseScore {
|
scResult.CompareResult = append(scResult.CompareResult, &protomsg.SdkCompareEach{
|
Id: eInfo.Id,
|
CompareScore: sec,
|
Tableid: eInfo.Tableid,
|
})
|
}
|
}
|
}
|
}
|
}
|
}
|
}
|
logger.Debug("根据scopeIds比对耗时:", time.Since(tCompStart))
|
}
|
} else {
|
shardins, ok := Cmap.Cam[tid]
|
if !ok {
|
logger.Error("get shard error by tableId:", tid)
|
continue
|
}
|
|
targets := shardins.Walk(DoSdkCompare, compareArgs.FaceFeature, baseScore, compareArgs.Source, compareArgs.CompareTarget)
|
if len(targets) > 0 {
|
scResult.CompareResult = append(scResult.CompareResult, targets...)
|
}
|
}
|
}
|
}
|
}
|
|
logger.Debug("比对结果: len(scResult.CompareResult):", len(scResult.CompareResult))
|
buf, err := proto.Marshal(&scResult)
|
|
if err != nil {
|
logger.Error("scResult Marshal error!", err)
|
return nil
|
}
|
|
return buf
|
}
|
|
func DoSdkCompare(ci []byte, co string) float32 {
|
|
co_d, err := base64.StdEncoding.DecodeString(co)
|
if err != nil {
|
logger.Error("DoSdkCompare err:", err)
|
return -1
|
}
|
if len(co_d) != 2560 {
|
logger.Error("target fea.len !=2560")
|
return -1
|
}
|
|
if len(ci) != 2560 {
|
logger.Error("source fea.len !=2560")
|
return -1
|
}
|
sec := DecCompare(ci, co_d)
|
//logger.Debug("比对得分为:", sec)
|
|
sec = ParseScore(sec)
|
return sec
|
}
|
|
func ParseScore(compareScore float32) float32 {
|
if compareScore <= 1 {
|
compareScore = compareScore * 100
|
}
|
if compareScore == 100 {
|
return 100
|
}
|
f, _ := strconv.ParseFloat(fmt.Sprintf("%2.2f", compareScore), 32)
|
|
return float32(f)
|
}
|