package cache import ( "flag" "sync" "time" "sdkCompare/cache/shardmap" "sdkCompare/db" "basic.com/pubsub/protomsg.git" "basic.com/valib/logger.git" ) var querynum = flag.Int("querynum", 3000, "the query number from database") var threadnum = flag.Int("threadnum", 32, "the number of thread to deal data.") var ( Unfiled = "unfiled" PRE_DBTABLE = "dbTable_" ) type CmapItem struct { sync.Mutex Area map[string]*shardmap.ShardMap } var CacheMap *CmapItem var doOnce sync.Once func ReInitDbTablePersonsCache() { CacheMap.Lock() defer CacheMap.Unlock() if CacheMap == nil { CacheMap = &CmapItem{ Area: make(map[string]*shardmap.ShardMap), } } for tableId, _ := range CacheMap.Area { delete(CacheMap.Area, tableId) } initDbTablePersonsCache() } func InitDbTablePersons() { doOnce.Do(func() { flag.Parse() CacheMap = &CmapItem{ Area: make(map[string]*shardmap.ShardMap), } // 初始化未分类, 没有小区id的档案 CacheMap.Area[Unfiled] = shardmap.New(uint8(*threadnum)) }) initDbTablePersonsCache() } func initDbTablePersonsCache() { // 缓存底库中的全部人员信息 var dbpApi db.DbPersons total, e := dbpApi.GetPersonTotal("") var psApi db.PersonStatus accessAreas, _ := psApi.GetPersonAccessedAreas() logger.Debugf("所有底库共有%d条记录", total) if e == nil && total > 0 { queryEachNum := *querynum qn := int(total) / *threadnum if *querynum < qn { queryEachNum = qn } queryT := int(total) / queryEachNum if int(total)%queryEachNum > 0 { queryT++ } temptime := time.Now() var wg sync.WaitGroup for i := 0; i < queryT; i++ { j := i * queryEachNum wg.Add(1) go func(qs int) { defer wg.Done() dbPersons, err := dbpApi.GetPersonsCompareCacheBase(j, queryEachNum) if err != nil { logger.Error(err) return } logger.Debugf(" eachNum:%d, 获取%d条人员信息", queryEachNum, len(dbPersons)) CacheMap.Lock() areaId := "" for _, value := range dbPersons { areaId = value.AreaId // 没有小区id的人员 if areaId == "" { CacheMap.Area[Unfiled].Set(value.Id, value) CacheMap.Area[Unfiled].Settime() continue } for _, areaId := range accessAreas[value.Id] { if _, ok := CacheMap.Area[areaId]; !ok { CacheMap.Area[areaId] = shardmap.New(uint8(*threadnum)) } CacheMap.Area[areaId].Set(value.Id, value) CacheMap.Area[areaId].Settime() } } CacheMap.Unlock() }(j) } wg.Wait() logger.Debug("底库人员缓存完成用时:", time.Since(temptime)) } } // UpdateDbPersonsCacheById 更新缓存中的全部人员信息 func UpdateDbPersonsCacheById(id string) { var dbpApi db.DbPersons info, err := dbpApi.GetPersonsCompareCacheById(id) if err != nil { logger.Error(err) return } if info != nil && info.AreaId != "" { CacheMap.Lock() defer CacheMap.Unlock() if _, ok := CacheMap.Area[info.AreaId]; !ok { CacheMap.Area[info.AreaId] = shardmap.New(uint8(*threadnum)) } CacheMap.Area[info.AreaId].Set(info.Id, info) CacheMap.Area[info.AreaId].Settime() } } func RealTimeAddPersonInfoToCache(tableId string, id string, faceFeature string, enable int32, carNo string) { CacheMap.Lock() defer CacheMap.Unlock() if _, ok := CacheMap.Area[tableId]; !ok { CacheMap.Area[tableId] = shardmap.New(uint8(*threadnum)) } var ei = protomsg.Esinfo{ Id: id, Tableid: tableId, FaceFeature: faceFeature, Enable: enable, CarNo: carNo, } CacheMap.Area[tableId].Set(id, &ei) logger.Debug("id:", id, ",tableId:", ",len(faceFeature):", len(faceFeature), ",tableId:", tableId, ",enable:", enable) } func RealTimeDelPersonFromCache(tableId string, id string) { logger.Debug("DelPersonFromCache,tableId:", tableId, ",id:", id) CacheMap.Lock() defer CacheMap.Unlock() if _, ok := CacheMap.Area[tableId]; ok { CacheMap.Area[tableId].Del(id) logger.Debug("DelPerson ok success") } else { logger.Error("tableId:", tableId, " not exist") } } func RealTimeDelTable(tableId string) { logger.Debug("RealTimeDelTable tableId:", tableId) CacheMap.Lock() defer CacheMap.Unlock() if dtM, ok := CacheMap.Area[PRE_DBTABLE]; ok { dtM.Del(tableId) } if _, ok := CacheMap.Area[tableId]; ok { delete(CacheMap.Area, tableId) } } // 使底库生效,将底库中的所有生效状态的人特征添加到缓存 func RealTimeUpdateTable(tableId string, enable int32) { logger.Debug("RealTimeUpdateTable tableId:", tableId, ",enable:", enable) CacheMap.Lock() defer CacheMap.Unlock() if _, ok := CacheMap.Area[PRE_DBTABLE]; !ok { CacheMap.Area[PRE_DBTABLE] = shardmap.New(uint8(*threadnum)) } CacheMap.Area[PRE_DBTABLE].Set(tableId, enable == 1) } func UpdateCache(changeMsg *protomsg.EsPersonCacheChange) { if changeMsg.Type == protomsg.EsCacheChanged_T_DbTable { if changeMsg.Action == protomsg.DbAction_Insert || changeMsg.Action == protomsg.DbAction_Update { RealTimeUpdateTable(changeMsg.TableId[0], changeMsg.Enable) } else if changeMsg.Action == protomsg.DbAction_Delete { RealTimeDelTable(changeMsg.TableId[0]) } } else if changeMsg.Type == protomsg.EsCacheChanged_T_DbTablePerson { if changeMsg.Action == protomsg.DbAction_Insert || changeMsg.Action == protomsg.DbAction_Update { RealTimeAddPersonInfoToCache(changeMsg.TableId[0], changeMsg.PersonId, changeMsg.Feature, changeMsg.Enable, changeMsg.CarNo) } else if changeMsg.Action == protomsg.DbAction_Delete { RealTimeDelPersonFromCache(changeMsg.TableId[0], changeMsg.PersonId) } } }