From 705590e7161e5ebc21654492ab79eb6a42873fb7 Mon Sep 17 00:00:00 2001
From: zhangqian <zhangqian@123.com>
Date: Fri, 13 Dec 2024 11:53:36 +0800
Subject: [PATCH] In-memory deduplication of records seen within the last 100 hours
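
Keep an in-memory index of already-processed gather records, keyed by
DocumentNumber plus PicDate, and skip any record that reappears within
the last 100 hours. Stale index entries are evicted at the start of
each run.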
---
models/gather_model.go | 55 +++++++++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 49 insertions(+), 6 deletions(-)
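
For review context, a minimal standalone sketch of the sliding-window
dedup pattern this patch applies; retention, seen, and dedup are
illustrative names, not identifiers from the patch:

package main

import (
	"fmt"
	"time"
)

const retention = 100 * time.Hour

// seen maps a dedup key to the time it was first observed.
var seen = map[string]time.Time{}

// dedup evicts entries older than the retention window, then reports
// whether key is new, recording it when it is.
func dedup(key string) bool {
	threshold := time.Now().Add(-retention)
	for k, t := range seen {
		if t.Before(threshold) {
			delete(seen, k)
		}
	}
	if _, ok := seen[key]; ok {
		return false
	}
	seen[key] = time.Now()
	return true
}

func main() {
	fmt.Println(dedup("340111-20241213")) // true: first sighting
	fmt.Println(dedup("340111-20241213")) // false: duplicate inside the window
}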
diff --git a/models/gather_model.go b/models/gather_model.go
index 5e13538..8a0c7ac 100644
--- a/models/gather_model.go
+++ b/models/gather_model.go
@@ -4,6 +4,7 @@
"bytes"
"context"
"encoding/json"
+ "errors"
"fmt"
"github.com/elastic/go-elasticsearch/v6"
"log"
@@ -29,8 +30,15 @@
Task *db.ModelTask
}
-func (m *GatherModel) Init(task *db.ModelTask) error {
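+// ProcessedRecord marks one gather record as already handled, so a
+// re-run inside the retention window does not aggregate it twice.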
+type ProcessedRecord struct {
+	UniqueKey string    // unique identifier for the record
+	Timestamp time.Time // when the record was processed
+}
+
+func (m *GatherModel) Init(task *db.ModelTask) error {
+ if len(task.DomainUnitIds) == 0 {
+ return errors.New("empty domain set")
+ }
orgIds, areaIds, err := service.GetOrgIdsAndAreaIdsByDomainUnitIds(task.DomainUnitIds)
if err != nil {
return err
@@ -63,7 +71,22 @@
	AppearInterval int `gorm:"type:int;" json:"appearInterval"` // appearance interval, in seconds
}
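+
+// Package-level dedup state: shared by every GatherModel in this process
+// and not guarded by a lock, so concurrent Run calls would need their own
+// synchronization.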
+var processed = make(map[string]ProcessedRecord) // already-processed records, keyed by uniqueKey
+
func (m *GatherModel) Run() error {
+	// Evict entries that have aged out of the 100-hour retention window.
+	// The threshold is recomputed on every run so the window slides with time.
+	cleanupThreshold := time.Now().Add(-100 * time.Hour)
+	for key, record := range processed {
+		if record.Timestamp.Before(cleanupThreshold) {
+			delete(processed, key)
+		}
+	}
+
records, err := queryElasticsearch(db.GetEsClient(), m)
if err != nil {
log.Fatalf("Failed to query Elasticsearch: %v", err)
@@ -73,7 +96,27 @@
return nil
}
- aggregation, err := analyzeAndAggregate(records)
+	newRecords := make([]*GatherRecord, 0)
+
+	// Deduplicate before aggregating.
+	for _, record := range records {
+		// Build the record's unique key.
+		uniqueKey := fmt.Sprintf("%s-%s", record.DocumentNumber, record.PicDate)
+
+		// Skip records that have already been processed.
+		if _, exists := processed[uniqueKey]; exists {
+			continue
+		}
+
+		// Remember this record as processed.
+		processed[uniqueKey] = ProcessedRecord{
+			UniqueKey: uniqueKey,
+			Timestamp: time.Now(),
+		}
+		newRecords = append(newRecords, record)
+	}
+
+ aggregation, err := analyzeAndAggregate(newRecords)
if err != nil {
log.Fatalf("Failed to analyze and aggregate data: %v", err)
}
@@ -114,7 +157,7 @@
return nil
}
-func queryElasticsearch(esClient *elasticsearch.Client, gatherModel *GatherModel) ([]GatherRecord, error) {
+func queryElasticsearch(esClient *elasticsearch.Client, gatherModel *GatherModel) ([]*GatherRecord, error) {
var buf bytes.Buffer
now := time.Now()
start := now.Add(-time.Duration(gatherModel.DaysWindow) * 24 * time.Hour)
@@ -283,7 +326,7 @@
}
	// Parse the aggregation results
- var records []GatherRecord
+ var records []*GatherRecord
if aggs, ok := result["aggregations"].(map[string]interface{}); ok {
if orgBuckets, ok := aggs["orgs"].(map[string]interface{})["buckets"].([]interface{}); ok {
for _, orgBucket := range orgBuckets {
@@ -316,7 +359,7 @@
documentNumber := person.(map[string]interface{})["key"].(string)
	// Build the GatherRecord struct
- record := GatherRecord{
+ record := &GatherRecord{
PicDate: timestamp,
DocumentNumber: documentNumber,
CommunityId: communityId,
@@ -354,7 +397,7 @@
Time string
}
-func analyzeAndAggregate(records []GatherRecord) (map[GatherLocationTime]set.StringSet, error) {
+func analyzeAndAggregate(records []*GatherRecord) (map[GatherLocationTime]set.StringSet, error) {
aggregation := make(map[GatherLocationTime]set.StringSet)
domainIds := set.NewStringSet()
for _, record := range records {
--
Gitblit v1.8.0