liujiandao
2024-02-20 07eea46970759aba106f3db3f4bc24c518ab41de
库中新增更新单条缓存
3个文件已修改
82 ■■■■■ 已修改文件
cache/compare.go 62 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
cache/db.go 18 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
cache/compare.go
@@ -266,56 +266,22 @@
    }
}
// UpdateDbPersonsCache 更新缓存中的全部人员信息
func UpdateDbPersonsCache() {
// UpdateDbPersonsCacheById refreshes the cache entry for the single person identified by id.
func UpdateDbPersonsCacheById(id string) {
    var dbpApi DbPersons
    total, e := dbpApi.GetPersonTotal("")
    logger.Debugf("所有底库共有%d条记录", total)
    if e == nil && total > 0 {
        queryEachNum := *querynum
        qn := int(total) / *threadnum
        if *querynum < qn {
            queryEachNum = qn
    info, err := dbpApi.GetPersonsCompareCacheById(id)
    if err != nil {
        logger.Debug(err)
        return
    }
    if info.Tableid != "" {
        Cmap.Lock()
        defer Cmap.Unlock()
        if _, ok := Cmap.Cam[info.Tableid]; !ok {
            Cmap.Cam[info.Tableid] = shardmap.New(uint8(*threadnum))
        }
        queryT := int(total) / queryEachNum
        if int(total)%queryEachNum > 0 {
            queryT++
        }
        temptime := time.Now()
        var wg sync.WaitGroup
        for i := 0; i < queryT; i++ {
            j := i * queryEachNum
            wg.Add(1)
            go func(qs int) {
                defer wg.Done()
                dbpersons, err := dbpApi.GetPersonsCompareCacheBase(j, queryEachNum)
                if err != nil {
                    logger.Debug(err)
                    return
                }
                logger.Debugf("获取%d条人员信息", len(dbpersons))
                Cmap.Lock()
                tableId := ""
                for _, value := range dbpersons {
                    tableId = value.Tableid
                    if _, ok := Cmap.Cam[value.Tableid]; !ok {
                        Cmap.Cam[value.Tableid] = shardmap.New(uint8(*threadnum))
                    }
                    Cmap.Cam[value.Tableid].Set(value.Id, value)
                }
                if len(dbpersons) != 0 {
                    Cmap.Cam[tableId].Settime()
                }
                Cmap.Unlock()
            }(j)
        }
        wg.Wait()
        logger.Debug("底库人员缓存完成用时:", time.Since(temptime))
        Cmap.Cam[info.Tableid].Set(info.Id, info)
        Cmap.Cam[info.Tableid].Settime()
    }
}
cache/db.go
@@ -126,3 +126,21 @@
    }
    return
}
func (dbp *DbPersons) GetPersonsCompareCacheById(id string) (info *protomsg.Esinfo, err error) {
    sql := "select id,faceFeature,tableId,enable from dbtablepersons where id = \"" + id + "\""
    var p DbPersons
    err = db.Raw(sql).First(&p).Error
    if err != nil {
        return nil, err
    }
    if p.FaceFeature != "" {
        info = &protomsg.Esinfo{
            Id:          p.Id,
            Tableid:     p.TableId,
            FaceFeature: p.FaceFeature,
            Enable:      int32(p.Enable),
        }
    }
    return
}
main.go
@@ -131,7 +131,7 @@
                    if compareEvent.EventType == protomsg.CompareEventType_ReInitCache { //加入集群后重新初始化缓存
                        cache.ReInitDbTablePersonsCache()
                    } else if compareEvent.EventType == protomsg.CompareEventType_UpdateCache { //库中新增更新缓存
                        cache.UpdateDbPersonsCache()
                        cache.UpdateDbPersonsCacheById(string(compareEvent.Payload))
                    }
                } else {
                    logger.Debug("json unmarshal error")