From ccd7a3175cd160275955636673b5c4c43d9691a3 Mon Sep 17 00:00:00 2001
From: zhangqian <zhangqian@123.com>
Date: 星期五, 13 十二月 2024 15:53:58 +0800
Subject: [PATCH] 使用sync.map防止并发读写冲突
---
models/gather_model.go | 21 ++++++++++-----------
1 files changed, 10 insertions(+), 11 deletions(-)
diff --git a/models/gather_model.go b/models/gather_model.go
index 59187ce..fd429ab 100644
--- a/models/gather_model.go
+++ b/models/gather_model.go
@@ -13,6 +13,7 @@
"model-engine/pkg/set"
"model-engine/service"
"strings"
+ "sync"
"time"
)
@@ -72,20 +73,18 @@
}
var (
- processed map[string]ProcessedRecord // 瀛樺偍宸插鐞嗚褰�
+ processed sync.Map // 瀛樺偍宸插鐞嗚褰�
cleanupThreshold = time.Now().Add(-100 * time.Hour) // 瀹氫箟涓�涓椂闂寸獥鍙o紝鍋囪鍙繚瀛樻渶杩�100灏忔椂鐨勮褰�
)
-func init() {
- processed = make(map[string]ProcessedRecord)
-}
func (m *GatherModel) Run() error {
// 娓呯悊杩囨湡鐨勮褰�
- for key, record := range processed {
- if record.Timestamp.Before(cleanupThreshold) {
- delete(processed, key)
+ processed.Range(func(key, value any) bool {
+ if value.(ProcessedRecord).Timestamp.Before(cleanupThreshold) {
+ processed.Delete(key)
}
- }
+ return true
+ })
records, err := queryElasticsearch(db.GetEsClient(), m)
if err != nil {
@@ -104,15 +103,15 @@
uniqueKey := fmt.Sprintf("%s-%s", record.DocumentNumber, record.PicDate)
// 濡傛灉宸茬粡澶勭悊杩囷紝璺宠繃
- if _, exists := processed[uniqueKey]; exists {
+ if _, exists := processed.Load(uniqueKey); exists {
continue
}
// 娣诲姞鍒板凡澶勭悊璁板綍
- processed[uniqueKey] = ProcessedRecord{
+ processed.Store(uniqueKey, ProcessedRecord{
UniqueKey: uniqueKey,
Timestamp: time.Now(),
- }
+ })
newRecords = append(newRecords, record)
}
--
Gitblit v1.8.0