From 705590e7161e5ebc21654492ab79eb6a42873fb7 Mon Sep 17 00:00:00 2001
From: zhangqian <zhangqian@123.com>
Date: Fri, 13 Dec 2024 11:53:36 +0800
Subject: [PATCH] In-memory dedup of records within the last 100 hours

---
 models/gather_model.go | 55 +++++++++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 49 insertions(+), 6 deletions(-)

diff --git a/models/gather_model.go b/models/gather_model.go
index 5e13538..8a0c7ac 100644
--- a/models/gather_model.go
+++ b/models/gather_model.go
@@ -4,6 +4,7 @@
 	"bytes"
 	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"github.com/elastic/go-elasticsearch/v6"
 	"log"
@@ -29,8 +30,15 @@
 	Task *db.ModelTask
 }
 
-func (m *GatherModel) Init(task *db.ModelTask) error {
+type ProcessedRecord struct {
+	UniqueKey string    // unique identifier
+	Timestamp time.Time // time the record was processed
+}
 
+func (m *GatherModel) Init(task *db.ModelTask) error {
+	if len(task.DomainUnitIds) == 0 {
+		return errors.New("empty domain set")
+	}
 	orgIds, areaIds, err := service.GetOrgIdsAndAreaIdsByDomainUnitIds(task.DomainUnitIds)
 	if err != nil {
 		return err
@@ -63,7 +71,22 @@
 	AppearInterval int    `gorm:"type:int;" json:"appearInterval"` // appearance interval, in seconds
 }
 
+var (
+	processed        map[string]ProcessedRecord         // already-processed records
+	cleanupThreshold = time.Now().Add(-100 * time.Hour) // time window: keep only the most recent 100 hours of records
+)
+
+func init() {
+	processed = make(map[string]ProcessedRecord)
+}
 func (m *GatherModel) Run() error {
+	// drop records that have aged out of the window
+	for key, record := range processed {
+		if record.Timestamp.Before(cleanupThreshold) {
+			delete(processed, key)
+		}
+	}
+
 	records, err := queryElasticsearch(db.GetEsClient(), m)
 	if err != nil {
 		log.Fatalf("Failed to query Elasticsearch: %v", err)
@@ -73,7 +96,27 @@
 		return nil
 	}
 
-	aggregation, err := analyzeAndAggregate(records)
+	newRecords := make([]*GatherRecord, 0)
+
+	// aggregation logic
+	for _, record := range records {
+		// build the unique key
+		uniqueKey := fmt.Sprintf("%s-%s", record.DocumentNumber, record.PicDate)
+
+		// skip records that were already processed
+		if _, exists := processed[uniqueKey]; exists {
+			continue
+		}
+
+		// remember the record as processed
+		processed[uniqueKey] = ProcessedRecord{
+			UniqueKey: uniqueKey,
+			Timestamp: time.Now(),
+		}
+		newRecords = append(newRecords, record)
+	}
+
+	aggregation, err := analyzeAndAggregate(newRecords)
 	if err != nil {
 		log.Fatalf("Failed to analyze and aggregate data: %v", err)
 	}
@@ -114,7 +157,7 @@
 	return nil
 }
 
-func queryElasticsearch(esClient *elasticsearch.Client, gatherModel *GatherModel) ([]GatherRecord, error) {
+func queryElasticsearch(esClient *elasticsearch.Client, gatherModel *GatherModel) ([]*GatherRecord, error) {
 	var buf bytes.Buffer
 	now := time.Now()
 	start := now.Add(-time.Duration(gatherModel.DaysWindow) * 24 * time.Hour)
@@ -283,7 +326,7 @@
 	}
 
 	// parse the aggregation result
-	var records []GatherRecord
+	var records []*GatherRecord
 	if aggs, ok := result["aggregations"].(map[string]interface{}); ok {
 		if orgBuckets, ok := aggs["orgs"].(map[string]interface{})["buckets"].([]interface{}); ok {
 			for _, orgBucket := range orgBuckets {
@@ -316,7 +359,7 @@
 						documentNumber := person.(map[string]interface{})["key"].(string)
 
 						// build the GatherRecord struct
-						record := GatherRecord{
+						record := &GatherRecord{
 							PicDate:        timestamp,
 							DocumentNumber: documentNumber,
 							CommunityId:    communityId,
@@ -354,7 +397,7 @@
 	Time string
 }
 
-func analyzeAndAggregate(records []GatherRecord) (map[GatherLocationTime]set.StringSet, error) {
+func analyzeAndAggregate(records []*GatherRecord) (map[GatherLocationTime]set.StringSet, error) {
 	aggregation := make(map[GatherLocationTime]set.StringSet)
 	domainIds := set.NewStringSet()
 	for _, record := range records {
--
Gitblit v1.8.0
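
Review note: in the change above, cleanupThreshold is computed once at package initialization, so the 100-hour cutoff is frozen at process start; in a long-running process the cleanup loop at the top of Run() will eventually stop expiring anything relative to "now". A minimal sketch of a sliding cutoff, assuming it lives in the same models package next to the processed map (pruneProcessed and dedupWindow are illustrative names, not part of the commit):

	// dedupWindow mirrors the 100-hour retention chosen by this commit.
	const dedupWindow = 100 * time.Hour

	// pruneProcessed recomputes the cutoff on every call, so the window
	// slides with the current time instead of staying fixed at startup.
	func pruneProcessed() {
		cutoff := time.Now().Add(-dedupWindow)
		for key, rec := range processed {
			if rec.Timestamp.Before(cutoff) {
				delete(processed, key)
			}
		}
	}

Run() would then call pruneProcessed() in place of the inline loop. If Run() can ever be invoked from more than one goroutine, the processed map also needs a sync.Mutex around both the pruning and the lookup/insert in the dedup loop, since Go maps are not safe for concurrent writes.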
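A second note on error handling: Run() is declared to return an error, yet both failure paths call log.Fatalf, which terminates the whole process and skips deferred cleanup, so the caller never sees the error. A sketch of the conventional shape, assuming callers of Run() already check the returned error:

	records, err := queryElasticsearch(db.GetEsClient(), m)
	if err != nil {
		return fmt.Errorf("query elasticsearch: %w", err)
	}

The same applies to the analyzeAndAggregate call; wrapping with %w keeps the underlying error available to errors.Is and errors.As.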