package cache
|
|
import (
|
"flag"
|
"sync"
|
"time"
|
|
"sdkCompare/cache/shardmap"
|
"sdkCompare/db"
|
|
"basic.com/pubsub/protomsg.git"
|
"basic.com/valib/logger.git"
|
)
|
|
var querynum = flag.Int("querynum", 3000, "the query number from database")
|
var threadnum = flag.Int("threadnum", 32, "the number of thread to deal data.")
|
|
var (
|
Unfiled = "unfiled"
|
PRE_DBTABLE = "dbTable_"
|
)
|
|
type CmapItem struct {
|
sync.Mutex
|
Area map[string]*shardmap.ShardMap
|
}
|
|
var CacheMap *CmapItem
|
var doOnce sync.Once
|
|
func ReInitDbTablePersonsCache() {
|
CacheMap.Lock()
|
defer CacheMap.Unlock()
|
if CacheMap == nil {
|
CacheMap = &CmapItem{
|
Area: make(map[string]*shardmap.ShardMap),
|
}
|
}
|
for tableId, _ := range CacheMap.Area {
|
delete(CacheMap.Area, tableId)
|
}
|
|
initDbTablePersonsCache()
|
}
|
|
func InitDbTablePersons() {
|
doOnce.Do(func() {
|
flag.Parse()
|
|
CacheMap = &CmapItem{
|
Area: make(map[string]*shardmap.ShardMap),
|
}
|
|
// 初始化未分类, 没有小区id的档案
|
CacheMap.Area[Unfiled] = shardmap.New(uint8(*threadnum))
|
})
|
|
initDbTablePersonsCache()
|
}
|
|
func initDbTablePersonsCache() {
|
// 缓存底库中的全部人员信息
|
var dbpApi db.DbPersons
|
total, e := dbpApi.GetPersonTotal("")
|
|
var psApi db.PersonStatus
|
accessAreas, _ := psApi.GetPersonAccessedAreas()
|
|
logger.Debugf("所有底库共有%d条记录", total)
|
if e == nil && total > 0 {
|
queryEachNum := *querynum
|
qn := int(total) / *threadnum
|
if *querynum < qn {
|
queryEachNum = qn
|
}
|
queryT := int(total) / queryEachNum
|
if int(total)%queryEachNum > 0 {
|
queryT++
|
}
|
temptime := time.Now()
|
var wg sync.WaitGroup
|
|
for i := 0; i < queryT; i++ {
|
j := i * queryEachNum
|
wg.Add(1)
|
go func(qs int) {
|
defer wg.Done()
|
dbPersons, err := dbpApi.GetPersonsCompareCacheBase(j, queryEachNum)
|
if err != nil {
|
logger.Error(err)
|
return
|
}
|
logger.Debugf("获取%d条人员信息", len(dbPersons))
|
CacheMap.Lock()
|
|
areaId := ""
|
for _, value := range dbPersons {
|
areaId = value.AreaId
|
// 没有小区id的人员
|
if areaId == "" {
|
CacheMap.Area[Unfiled].Set(value.Id, value)
|
CacheMap.Area[Unfiled].Settime()
|
continue
|
}
|
|
for _, areaId := range accessAreas[value.Id] {
|
if _, ok := CacheMap.Area[areaId]; !ok {
|
CacheMap.Area[areaId] = shardmap.New(uint8(*threadnum))
|
}
|
|
CacheMap.Area[areaId].Set(value.Id, value)
|
CacheMap.Area[areaId].Settime()
|
}
|
}
|
|
CacheMap.Unlock()
|
|
}(j)
|
}
|
wg.Wait()
|
logger.Debug("底库人员缓存完成用时:", time.Since(temptime))
|
}
|
}
|
|
// UpdateDbPersonsCacheById 更新缓存中的全部人员信息
|
func UpdateDbPersonsCacheById(id string) {
|
var dbpApi db.DbPersons
|
info, err := dbpApi.GetPersonsCompareCacheById(id)
|
if err != nil {
|
logger.Error(err)
|
return
|
}
|
if info.AreaId != "" {
|
CacheMap.Lock()
|
defer CacheMap.Unlock()
|
if _, ok := CacheMap.Area[info.AreaId]; !ok {
|
CacheMap.Area[info.AreaId] = shardmap.New(uint8(*threadnum))
|
}
|
CacheMap.Area[info.AreaId].Set(info.Id, info)
|
CacheMap.Area[info.AreaId].Settime()
|
}
|
}
|
|
func RealTimeAddPersonInfoToCache(tableId string, id string, faceFeature string, enable int32, carNo string) {
|
CacheMap.Lock()
|
defer CacheMap.Unlock()
|
if _, ok := CacheMap.Area[tableId]; !ok {
|
CacheMap.Area[tableId] = shardmap.New(uint8(*threadnum))
|
}
|
var ei = protomsg.Esinfo{
|
Id: id,
|
Tableid: tableId,
|
FaceFeature: faceFeature,
|
Enable: enable,
|
CarNo: carNo,
|
}
|
CacheMap.Area[tableId].Set(id, &ei)
|
logger.Debug("id:", id, ",tableId:", ",len(faceFeature):", len(faceFeature), ",tableId:", tableId, ",enable:", enable)
|
}
|
|
func RealTimeDelPersonFromCache(tableId string, id string) {
|
logger.Debug("DelPersonFromCache,tableId:", tableId, ",id:", id)
|
CacheMap.Lock()
|
defer CacheMap.Unlock()
|
if _, ok := CacheMap.Area[tableId]; ok {
|
CacheMap.Area[tableId].Del(id)
|
logger.Debug("DelPerson ok success")
|
} else {
|
logger.Error("tableId:", tableId, " not exist")
|
}
|
}
|
|
func RealTimeDelTable(tableId string) {
|
logger.Debug("RealTimeDelTable tableId:", tableId)
|
CacheMap.Lock()
|
defer CacheMap.Unlock()
|
|
if dtM, ok := CacheMap.Area[PRE_DBTABLE]; ok {
|
dtM.Del(tableId)
|
}
|
if _, ok := CacheMap.Area[tableId]; ok {
|
delete(CacheMap.Area, tableId)
|
}
|
}
|
|
// 使底库生效,将底库中的所有生效状态的人特征添加到缓存
|
func RealTimeUpdateTable(tableId string, enable int32) {
|
logger.Debug("RealTimeUpdateTable tableId:", tableId, ",enable:", enable)
|
CacheMap.Lock()
|
defer CacheMap.Unlock()
|
|
if _, ok := CacheMap.Area[PRE_DBTABLE]; !ok {
|
CacheMap.Area[PRE_DBTABLE] = shardmap.New(uint8(*threadnum))
|
}
|
CacheMap.Area[PRE_DBTABLE].Set(tableId, enable == 1)
|
}
|
|
func UpdateCache(changeMsg *protomsg.EsPersonCacheChange) {
|
if changeMsg.Type == protomsg.EsCacheChanged_T_DbTable {
|
if changeMsg.Action == protomsg.DbAction_Insert || changeMsg.Action == protomsg.DbAction_Update {
|
RealTimeUpdateTable(changeMsg.TableId[0], changeMsg.Enable)
|
} else if changeMsg.Action == protomsg.DbAction_Delete {
|
RealTimeDelTable(changeMsg.TableId[0])
|
}
|
} else if changeMsg.Type == protomsg.EsCacheChanged_T_DbTablePerson {
|
if changeMsg.Action == protomsg.DbAction_Insert || changeMsg.Action == protomsg.DbAction_Update {
|
RealTimeAddPersonInfoToCache(changeMsg.TableId[0], changeMsg.PersonId, changeMsg.Feature, changeMsg.Enable, changeMsg.CarNo)
|
} else if changeMsg.Action == protomsg.DbAction_Delete {
|
RealTimeDelPersonFromCache(changeMsg.TableId[0], changeMsg.PersonId)
|
}
|
}
|
}
|