From 6c38b30be9ff127f200ffbfe75c0dc48612f37a6 Mon Sep 17 00:00:00 2001
From: zhangqian <zhangqian@123.com>
Date: 星期五, 13 十二月 2024 15:58:14 +0800
Subject: [PATCH] update

---
 models/gather_model.go |   54 ++++++++++++++++++++++++++++++++----------------------
 1 files changed, 32 insertions(+), 22 deletions(-)

diff --git a/models/gather_model.go b/models/gather_model.go
index 050065d..c384db2 100644
--- a/models/gather_model.go
+++ b/models/gather_model.go
@@ -13,6 +13,7 @@
 	"model-engine/pkg/set"
 	"model-engine/service"
 	"strings"
+	"sync"
 	"time"
 )
 
@@ -72,20 +73,18 @@
 }
 
 var (
-	processed        map[string]ProcessedRecord         // 瀛樺偍宸插鐞嗚褰�
+	processed        sync.Map                           // 瀛樺偍宸插鐞嗚褰�
 	cleanupThreshold = time.Now().Add(-100 * time.Hour) // 瀹氫箟涓�涓椂闂寸獥鍙o紝鍋囪鍙繚瀛樻渶杩�100灏忔椂鐨勮褰�
 )
 
-func init() {
-	processed = make(map[string]ProcessedRecord)
-}
 func (m *GatherModel) Run() error {
 	// 娓呯悊杩囨湡鐨勮褰�
-	for key, record := range processed {
-		if record.Timestamp.Before(cleanupThreshold) {
-			delete(processed, key)
+	processed.Range(func(key, value any) bool {
+		if value.(ProcessedRecord).Timestamp.Before(cleanupThreshold) {
+			processed.Delete(key)
 		}
-	}
+		return true
+	})
 
 	records, err := queryElasticsearch(db.GetEsClient(), m)
 	if err != nil {
@@ -104,16 +103,19 @@
 		uniqueKey := fmt.Sprintf("%s-%s", record.DocumentNumber, record.PicDate)
 
 		// 濡傛灉宸茬粡澶勭悊杩囷紝璺宠繃
-		if _, exists := processed[uniqueKey]; exists {
+		if _, exists := processed.Load(uniqueKey); exists {
 			continue
 		}
 
 		// 娣诲姞鍒板凡澶勭悊璁板綍
-		processed[uniqueKey] = ProcessedRecord{
+		processed.Store(uniqueKey, ProcessedRecord{
 			UniqueKey: uniqueKey,
 			Timestamp: time.Now(),
-		}
+		})
 		newRecords = append(newRecords, record)
+	}
+	if len(newRecords) == 0 {
+		return nil
 	}
 
 	aggregation, err := analyzeAndAggregate(newRecords)
@@ -133,18 +135,23 @@
 	}
 	event := strings.Join(typeNames, ",")
 	for lt, persons := range aggregation {
+		if persons.Size() == 0 {
+			continue
+		}
+		personIds := persons.Elements()
 		result := &db.ModelTaskResults{
-			Title:       m.Task.Name,
-			Event:       fmt.Sprintf("%s/%d浜�", event, len(persons)),
-			ModelID:     m.Task.ModelID,
-			ModelTaskID: m.Task.ID,
-			CommunityId: lt.CommunityId,
-			OrgID:       lt.OrgId,
-			ObjectIds:   strings.Join(persons.Elements(), ","),
-			Location:    lt.Location,
-			Building:    lt.Building,
-			Floor:       lt.Floor,
-			PicDate:     lt.Time,
+			Title:         m.Task.Name,
+			Event:         fmt.Sprintf("%s/%d浜�", event, len(persons)),
+			ModelID:       m.Task.ModelID,
+			ModelTaskID:   m.Task.ID,
+			CommunityId:   lt.CommunityId,
+			OrgID:         lt.OrgId,
+			ObjectIds:     strings.Join(personIds, ","),
+			Location:      lt.Location,
+			Building:      lt.Building,
+			Floor:         lt.Floor,
+			PicDate:       lt.Time,
+			FirstPersonID: personIds[0],
 		}
 		results = append(results, result)
 	}
@@ -398,6 +405,9 @@
 }
 
 func analyzeAndAggregate(records []*GatherRecord) (map[GatherLocationTime]set.StringSet, error) {
+	if len(records) == 0 {
+		return nil, nil
+	}
 	aggregation := make(map[GatherLocationTime]set.StringSet)
 	domainIds := set.NewStringSet()
 	for _, record := range records {

--
Gitblit v1.8.0