From ccd7a3175cd160275955636673b5c4c43d9691a3 Mon Sep 17 00:00:00 2001 From: zhangqian <zhangqian@123.com> Date: 星期五, 13 十二月 2024 15:53:58 +0800 Subject: [PATCH] 使用sync.map防止并发读写冲突 --- models/gather_model.go | 21 ++++++++++----------- 1 files changed, 10 insertions(+), 11 deletions(-) diff --git a/models/gather_model.go b/models/gather_model.go index 59187ce..fd429ab 100644 --- a/models/gather_model.go +++ b/models/gather_model.go @@ -13,6 +13,7 @@ "model-engine/pkg/set" "model-engine/service" "strings" + "sync" "time" ) @@ -72,20 +73,18 @@ } var ( - processed map[string]ProcessedRecord // 瀛樺偍宸插鐞嗚褰� + processed sync.Map // 瀛樺偍宸插鐞嗚褰� cleanupThreshold = time.Now().Add(-100 * time.Hour) // 瀹氫箟涓�涓椂闂寸獥鍙o紝鍋囪鍙繚瀛樻渶杩�100灏忔椂鐨勮褰� ) -func init() { - processed = make(map[string]ProcessedRecord) -} func (m *GatherModel) Run() error { // 娓呯悊杩囨湡鐨勮褰� - for key, record := range processed { - if record.Timestamp.Before(cleanupThreshold) { - delete(processed, key) + processed.Range(func(key, value any) bool { + if value.(ProcessedRecord).Timestamp.Before(cleanupThreshold) { + processed.Delete(key) } - } + return true + }) records, err := queryElasticsearch(db.GetEsClient(), m) if err != nil { @@ -104,15 +103,15 @@ uniqueKey := fmt.Sprintf("%s-%s", record.DocumentNumber, record.PicDate) // 濡傛灉宸茬粡澶勭悊杩囷紝璺宠繃 - if _, exists := processed[uniqueKey]; exists { + if _, exists := processed.Load(uniqueKey); exists { continue } // 娣诲姞鍒板凡澶勭悊璁板綍 - processed[uniqueKey] = ProcessedRecord{ + processed.Store(uniqueKey, ProcessedRecord{ UniqueKey: uniqueKey, Timestamp: time.Now(), - } + }) newRecords = append(newRecords, record) } -- Gitblit v1.8.0