zhangzengfei
2024-12-09 366e2ff546092d9be26411fb698b3ddd8e834a11
cache/cache.go
@@ -1,6 +1,7 @@
package cache
import (
   "basic.com/pubsub/protomsg.git"
   "flag"
   "sync"
   "time"
@@ -8,16 +9,15 @@
   "sdkCompare/cache/shardmap"
   "sdkCompare/db"
   "basic.com/pubsub/protomsg.git"
   "basic.com/valib/logger.git"
)
var querynum = flag.Int("querynum", 3000, "the query number from database")
var threadnum = flag.Int("threadnum", 32, "the number of thread to deal data.")
var peerSizeNum = flag.Int("peerSize", 3000, "the query number from database")
var threadNum = flag.Int("threadNum", 32, "the number of thread to deal data.")
var (
   Unfiled     = "unfiled"
   PRE_DBTABLE = "dbTable_"
   Unfilled     = "unfilled"
   PreTableName = "dbTable_"
)
type AreaMapItem struct {
@@ -26,8 +26,7 @@
}
var CaptureDbMap *AreaMapItem
var RealNameDbMap = shardmap.New(uint8(*threadnum))
var KeyPersonDbMap = shardmap.New(uint8(*threadnum))
var RealNameDbMap = shardmap.New(uint8(*threadNum))
var doOnce sync.Once
@@ -40,12 +39,11 @@
      }
      // 初始化未分类, 没有小区id的档案
      CaptureDbMap.Area[Unfiled] = shardmap.New(uint8(*threadnum))
      CaptureDbMap.Area[Unfilled] = shardmap.New(uint8(*threadNum))
   })
   initDbTablePersonsCache()
   initRealNamePersonsCache()
   initKeyPersonsCache()
}
func ReInitDbTablePersonsCache() {
@@ -66,7 +64,7 @@
func initDbTablePersonsCache() {
   // 缓存底库中的全部人员信息
   var dbpApi db.DbPersons
   total, e := dbpApi.GetPersonTotal("")
   total, e := dbpApi.GetPersonTotal()
   // 暂时去掉到访小区过滤
   //var psApi db.PersonStatus
@@ -74,16 +72,18 @@
   logger.Debugf("抓拍档案库共有%d条记录", total)
   if e == nil && total > 0 {
      queryEachNum := *querynum
      qn := int(total) / *threadnum
      if *querynum < qn {
      queryEachNum := *peerSizeNum
      qn := int(total) / *threadNum
      if *peerSizeNum < qn {
         queryEachNum = qn
      }
      queryT := int(total) / queryEachNum
      if int(total)%queryEachNum > 0 {
         queryT++
      }
      temptime := time.Now()
      startTime := time.Now()
      var wg sync.WaitGroup
      for i := 0; i < queryT; i++ {
@@ -91,12 +91,13 @@
         wg.Add(1)
         go func(qs int) {
            defer wg.Done()
            dbPersons, err := dbpApi.GetPersonsCompareCacheBase(j, queryEachNum)
            dbPersons, err := dbpApi.GetPersonsCacheBase(j, queryEachNum)
            if err != nil {
               logger.Error(err)
               return
            }
            logger.Debugf("eachNum:%d, 获取%d条人员信息", queryEachNum, len(dbPersons))
            logger.Debugf("thread:%d, 获取%d条人员信息", queryEachNum, len(dbPersons))
            CaptureDbMap.Lock()
            areaId := ""
@@ -104,14 +105,14 @@
               areaId = value.AreaId
               // 没有小区id的人员
               if areaId == "" {
                  CaptureDbMap.Area[Unfiled].Set(value.Id, value)
                  CaptureDbMap.Area[Unfiled].Settime()
                  CaptureDbMap.Area[Unfilled].Set(value.Id, value)
                  CaptureDbMap.Area[Unfilled].Settime()
                  continue
               }
               //for _, areaId := range accessAreas[value.Id] {
               if _, ok := CaptureDbMap.Area[areaId]; !ok {
                  CaptureDbMap.Area[areaId] = shardmap.New(uint8(*threadnum))
                  CaptureDbMap.Area[areaId] = shardmap.New(uint8(*threadNum))
               }
               CaptureDbMap.Area[areaId].Set(value.Id, value)
@@ -124,16 +125,17 @@
         }(j)
      }
      wg.Wait()
      logger.Debug("抓拍档案库人员缓存完成用时:", time.Since(temptime))
      logger.Debug("抓拍档案库人员缓存完成用时:", time.Since(startTime))
      for k, v := range CaptureDbMap.Area {
         logger.Debugf("Cache area %s item len %d ", k, v.GetLen())
         logger.Debugf("Cache area %s items len %d ", k, v.GetLen())
      }
   }
}
func initRealNamePersonsCache() {
   var dbApi db.Layouts
   var dbApi db.RealNamePerson
   dbPersons, err := dbApi.GetRealNamePersonList()
   if err != nil {
      logger.Error("init real-name persons error,", err.Error())
@@ -146,24 +148,10 @@
   logger.Debugf("常住人口共有%d条记录", len(dbPersons))
}
func initKeyPersonsCache() {
   var dbApi db.Layouts
   dbPersons, err := dbApi.GetKeyPersonList()
   if err != nil {
      logger.Error("init real-name persons error,", err.Error())
   }
   for _, value := range dbPersons {
      KeyPersonDbMap.Set(value.Id, value)
   }
   logger.Debugf("重点人员共有%d条记录", len(dbPersons))
}
// UpdateDbPersonsCacheById 更新缓存中的全部人员信息
func UpdateDbPersonsCacheById(id string) {
   var dbpApi db.DbPersons
   info, err := dbpApi.GetPersonsCompareCacheById(id)
   info, err := dbpApi.GetPersonsById(id)
   if err != nil {
      logger.Error(err)
      return
@@ -172,7 +160,7 @@
      if _, ok := CaptureDbMap.Area[info.AreaId]; !ok {
         CaptureDbMap.Lock()
         defer CaptureDbMap.Unlock()
         CaptureDbMap.Area[info.AreaId] = shardmap.New(uint8(*threadnum))
         CaptureDbMap.Area[info.AreaId] = shardmap.New(uint8(*threadNum))
      }
      CaptureDbMap.Area[info.AreaId].Set(info.Id, info)
      CaptureDbMap.Area[info.AreaId].Settime()
@@ -189,7 +177,7 @@
   CaptureDbMap.Lock()
   defer CaptureDbMap.Unlock()
   if _, ok := CaptureDbMap.Area[tableId]; !ok {
      CaptureDbMap.Area[tableId] = shardmap.New(uint8(*threadnum))
      CaptureDbMap.Area[tableId] = shardmap.New(uint8(*threadNum))
   }
   var ei = protomsg.Esinfo{
      Id:          id,
@@ -203,7 +191,7 @@
}
func RealTimeDelPersonFromCache(tableId string, id string) {
   logger.Debug("DelPersonFromCache,tableId:", tableId, ",id:", id)
   logger.Debug("Delete person from cache, tableId:", tableId, ",id:", id)
   CaptureDbMap.Lock()
   defer CaptureDbMap.Unlock()
   if _, ok := CaptureDbMap.Area[tableId]; ok {
@@ -219,7 +207,7 @@
   CaptureDbMap.Lock()
   defer CaptureDbMap.Unlock()
   if dtM, ok := CaptureDbMap.Area[PRE_DBTABLE]; ok {
   if dtM, ok := CaptureDbMap.Area[PreTableName]; ok {
      dtM.Del(tableId)
   }
   if _, ok := CaptureDbMap.Area[tableId]; ok {
@@ -233,10 +221,10 @@
   CaptureDbMap.Lock()
   defer CaptureDbMap.Unlock()
   if _, ok := CaptureDbMap.Area[PRE_DBTABLE]; !ok {
      CaptureDbMap.Area[PRE_DBTABLE] = shardmap.New(uint8(*threadnum))
   if _, ok := CaptureDbMap.Area[PreTableName]; !ok {
      CaptureDbMap.Area[PreTableName] = shardmap.New(uint8(*threadNum))
   }
   CaptureDbMap.Area[PRE_DBTABLE].Set(tableId, enable == 1)
   CaptureDbMap.Area[PreTableName].Set(tableId, enable == 1)
}
func UpdateCache(changeMsg *protomsg.EsPersonCacheChange) {