package cache import ( "flag" "sync" "time" "sdkCompare/cache/shardmap" "sdkCompare/db" "basic.com/pubsub/protomsg.git" "basic.com/valib/logger.git" ) var querynum = flag.Int("querynum", 3000, "the query number from database") var threadnum = flag.Int("threadnum", 32, "the number of thread to deal data.") var ( Unfiled = "unfiled" PRE_DBTABLE = "dbTable_" ) type AreaMapItem struct { sync.Mutex Area map[string]*shardmap.ShardMap } var CaptureDbMap *AreaMapItem var RealNameDbMap = shardmap.New(uint8(*threadnum)) var KeyPersonDbMap = shardmap.New(uint8(*threadnum)) var doOnce sync.Once func InitCache() { doOnce.Do(func() { flag.Parse() CaptureDbMap = &AreaMapItem{ Area: make(map[string]*shardmap.ShardMap), } // 初始化未分类, 没有小区id的档案 CaptureDbMap.Area[Unfiled] = shardmap.New(uint8(*threadnum)) }) initDbTablePersonsCache() initRealNamePersonsCache() initKeyPersonsCache() } func ReInitDbTablePersonsCache() { CaptureDbMap.Lock() defer CaptureDbMap.Unlock() if CaptureDbMap == nil { CaptureDbMap = &AreaMapItem{ Area: make(map[string]*shardmap.ShardMap), } } for tableId, _ := range CaptureDbMap.Area { delete(CaptureDbMap.Area, tableId) } initDbTablePersonsCache() } func initDbTablePersonsCache() { // 缓存底库中的全部人员信息 var dbpApi db.DbPersons total, e := dbpApi.GetPersonTotal("") // 暂时去掉到访小区过滤 //var psApi db.PersonStatus //accessAreas, _ := psApi.GetPersonAccessedAreas() logger.Debugf("抓拍档案库共有%d条记录", total) if e == nil && total > 0 { queryEachNum := *querynum qn := int(total) / *threadnum if *querynum < qn { queryEachNum = qn } queryT := int(total) / queryEachNum if int(total)%queryEachNum > 0 { queryT++ } temptime := time.Now() var wg sync.WaitGroup for i := 0; i < queryT; i++ { j := i * queryEachNum wg.Add(1) go func(qs int) { defer wg.Done() dbPersons, err := dbpApi.GetPersonsCompareCacheBase(j, queryEachNum) if err != nil { logger.Error(err) return } logger.Debugf("eachNum:%d, 获取%d条人员信息", queryEachNum, len(dbPersons)) CaptureDbMap.Lock() areaId := "" for _, value := range dbPersons { areaId = value.AreaId // 没有小区id的人员 if areaId == "" { CaptureDbMap.Area[Unfiled].Set(value.Id, value) CaptureDbMap.Area[Unfiled].Settime() continue } //for _, areaId := range accessAreas[value.Id] { if _, ok := CaptureDbMap.Area[areaId]; !ok { CaptureDbMap.Area[areaId] = shardmap.New(uint8(*threadnum)) } CaptureDbMap.Area[areaId].Set(value.Id, value) CaptureDbMap.Area[areaId].Settime() //} } CaptureDbMap.Unlock() }(j) } wg.Wait() logger.Debug("抓拍档案库人员缓存完成用时:", time.Since(temptime)) for k, v := range CaptureDbMap.Area { logger.Debugf("Cache area %s item len %d ", k, v.GetLen()) } } } func initRealNamePersonsCache() { var dbApi db.Layouts dbPersons, err := dbApi.GetRealNamePersonList() if err != nil { logger.Error("init real-name persons error,", err.Error()) } for _, value := range dbPersons { RealNameDbMap.Set(value.Id, value) } logger.Debugf("常住人口共有%d条记录", len(dbPersons)) } func initKeyPersonsCache() { var dbApi db.Layouts dbPersons, err := dbApi.GetKeyPersonList() if err != nil { logger.Error("init real-name persons error,", err.Error()) } for _, value := range dbPersons { KeyPersonDbMap.Set(value.Id, value) } logger.Debugf("重点人员共有%d条记录", len(dbPersons)) } // UpdateDbPersonsCacheById 更新缓存中的全部人员信息 func UpdateDbPersonsCacheById(id string) { var dbpApi db.DbPersons info, err := dbpApi.GetPersonsCompareCacheById(id) if err != nil { logger.Error(err) return } if info != nil && info.AreaId != "" { if _, ok := CaptureDbMap.Area[info.AreaId]; !ok { CaptureDbMap.Lock() defer CaptureDbMap.Unlock() CaptureDbMap.Area[info.AreaId] = shardmap.New(uint8(*threadnum)) } CaptureDbMap.Area[info.AreaId].Set(info.Id, info) CaptureDbMap.Area[info.AreaId].Settime() } } func DeleteDbPersonsCacheById(id string) { for key, _ := range CaptureDbMap.Area { CaptureDbMap.Area[key].Del(id) } } func RealTimeAddPersonInfoToCache(tableId string, id string, faceFeature string, enable int32, carNo string) { CaptureDbMap.Lock() defer CaptureDbMap.Unlock() if _, ok := CaptureDbMap.Area[tableId]; !ok { CaptureDbMap.Area[tableId] = shardmap.New(uint8(*threadnum)) } var ei = protomsg.Esinfo{ Id: id, Tableid: tableId, FaceFeature: faceFeature, Enable: enable, CarNo: carNo, } CaptureDbMap.Area[tableId].Set(id, &ei) logger.Debug("id:", id, ",tableId:", ",len(faceFeature):", len(faceFeature), ",tableId:", tableId, ",enable:", enable) } func RealTimeDelPersonFromCache(tableId string, id string) { logger.Debug("DelPersonFromCache,tableId:", tableId, ",id:", id) CaptureDbMap.Lock() defer CaptureDbMap.Unlock() if _, ok := CaptureDbMap.Area[tableId]; ok { CaptureDbMap.Area[tableId].Del(id) logger.Debug("DelPerson ok success") } else { logger.Error("tableId:", tableId, " not exist") } } func RealTimeDelTable(tableId string) { logger.Debug("RealTimeDelTable tableId:", tableId) CaptureDbMap.Lock() defer CaptureDbMap.Unlock() if dtM, ok := CaptureDbMap.Area[PRE_DBTABLE]; ok { dtM.Del(tableId) } if _, ok := CaptureDbMap.Area[tableId]; ok { delete(CaptureDbMap.Area, tableId) } } // 使底库生效,将底库中的所有生效状态的人特征添加到缓存 func RealTimeUpdateTable(tableId string, enable int32) { logger.Debug("RealTimeUpdateTable tableId:", tableId, ",enable:", enable) CaptureDbMap.Lock() defer CaptureDbMap.Unlock() if _, ok := CaptureDbMap.Area[PRE_DBTABLE]; !ok { CaptureDbMap.Area[PRE_DBTABLE] = shardmap.New(uint8(*threadnum)) } CaptureDbMap.Area[PRE_DBTABLE].Set(tableId, enable == 1) } func UpdateCache(changeMsg *protomsg.EsPersonCacheChange) { if changeMsg.Type == protomsg.EsCacheChanged_T_DbTable { if changeMsg.Action == protomsg.DbAction_Insert || changeMsg.Action == protomsg.DbAction_Update { RealTimeUpdateTable(changeMsg.TableId[0], changeMsg.Enable) } else if changeMsg.Action == protomsg.DbAction_Delete { RealTimeDelTable(changeMsg.TableId[0]) } } else if changeMsg.Type == protomsg.EsCacheChanged_T_DbTablePerson { if changeMsg.Action == protomsg.DbAction_Insert || changeMsg.Action == protomsg.DbAction_Update { RealTimeAddPersonInfoToCache(changeMsg.TableId[0], changeMsg.PersonId, changeMsg.Feature, changeMsg.Enable, changeMsg.CarNo) } else if changeMsg.Action == protomsg.DbAction_Delete { RealTimeDelPersonFromCache(changeMsg.TableId[0], changeMsg.PersonId) } } }