From 6c38b30be9ff127f200ffbfe75c0dc48612f37a6 Mon Sep 17 00:00:00 2001 From: zhangqian <zhangqian@123.com> Date: 星期五, 13 十二月 2024 15:58:14 +0800 Subject: [PATCH] update --- models/gather_model.go | 27 ++++++++++++++++----------- 1 files changed, 16 insertions(+), 11 deletions(-) diff --git a/models/gather_model.go b/models/gather_model.go index 59187ce..c384db2 100644 --- a/models/gather_model.go +++ b/models/gather_model.go @@ -13,6 +13,7 @@ "model-engine/pkg/set" "model-engine/service" "strings" + "sync" "time" ) @@ -72,20 +73,18 @@ } var ( - processed map[string]ProcessedRecord // 瀛樺偍宸插鐞嗚褰� + processed sync.Map // 瀛樺偍宸插鐞嗚褰� cleanupThreshold = time.Now().Add(-100 * time.Hour) // 瀹氫箟涓�涓椂闂寸獥鍙o紝鍋囪鍙繚瀛樻渶杩�100灏忔椂鐨勮褰� ) -func init() { - processed = make(map[string]ProcessedRecord) -} func (m *GatherModel) Run() error { // 娓呯悊杩囨湡鐨勮褰� - for key, record := range processed { - if record.Timestamp.Before(cleanupThreshold) { - delete(processed, key) + processed.Range(func(key, value any) bool { + if value.(ProcessedRecord).Timestamp.Before(cleanupThreshold) { + processed.Delete(key) } - } + return true + }) records, err := queryElasticsearch(db.GetEsClient(), m) if err != nil { @@ -104,16 +103,19 @@ uniqueKey := fmt.Sprintf("%s-%s", record.DocumentNumber, record.PicDate) // 濡傛灉宸茬粡澶勭悊杩囷紝璺宠繃 - if _, exists := processed[uniqueKey]; exists { + if _, exists := processed.Load(uniqueKey); exists { continue } // 娣诲姞鍒板凡澶勭悊璁板綍 - processed[uniqueKey] = ProcessedRecord{ + processed.Store(uniqueKey, ProcessedRecord{ UniqueKey: uniqueKey, Timestamp: time.Now(), - } + }) newRecords = append(newRecords, record) + } + if len(newRecords) == 0 { + return nil } aggregation, err := analyzeAndAggregate(newRecords) @@ -403,6 +405,9 @@ } func analyzeAndAggregate(records []*GatherRecord) (map[GatherLocationTime]set.StringSet, error) { + if len(records) == 0 { + return nil, nil + } aggregation := make(map[GatherLocationTime]set.StringSet) domainIds := set.NewStringSet() for _, record := range records { -- Gitblit v1.8.0