| | |
| | | package cache |
| | | |
| | | import ( |
| | | "basic.com/pubsub/protomsg.git" |
| | | "flag" |
| | | "sync" |
| | | "time" |
| | |
| | | "sdkCompare/cache/shardmap" |
| | | "sdkCompare/db" |
| | | |
| | | "basic.com/pubsub/protomsg.git" |
| | | "basic.com/valib/logger.git" |
| | | ) |
| | | |
| | | var querynum = flag.Int("querynum", 3000, "the query number from database") |
| | | var threadnum = flag.Int("threadnum", 32, "the number of thread to deal data.") |
| | | var peerSizeNum = flag.Int("peerSize", 3000, "the query number from database") |
| | | var threadNum = flag.Int("threadNum", 32, "the number of thread to deal data.") |
| | | |
| | | var ( |
| | | Unfiled = "unfiled" |
| | | PRE_DBTABLE = "dbTable_" |
| | | Unfilled = "unfilled" |
| | | PreTableName = "dbTable_" |
| | | ) |
| | | |
| | | type AreaMapItem struct { |
| | |
| | | } |
| | | |
| | | var CaptureDbMap *AreaMapItem |
| | | var RealNameDbMap = shardmap.New(uint8(*threadnum)) |
| | | var KeyPersonDbMap = shardmap.New(uint8(*threadnum)) |
| | | var RealNameDbMap = shardmap.New(uint8(*threadNum)) |
| | | |
| | | var doOnce sync.Once |
| | | |
| | |
| | | } |
| | | |
| | | // 初始化未分类, 没有小区id的档案 |
| | | CaptureDbMap.Area[Unfiled] = shardmap.New(uint8(*threadnum)) |
| | | CaptureDbMap.Area[Unfilled] = shardmap.New(uint8(*threadNum)) |
| | | }) |
| | | |
| | | initDbTablePersonsCache() |
| | | initRealNamePersonsCache() |
| | | initKeyPersonsCache() |
| | | } |
| | | |
| | | func ReInitDbTablePersonsCache() { |
| | |
| | | func initDbTablePersonsCache() { |
| | | // 缓存底库中的全部人员信息 |
| | | var dbpApi db.DbPersons |
| | | total, e := dbpApi.GetPersonTotal("") |
| | | total, e := dbpApi.GetPersonTotal() |
| | | |
| | | // 暂时去掉到访小区过滤 |
| | | //var psApi db.PersonStatus |
| | |
| | | |
| | | logger.Debugf("抓拍档案库共有%d条记录", total) |
| | | if e == nil && total > 0 { |
| | | queryEachNum := *querynum |
| | | qn := int(total) / *threadnum |
| | | if *querynum < qn { |
| | | queryEachNum := *peerSizeNum |
| | | qn := int(total) / *threadNum |
| | | if *peerSizeNum < qn { |
| | | queryEachNum = qn |
| | | } |
| | | |
| | | queryT := int(total) / queryEachNum |
| | | if int(total)%queryEachNum > 0 { |
| | | queryT++ |
| | | } |
| | | temptime := time.Now() |
| | | |
| | | startTime := time.Now() |
| | | var wg sync.WaitGroup |
| | | |
| | | for i := 0; i < queryT; i++ { |
| | |
| | | wg.Add(1) |
| | | go func(qs int) { |
| | | defer wg.Done() |
| | | dbPersons, err := dbpApi.GetPersonsCompareCacheBase(j, queryEachNum) |
| | | dbPersons, err := dbpApi.GetPersonsCacheBase(j, queryEachNum) |
| | | if err != nil { |
| | | logger.Error(err) |
| | | return |
| | | } |
| | | logger.Debugf("eachNum:%d, 获取%d条人员信息", queryEachNum, len(dbPersons)) |
| | | |
| | | logger.Debugf("thread:%d, 获取%d条人员信息", queryEachNum, len(dbPersons)) |
| | | CaptureDbMap.Lock() |
| | | |
| | | areaId := "" |
| | |
| | | areaId = value.AreaId |
| | | // 没有小区id的人员 |
| | | if areaId == "" { |
| | | CaptureDbMap.Area[Unfiled].Set(value.Id, value) |
| | | CaptureDbMap.Area[Unfiled].Settime() |
| | | CaptureDbMap.Area[Unfilled].Set(value.Id, value) |
| | | CaptureDbMap.Area[Unfilled].Settime() |
| | | continue |
| | | } |
| | | |
| | | //for _, areaId := range accessAreas[value.Id] { |
| | | if _, ok := CaptureDbMap.Area[areaId]; !ok { |
| | | CaptureDbMap.Area[areaId] = shardmap.New(uint8(*threadnum)) |
| | | CaptureDbMap.Area[areaId] = shardmap.New(uint8(*threadNum)) |
| | | } |
| | | |
| | | CaptureDbMap.Area[areaId].Set(value.Id, value) |
| | |
| | | }(j) |
| | | } |
| | | wg.Wait() |
| | | logger.Debug("抓拍档案库人员缓存完成用时:", time.Since(temptime)) |
| | | |
| | | logger.Debug("抓拍档案库人员缓存完成用时:", time.Since(startTime)) |
| | | |
| | | for k, v := range CaptureDbMap.Area { |
| | | logger.Debugf("Cache area %s item len %d ", k, v.GetLen()) |
| | | logger.Debugf("Cache area %s items len %d ", k, v.GetLen()) |
| | | } |
| | | } |
| | | } |
| | | |
| | | func initRealNamePersonsCache() { |
| | | var dbApi db.Layouts |
| | | var dbApi db.RealNamePerson |
| | | dbPersons, err := dbApi.GetRealNamePersonList() |
| | | if err != nil { |
| | | logger.Error("init real-name persons error,", err.Error()) |
| | |
| | | logger.Debugf("常住人口共有%d条记录", len(dbPersons)) |
| | | } |
| | | |
| | | func initKeyPersonsCache() { |
| | | var dbApi db.Layouts |
| | | dbPersons, err := dbApi.GetKeyPersonList() |
| | | if err != nil { |
| | | logger.Error("init real-name persons error,", err.Error()) |
| | | } |
| | | |
| | | for _, value := range dbPersons { |
| | | KeyPersonDbMap.Set(value.Id, value) |
| | | } |
| | | |
| | | logger.Debugf("重点人员共有%d条记录", len(dbPersons)) |
| | | } |
| | | |
| | | // UpdateDbPersonsCacheById 更新缓存中的全部人员信息 |
| | | func UpdateDbPersonsCacheById(id string) { |
| | | var dbpApi db.DbPersons |
| | | info, err := dbpApi.GetPersonsCompareCacheById(id) |
| | | info, err := dbpApi.GetPersonsById(id) |
| | | if err != nil { |
| | | logger.Error(err) |
| | | return |
| | |
| | | if _, ok := CaptureDbMap.Area[info.AreaId]; !ok { |
| | | CaptureDbMap.Lock() |
| | | defer CaptureDbMap.Unlock() |
| | | CaptureDbMap.Area[info.AreaId] = shardmap.New(uint8(*threadnum)) |
| | | CaptureDbMap.Area[info.AreaId] = shardmap.New(uint8(*threadNum)) |
| | | } |
| | | CaptureDbMap.Area[info.AreaId].Set(info.Id, info) |
| | | CaptureDbMap.Area[info.AreaId].Settime() |
| | |
| | | CaptureDbMap.Lock() |
| | | defer CaptureDbMap.Unlock() |
| | | if _, ok := CaptureDbMap.Area[tableId]; !ok { |
| | | CaptureDbMap.Area[tableId] = shardmap.New(uint8(*threadnum)) |
| | | CaptureDbMap.Area[tableId] = shardmap.New(uint8(*threadNum)) |
| | | } |
| | | var ei = protomsg.Esinfo{ |
| | | Id: id, |
| | |
| | | } |
| | | |
| | | func RealTimeDelPersonFromCache(tableId string, id string) { |
| | | logger.Debug("DelPersonFromCache,tableId:", tableId, ",id:", id) |
| | | logger.Debug("Delete person from cache, tableId:", tableId, ",id:", id) |
| | | CaptureDbMap.Lock() |
| | | defer CaptureDbMap.Unlock() |
| | | if _, ok := CaptureDbMap.Area[tableId]; ok { |
| | |
| | | CaptureDbMap.Lock() |
| | | defer CaptureDbMap.Unlock() |
| | | |
| | | if dtM, ok := CaptureDbMap.Area[PRE_DBTABLE]; ok { |
| | | if dtM, ok := CaptureDbMap.Area[PreTableName]; ok { |
| | | dtM.Del(tableId) |
| | | } |
| | | if _, ok := CaptureDbMap.Area[tableId]; ok { |
| | |
| | | CaptureDbMap.Lock() |
| | | defer CaptureDbMap.Unlock() |
| | | |
| | | if _, ok := CaptureDbMap.Area[PRE_DBTABLE]; !ok { |
| | | CaptureDbMap.Area[PRE_DBTABLE] = shardmap.New(uint8(*threadnum)) |
| | | if _, ok := CaptureDbMap.Area[PreTableName]; !ok { |
| | | CaptureDbMap.Area[PreTableName] = shardmap.New(uint8(*threadNum)) |
| | | } |
| | | CaptureDbMap.Area[PRE_DBTABLE].Set(tableId, enable == 1) |
| | | CaptureDbMap.Area[PreTableName].Set(tableId, enable == 1) |
| | | } |
| | | |
| | | func UpdateCache(changeMsg *protomsg.EsPersonCacheChange) { |