package cache
|
|
import (
|
"flag"
|
"sync"
|
"time"
|
|
"sdkCompare/cache/shardmap"
|
"sdkCompare/db"
|
|
"basic.com/pubsub/protomsg.git"
|
"basic.com/valib/logger.git"
|
)
|
|
var peerSizeNum = flag.Int("peerSize", 3000, "the query number from database")
|
var threadNum = flag.Int("threadNum", 32, "the number of thread to deal data.")
|
|
// Unfilled is the key in CaptureDbMap.Area under which persons without an
// area id are stored.
var Unfilled = "unfilled"

// PreTableName is the key in CaptureDbMap.Area of the entry that records,
// per table id, whether that table is enabled (see RealTimeUpdateTable).
var PreTableName = "dbTable_"
|
|
type AreaMapItem struct {
|
sync.Mutex
|
Area map[string]*shardmap.ShardMap
|
}
|
|
var CaptureDbMap *AreaMapItem
|
var RealNameDbMap = shardmap.New(uint8(*threadNum))
|
var KeyPersonDbMap = shardmap.New(uint8(*threadNum))
|
|
var doOnce sync.Once
|
|
func InitCache() {
|
doOnce.Do(func() {
|
flag.Parse()
|
|
CaptureDbMap = &AreaMapItem{
|
Area: make(map[string]*shardmap.ShardMap),
|
}
|
|
// 初始化未分类, 没有小区id的档案
|
CaptureDbMap.Area[Unfilled] = shardmap.New(uint8(*threadNum))
|
})
|
|
initDbTablePersonsCache()
|
initRealNamePersonsCache()
|
initKeyPersonsCache()
|
}
|
|
func ReInitDbTablePersonsCache() {
|
CaptureDbMap.Lock()
|
defer CaptureDbMap.Unlock()
|
|
if CaptureDbMap == nil {
|
CaptureDbMap = &AreaMapItem{
|
Area: make(map[string]*shardmap.ShardMap),
|
}
|
}
|
for tableId, _ := range CaptureDbMap.Area {
|
delete(CaptureDbMap.Area, tableId)
|
}
|
|
initDbTablePersonsCache()
|
}
|
func initDbTablePersonsCache() {
|
// 缓存底库中的全部人员信息
|
var dbpApi db.DbPersons
|
total, e := dbpApi.GetPersonTotal()
|
|
// 暂时去掉到访小区过滤
|
//var psApi db.PersonStatus
|
//accessAreas, _ := psApi.GetPersonAccessedAreas()
|
|
logger.Debugf("抓拍档案库共有%d条记录", total)
|
if e == nil && total > 0 {
|
queryEachNum := *peerSizeNum
|
qn := int(total) / *threadNum
|
if *peerSizeNum < qn {
|
queryEachNum = qn
|
}
|
|
queryT := int(total) / queryEachNum
|
if int(total)%queryEachNum > 0 {
|
queryT++
|
}
|
|
startTime := time.Now()
|
var wg sync.WaitGroup
|
|
for i := 0; i < queryT; i++ {
|
j := i * queryEachNum
|
wg.Add(1)
|
go func(qs int) {
|
defer wg.Done()
|
dbPersons, err := dbpApi.GetPersonsCacheBase(j, queryEachNum)
|
if err != nil {
|
logger.Error(err)
|
return
|
}
|
|
logger.Debugf("thread:%d, 获取%d条人员信息", queryEachNum, len(dbPersons))
|
CaptureDbMap.Lock()
|
|
areaId := ""
|
for _, value := range dbPersons {
|
areaId = value.AreaId
|
// 没有小区id的人员
|
if areaId == "" {
|
CaptureDbMap.Area[Unfilled].Set(value.Id, value)
|
CaptureDbMap.Area[Unfilled].Settime()
|
continue
|
}
|
|
//for _, areaId := range accessAreas[value.Id] {
|
if _, ok := CaptureDbMap.Area[areaId]; !ok {
|
CaptureDbMap.Area[areaId] = shardmap.New(uint8(*threadNum))
|
}
|
|
CaptureDbMap.Area[areaId].Set(value.Id, value)
|
CaptureDbMap.Area[areaId].Settime()
|
//}
|
}
|
|
CaptureDbMap.Unlock()
|
|
}(j)
|
}
|
wg.Wait()
|
|
logger.Debug("抓拍档案库人员缓存完成用时:", time.Since(startTime))
|
|
for k, v := range CaptureDbMap.Area {
|
logger.Debugf("Cache area %s items len %d ", k, v.GetLen())
|
}
|
}
|
}
|
|
func initRealNamePersonsCache() {
|
var dbApi db.Layouts
|
dbPersons, err := dbApi.GetRealNamePersonList()
|
if err != nil {
|
logger.Error("init real-name persons error,", err.Error())
|
}
|
|
for _, value := range dbPersons {
|
RealNameDbMap.Set(value.Id, value)
|
}
|
|
logger.Debugf("常住人口共有%d条记录", len(dbPersons))
|
}
|
|
func initKeyPersonsCache() {
|
var dbApi db.Layouts
|
dbPersons, err := dbApi.GetKeyPersonList()
|
if err != nil {
|
logger.Error("init real-name persons error,", err.Error())
|
}
|
|
for _, value := range dbPersons {
|
KeyPersonDbMap.Set(value.Id, value)
|
}
|
|
logger.Debugf("重点人员共有%d条记录", len(dbPersons))
|
}
|
|
// UpdateDbPersonsCacheById 更新缓存中的全部人员信息
|
func UpdateDbPersonsCacheById(id string) {
|
var dbpApi db.DbPersons
|
info, err := dbpApi.GetPersonsById(id)
|
if err != nil {
|
logger.Error(err)
|
return
|
}
|
if info != nil && info.AreaId != "" {
|
if _, ok := CaptureDbMap.Area[info.AreaId]; !ok {
|
CaptureDbMap.Lock()
|
defer CaptureDbMap.Unlock()
|
CaptureDbMap.Area[info.AreaId] = shardmap.New(uint8(*threadNum))
|
}
|
CaptureDbMap.Area[info.AreaId].Set(info.Id, info)
|
CaptureDbMap.Area[info.AreaId].Settime()
|
}
|
}
|
|
func DeleteDbPersonsCacheById(id string) {
|
for key, _ := range CaptureDbMap.Area {
|
CaptureDbMap.Area[key].Del(id)
|
}
|
}
|
|
func RealTimeAddPersonInfoToCache(tableId string, id string, faceFeature string, enable int32, carNo string) {
|
CaptureDbMap.Lock()
|
defer CaptureDbMap.Unlock()
|
if _, ok := CaptureDbMap.Area[tableId]; !ok {
|
CaptureDbMap.Area[tableId] = shardmap.New(uint8(*threadNum))
|
}
|
var ei = protomsg.Esinfo{
|
Id: id,
|
Tableid: tableId,
|
FaceFeature: faceFeature,
|
Enable: enable,
|
CarNo: carNo,
|
}
|
CaptureDbMap.Area[tableId].Set(id, &ei)
|
logger.Debug("id:", id, ",tableId:", ",len(faceFeature):", len(faceFeature), ",tableId:", tableId, ",enable:", enable)
|
}
|
|
func RealTimeDelPersonFromCache(tableId string, id string) {
|
logger.Debug("Delete person from cache, tableId:", tableId, ",id:", id)
|
CaptureDbMap.Lock()
|
defer CaptureDbMap.Unlock()
|
if _, ok := CaptureDbMap.Area[tableId]; ok {
|
CaptureDbMap.Area[tableId].Del(id)
|
logger.Debug("DelPerson ok success")
|
} else {
|
logger.Error("tableId:", tableId, " not exist")
|
}
|
}
|
|
func RealTimeDelTable(tableId string) {
|
logger.Debug("RealTimeDelTable tableId:", tableId)
|
CaptureDbMap.Lock()
|
defer CaptureDbMap.Unlock()
|
|
if dtM, ok := CaptureDbMap.Area[PreTableName]; ok {
|
dtM.Del(tableId)
|
}
|
if _, ok := CaptureDbMap.Area[tableId]; ok {
|
delete(CaptureDbMap.Area, tableId)
|
}
|
}
|
|
// 使底库生效,将底库中的所有生效状态的人特征添加到缓存
|
func RealTimeUpdateTable(tableId string, enable int32) {
|
logger.Debug("RealTimeUpdateTable tableId:", tableId, ",enable:", enable)
|
CaptureDbMap.Lock()
|
defer CaptureDbMap.Unlock()
|
|
if _, ok := CaptureDbMap.Area[PreTableName]; !ok {
|
CaptureDbMap.Area[PreTableName] = shardmap.New(uint8(*threadNum))
|
}
|
CaptureDbMap.Area[PreTableName].Set(tableId, enable == 1)
|
}
|
|
func UpdateCache(changeMsg *protomsg.EsPersonCacheChange) {
|
if changeMsg.Type == protomsg.EsCacheChanged_T_DbTable {
|
if changeMsg.Action == protomsg.DbAction_Insert || changeMsg.Action == protomsg.DbAction_Update {
|
RealTimeUpdateTable(changeMsg.TableId[0], changeMsg.Enable)
|
} else if changeMsg.Action == protomsg.DbAction_Delete {
|
RealTimeDelTable(changeMsg.TableId[0])
|
}
|
} else if changeMsg.Type == protomsg.EsCacheChanged_T_DbTablePerson {
|
if changeMsg.Action == protomsg.DbAction_Insert || changeMsg.Action == protomsg.DbAction_Update {
|
RealTimeAddPersonInfoToCache(changeMsg.TableId[0], changeMsg.PersonId, changeMsg.Feature, changeMsg.Enable, changeMsg.CarNo)
|
} else if changeMsg.Action == protomsg.DbAction_Delete {
|
RealTimeDelPersonFromCache(changeMsg.TableId[0], changeMsg.PersonId)
|
}
|
}
|
}
|