zhangqian
2024-12-13 ccd7a3175cd160275955636673b5c4c43d9691a3
使用sync.map防止并发读写冲突
1个文件已修改
21 ■■■■ 已修改文件
models/gather_model.go 21 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
models/gather_model.go
@@ -13,6 +13,7 @@
    "model-engine/pkg/set"
    "model-engine/service"
    "strings"
    "sync"
    "time"
)
@@ -72,20 +73,18 @@
}
var (
    processed        map[string]ProcessedRecord         // 存储已处理记录
    processed        sync.Map                           // 存储已处理记录
    cleanupThreshold = time.Now().Add(-100 * time.Hour) // 定义一个时间窗口,假设只保存最近100小时的记录
)
func init() {
    processed = make(map[string]ProcessedRecord)
}
func (m *GatherModel) Run() error {
    // 清理过期的记录
    for key, record := range processed {
        if record.Timestamp.Before(cleanupThreshold) {
            delete(processed, key)
    processed.Range(func(key, value any) bool {
        if value.(ProcessedRecord).Timestamp.Before(cleanupThreshold) {
            processed.Delete(key)
        }
    }
        return true
    })
    records, err := queryElasticsearch(db.GetEsClient(), m)
    if err != nil {
@@ -104,15 +103,15 @@
        uniqueKey := fmt.Sprintf("%s-%s", record.DocumentNumber, record.PicDate)
        // 如果已经处理过,跳过
        if _, exists := processed[uniqueKey]; exists {
        if _, exists := processed.Load(uniqueKey); exists {
            continue
        }
        // 添加到已处理记录
        processed[uniqueKey] = ProcessedRecord{
        processed.Store(uniqueKey, ProcessedRecord{
            UniqueKey: uniqueKey,
            Timestamp: time.Now(),
        }
        })
        newRecords = append(newRecords, record)
    }