package cache
|
|
import (
|
"encoding/base64"
|
"flag"
|
"fmt"
|
"sdkCompare/face"
|
"strconv"
|
"strings"
|
"sync"
|
"time"
|
|
"basic.com/pubsub/protomsg.git"
|
"basic.com/valib/logger.git"
|
"github.com/golang/protobuf/proto"
|
"sdkCompare/cache/shardmap"
|
)
|
|
// Command-line tunables used while bulk-loading the person cache.
var (
	// querynum is the lower bound on how many person records one database
	// query fetches during the initial cache load.
	querynum = flag.Int("querynum", 3000, "the query number from database")

	// threadnum is used both as the divisor when splitting the total record
	// count into query batches and as the size argument for each new shard map.
	threadnum = flag.Int("threadnum", 32, "the number of thread to deal data.")
)
|
|
// thresholdLimit is the lowest score a comparison request may use as its
// cut-off unless the request asks to compare everything.
var thresholdLimit float32 = 50

var (
	// captureTable is the name of the capture table.
	captureTable = "capturetable"

	// PRE_DBTABLE is the cache key of the shard map that records, per table
	// id, whether that table is enabled.
	PRE_DBTABLE = "dbTable_"

	// PRE_CAPTURE_SERVER prefixes the cache keys of per-server capture shards.
	PRE_CAPTURE_SERVER = "captureServer_"
)
|
|
type CmapItem struct {
|
sync.Mutex
|
Cam map[string]*shardmap.ShardMap
|
}
|
|
var Cmap *CmapItem
|
var doOnce sync.Once
|
|
func ReInitDbTablePersonsCache() {
|
Cmap.Lock()
|
defer Cmap.Unlock()
|
if Cmap == nil {
|
Cmap = &CmapItem{
|
Cam: make(map[string]*shardmap.ShardMap),
|
}
|
}
|
for tableId, _ := range Cmap.Cam {
|
delete(Cmap.Cam, tableId)
|
}
|
|
initDbTablePersonsCache()
|
}
|
|
func InitDbTablePersons() {
|
doOnce.Do(func() {
|
flag.Parse()
|
|
Cmap = &CmapItem{
|
Cam: make(map[string]*shardmap.ShardMap),
|
}
|
})
|
|
initDbTablePersonsCache()
|
}
|
|
func initDbTablePersonsCache() {
|
// 查询所有的底库列表
|
var dtApi DbTables
|
allTables, err := dtApi.FindAllDbTablesByCurServer()
|
if err == nil && allTables != nil && len(allTables) > 0 { //初始化底库缓存信息
|
Cmap.Lock()
|
for _, table := range allTables {
|
if _, ok := Cmap.Cam[PRE_DBTABLE]; !ok {
|
Cmap.Cam[PRE_DBTABLE] = shardmap.New(uint8(*threadnum))
|
}
|
Cmap.Cam[PRE_DBTABLE].Set(table.Id, table.Enable == 1)
|
logger.Debugf("初始化底库[%s][%s]到缓存中", table.Id, table.TableName)
|
}
|
Cmap.Unlock()
|
}
|
|
// 缓存底库中的全部人员信息
|
var dbpApi DbPersons
|
total, e := dbpApi.GetPersonTotal("")
|
logger.Debugf("所有底库共有%d条记录", total)
|
if e == nil && total > 0 {
|
queryEachNum := *querynum
|
qn := int(total) / *threadnum
|
if *querynum < qn {
|
queryEachNum = qn
|
}
|
queryT := int(total) / queryEachNum
|
if int(total)%queryEachNum > 0 {
|
queryT++
|
}
|
temptime := time.Now()
|
var wg sync.WaitGroup
|
|
for i := 0; i < queryT; i++ {
|
j := i * queryEachNum
|
wg.Add(1)
|
go func(qs int) {
|
defer wg.Done()
|
dbpersons, err := dbpApi.GetPersonsCompareCacheBase(j, queryEachNum)
|
if err != nil {
|
logger.Error(err)
|
return
|
}
|
logger.Debugf("获取%d条人员信息", len(dbpersons))
|
Cmap.Lock()
|
tableId := ""
|
for _, value := range dbpersons {
|
tableId = value.Tableid
|
if _, ok := Cmap.Cam[value.Tableid]; !ok {
|
Cmap.Cam[value.Tableid] = shardmap.New(uint8(*threadnum))
|
}
|
|
Cmap.Cam[value.Tableid].Set(value.Id, value)
|
}
|
|
if len(dbpersons) != 0 {
|
Cmap.Cam[tableId].Settime()
|
}
|
|
Cmap.Unlock()
|
|
}(j)
|
}
|
wg.Wait()
|
logger.Debug("底库人员缓存完成用时:", time.Since(temptime))
|
}
|
}
|
|
// UpdateDbPersonsCacheById 更新缓存中的全部人员信息
|
func UpdateDbPersonsCacheById(id string) {
|
var dbpApi DbPersons
|
info, err := dbpApi.GetPersonsCompareCacheById(id)
|
if err != nil {
|
logger.Error(err)
|
return
|
}
|
if info.Tableid != "" {
|
Cmap.Lock()
|
defer Cmap.Unlock()
|
if _, ok := Cmap.Cam[info.Tableid]; !ok {
|
Cmap.Cam[info.Tableid] = shardmap.New(uint8(*threadnum))
|
}
|
Cmap.Cam[info.Tableid].Set(info.Id, info)
|
Cmap.Cam[info.Tableid].Settime()
|
}
|
}
|
|
func RealTimeAddPersonInfoToCache(tableId string, id string, faceFeature string, enable int32, carNo string) {
|
Cmap.Lock()
|
defer Cmap.Unlock()
|
if _, ok := Cmap.Cam[tableId]; !ok {
|
Cmap.Cam[tableId] = shardmap.New(uint8(*threadnum))
|
}
|
var ei = protomsg.Esinfo{
|
Id: id,
|
Tableid: tableId,
|
FaceFeature: faceFeature,
|
Enable: enable,
|
CarNo: carNo,
|
}
|
Cmap.Cam[tableId].Set(id, &ei)
|
logger.Debug("id:", id, ",tableId:", ",len(faceFeature):", len(faceFeature), ",tableId:", tableId, ",enable:", enable)
|
}
|
|
func RealTimeDelPersonFromCache(tableId string, id string) {
|
logger.Debug("DelPersonFromCache,tableId:", tableId, ",id:", id)
|
Cmap.Lock()
|
defer Cmap.Unlock()
|
if _, ok := Cmap.Cam[tableId]; ok {
|
Cmap.Cam[tableId].Del(id)
|
logger.Debug("DelPerson ok success")
|
} else {
|
logger.Error("tableId:", tableId, " not exist")
|
}
|
}
|
|
func RealTimeDelTable(tableId string) {
|
logger.Debug("RealTimeDelTable tableId:", tableId)
|
Cmap.Lock()
|
defer Cmap.Unlock()
|
|
if dtM, ok := Cmap.Cam[PRE_DBTABLE]; ok {
|
dtM.Del(tableId)
|
}
|
if _, ok := Cmap.Cam[tableId]; ok {
|
delete(Cmap.Cam, tableId)
|
}
|
}
|
|
// 使底库生效,将底库中的所有生效状态的人特征添加到缓存
|
func RealTimeUpdateTable(tableId string, enable int32) {
|
logger.Debug("RealTimeUpdateTable tableId:", tableId, ",enable:", enable)
|
Cmap.Lock()
|
defer Cmap.Unlock()
|
|
if _, ok := Cmap.Cam[PRE_DBTABLE]; !ok {
|
Cmap.Cam[PRE_DBTABLE] = shardmap.New(uint8(*threadnum))
|
}
|
Cmap.Cam[PRE_DBTABLE].Set(tableId, enable == 1)
|
}
|
|
func UpdateCache(changeMsg *protomsg.EsPersonCacheChange) {
|
if changeMsg.Type == protomsg.EsCacheChanged_T_DbTable {
|
if changeMsg.Action == protomsg.DbAction_Insert || changeMsg.Action == protomsg.DbAction_Update {
|
RealTimeUpdateTable(changeMsg.TableId[0], changeMsg.Enable)
|
} else if changeMsg.Action == protomsg.DbAction_Delete {
|
RealTimeDelTable(changeMsg.TableId[0])
|
}
|
} else if changeMsg.Type == protomsg.EsCacheChanged_T_DbTablePerson {
|
if changeMsg.Action == protomsg.DbAction_Insert || changeMsg.Action == protomsg.DbAction_Update {
|
RealTimeAddPersonInfoToCache(changeMsg.TableId[0], changeMsg.PersonId, changeMsg.Feature, changeMsg.Enable, changeMsg.CarNo)
|
} else if changeMsg.Action == protomsg.DbAction_Delete {
|
RealTimeDelPersonFromCache(changeMsg.TableId[0], changeMsg.PersonId)
|
}
|
}
|
}
|
|
func GetComparePersonBaseInfo(compareArgs protomsg.CompareArgs) []byte {
|
if compareArgs.FaceFeature == nil {
|
return nil
|
}
|
|
//指定最低分
|
baseScore := thresholdLimit
|
if compareArgs.CompareThreshold > thresholdLimit {
|
baseScore = compareArgs.CompareThreshold
|
}
|
|
if compareArgs.IsCompareAll {
|
baseScore = 0
|
}
|
|
var scResult protomsg.SdkCompareResult
|
|
//未指定比对目标map
|
if compareArgs.TableIds == nil || len(compareArgs.TableIds) == 0 {
|
logger.Debugf("接收到底库比对请求, 阈值:%f", compareArgs.CompareThreshold)
|
|
// 比对来源是ruleprocess,传空比全部底库
|
if !compareArgs.Source {
|
for key, val := range Cmap.Cam {
|
// 判断是否属于底库
|
if tShard, hasT := Cmap.Cam[PRE_DBTABLE]; hasT {
|
if tEnable, tOk := tShard.Get(key); tOk { //存在此底库
|
//底库有效
|
if tEnable.(bool) {
|
targets := val.Walk(DoSdkCompare, compareArgs.FaceFeature, baseScore, compareArgs.Source, compareArgs.CompareTarget)
|
if len(targets) > 0 {
|
scResult.CompareResult = append(scResult.CompareResult, targets...)
|
}
|
}
|
}
|
}
|
}
|
} else { //来源是web,比对所有抓拍和底库
|
for cKey, val := range Cmap.Cam {
|
if compareArgs.AnalyServerId != "" { //比对指定server产生的抓拍数据和底库
|
if cKey == PRE_CAPTURE_SERVER+compareArgs.AnalyServerId || !strings.HasPrefix(cKey, PRE_CAPTURE_SERVER) {
|
targets := val.Walk(DoSdkCompare, compareArgs.FaceFeature, baseScore, compareArgs.Source, compareArgs.CompareTarget)
|
if len(targets) > 0 {
|
scResult.CompareResult = append(scResult.CompareResult, targets...)
|
}
|
}
|
} else { //管理平台请求,比对所有数据
|
if len(compareArgs.ServerIds) > 0 {
|
for _, termDevId := range compareArgs.ServerIds {
|
if cKey == PRE_CAPTURE_SERVER+termDevId || !strings.HasPrefix(cKey, PRE_CAPTURE_SERVER) {
|
targets := val.Walk(DoSdkCompare, compareArgs.FaceFeature, baseScore, compareArgs.Source, compareArgs.CompareTarget)
|
if len(targets) > 0 {
|
scResult.CompareResult = append(scResult.CompareResult, targets...)
|
}
|
}
|
}
|
} else {
|
targets := val.Walk(DoSdkCompare, compareArgs.FaceFeature, baseScore, compareArgs.Source, compareArgs.CompareTarget)
|
if len(targets) > 0 {
|
scResult.CompareResult = append(scResult.CompareResult, targets...)
|
}
|
}
|
}
|
}
|
}
|
} else { //指定比对目标map
|
if !compareArgs.Source {
|
for _, tid := range compareArgs.TableIds { //ruleProcess比对指定底库
|
shardins, ok := Cmap.Cam[tid]
|
if !ok {
|
logger.Error("ruleProcess compare get shard error by tableId:", tid)
|
continue
|
}
|
if tShard, hasT := Cmap.Cam[PRE_DBTABLE]; hasT {
|
if tEnable, tOk := tShard.Get(tid); tOk { //存在此底库
|
logger.Debug("ruleProcess compare tables,exist tableId:", tid, ",enable:", tEnable)
|
if tEnable.(bool) { //底库有效
|
targets := shardins.Walk(DoSdkCompare, compareArgs.FaceFeature, baseScore, compareArgs.Source, compareArgs.CompareTarget)
|
if len(targets) > 0 {
|
scResult.CompareResult = append(scResult.CompareResult, targets...)
|
}
|
}
|
} else {
|
logger.Error("ruleProcess compare tables,tShard not exist tableId:", tid)
|
}
|
} else {
|
logger.Error("ruleProcess compare tables,PRE_DBTABLE tableId:", tid, " not exist")
|
}
|
}
|
} else { //web请求,比对指定的抓拍库或者底库
|
for _, tid := range compareArgs.TableIds {
|
shardins, ok := Cmap.Cam[tid]
|
if !ok {
|
logger.Error("get shard error by tableId:", tid)
|
continue
|
}
|
|
targets := shardins.Walk(DoSdkCompare, compareArgs.FaceFeature, baseScore, compareArgs.Source, compareArgs.CompareTarget)
|
if len(targets) > 0 {
|
scResult.CompareResult = append(scResult.CompareResult, targets...)
|
}
|
}
|
}
|
}
|
|
logger.Debug("比对结果: len(scResult.CompareResult):", len(scResult.CompareResult))
|
buf, err := proto.Marshal(&scResult)
|
|
if err != nil {
|
logger.Error("scResult Marshal error!", err)
|
return nil
|
}
|
|
return buf
|
}
|
|
func DoSdkCompare(ci []byte, co string) float32 {
|
co_d, err := base64.StdEncoding.DecodeString(co)
|
if err != nil {
|
logger.Error("DoSdkCompare err:", err)
|
return -1
|
}
|
sec := face.DecCompare(ci, co_d)
|
//logger.Debug("比对得分为:", sec)
|
|
sec = ParseScore(sec)
|
return sec
|
}
|
|
// ParseScore normalises a comparison score. Values less than or equal to 1
// are multiplied by 100; a resulting value of exactly 100 is returned as is;
// anything else is rounded to two decimal places by formatting it and parsing
// the text back.
func ParseScore(compareScore float32) float32 {
	score := compareScore
	if score <= 1 {
		score *= 100
	}

	if score == 100 {
		return 100
	}

	text := fmt.Sprintf("%2.2f", score)
	// The text comes from the formatter above, so the parse error is ignored.
	rounded, _ := strconv.ParseFloat(text, 32)

	return float32(rounded)
}
|