From 6c38b30be9ff127f200ffbfe75c0dc48612f37a6 Mon Sep 17 00:00:00 2001 From: zhangqian <zhangqian@123.com> Date: 星期五, 13 十二月 2024 15:58:14 +0800 Subject: [PATCH] update --- models/gather_model.go | 54 ++++++++++++++++++++++++++++++++---------------------- 1 files changed, 32 insertions(+), 22 deletions(-) diff --git a/models/gather_model.go b/models/gather_model.go index 050065d..c384db2 100644 --- a/models/gather_model.go +++ b/models/gather_model.go @@ -13,6 +13,7 @@ "model-engine/pkg/set" "model-engine/service" "strings" + "sync" "time" ) @@ -72,20 +73,18 @@ } var ( - processed map[string]ProcessedRecord // 瀛樺偍宸插鐞嗚褰� + processed sync.Map // 瀛樺偍宸插鐞嗚褰� cleanupThreshold = time.Now().Add(-100 * time.Hour) // 瀹氫箟涓�涓椂闂寸獥鍙o紝鍋囪鍙繚瀛樻渶杩�100灏忔椂鐨勮褰� ) -func init() { - processed = make(map[string]ProcessedRecord) -} func (m *GatherModel) Run() error { // 娓呯悊杩囨湡鐨勮褰� - for key, record := range processed { - if record.Timestamp.Before(cleanupThreshold) { - delete(processed, key) + processed.Range(func(key, value any) bool { + if value.(ProcessedRecord).Timestamp.Before(cleanupThreshold) { + processed.Delete(key) } - } + return true + }) records, err := queryElasticsearch(db.GetEsClient(), m) if err != nil { @@ -104,16 +103,19 @@ uniqueKey := fmt.Sprintf("%s-%s", record.DocumentNumber, record.PicDate) // 濡傛灉宸茬粡澶勭悊杩囷紝璺宠繃 - if _, exists := processed[uniqueKey]; exists { + if _, exists := processed.Load(uniqueKey); exists { continue } // 娣诲姞鍒板凡澶勭悊璁板綍 - processed[uniqueKey] = ProcessedRecord{ + processed.Store(uniqueKey, ProcessedRecord{ UniqueKey: uniqueKey, Timestamp: time.Now(), - } + }) newRecords = append(newRecords, record) + } + if len(newRecords) == 0 { + return nil } aggregation, err := analyzeAndAggregate(newRecords) @@ -133,18 +135,23 @@ } event := strings.Join(typeNames, ",") for lt, persons := range aggregation { + if persons.Size() == 0 { + continue + } + personIds := persons.Elements() result := &db.ModelTaskResults{ - Title: m.Task.Name, - Event: fmt.Sprintf("%s/%d浜�", event, len(persons)), - ModelID: m.Task.ModelID, - ModelTaskID: m.Task.ID, - CommunityId: lt.CommunityId, - OrgID: lt.OrgId, - ObjectIds: strings.Join(persons.Elements(), ","), - Location: lt.Location, - Building: lt.Building, - Floor: lt.Floor, - PicDate: lt.Time, + Title: m.Task.Name, + Event: fmt.Sprintf("%s/%d浜�", event, len(persons)), + ModelID: m.Task.ModelID, + ModelTaskID: m.Task.ID, + CommunityId: lt.CommunityId, + OrgID: lt.OrgId, + ObjectIds: strings.Join(personIds, ","), + Location: lt.Location, + Building: lt.Building, + Floor: lt.Floor, + PicDate: lt.Time, + FirstPersonID: personIds[0], } results = append(results, result) } @@ -398,6 +405,9 @@ } func analyzeAndAggregate(records []*GatherRecord) (map[GatherLocationTime]set.StringSet, error) { + if len(records) == 0 { + return nil, nil + } aggregation := make(map[GatherLocationTime]set.StringSet) domainIds := set.NewStringSet() for _, record := range records { -- Gitblit v1.8.0