zhangqian
2024-12-13 705590e7161e5ebc21654492ab79eb6a42873fb7
内存去重100小时内的
1个文件已修改
55 ■■■■ 已修改文件
models/gather_model.go 55 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
models/gather_model.go
@@ -4,6 +4,7 @@
    "bytes"
    "context"
    "encoding/json"
    "errors"
    "fmt"
    "github.com/elastic/go-elasticsearch/v6"
    "log"
@@ -29,8 +30,15 @@
    Task           *db.ModelTask
}
func (m *GatherModel) Init(task *db.ModelTask) error {
type ProcessedRecord struct {
    UniqueKey string    // 唯一标识
    Timestamp time.Time // 记录的时间戳
}
func (m *GatherModel) Init(task *db.ModelTask) error {
    if len(task.DomainUnitIds) == 0 {
        return errors.New("empty domain set")
    }
    orgIds, areaIds, err := service.GetOrgIdsAndAreaIdsByDomainUnitIds(task.DomainUnitIds)
    if err != nil {
        return err
@@ -63,7 +71,22 @@
    AppearInterval int    `gorm:"type:int;" json:"appearInterval"` //出现间隔,单位为秒
}
var (
    processed        map[string]ProcessedRecord         // 存储已处理记录
    cleanupThreshold = time.Now().Add(-100 * time.Hour) // 定义一个时间窗口,假设只保存最近100小时的记录
)
func init() {
    processed = make(map[string]ProcessedRecord)
}
func (m *GatherModel) Run() error {
    // 清理过期的记录
    for key, record := range processed {
        if record.Timestamp.Before(cleanupThreshold) {
            delete(processed, key)
        }
    }
    records, err := queryElasticsearch(db.GetEsClient(), m)
    if err != nil {
        log.Fatalf("Failed to query Elasticsearch: %v", err)
@@ -73,7 +96,27 @@
        return nil
    }
    aggregation, err := analyzeAndAggregate(records)
    newRecords := make([]*GatherRecord, 0)
    // 聚合逻辑
    for _, record := range records {
        // 生成唯一标识
        uniqueKey := fmt.Sprintf("%s-%s", record.DocumentNumber, record.PicDate)
        // 如果已经处理过,跳过
        if _, exists := processed[uniqueKey]; exists {
            continue
        }
        // 添加到已处理记录
        processed[uniqueKey] = ProcessedRecord{
            UniqueKey: uniqueKey,
            Timestamp: time.Now(),
        }
        newRecords = append(newRecords, record)
    }
    aggregation, err := analyzeAndAggregate(newRecords)
    if err != nil {
        log.Fatalf("Failed to analyze and aggregate data: %v", err)
    }
@@ -114,7 +157,7 @@
    return nil
}
func queryElasticsearch(esClient *elasticsearch.Client, gatherModel *GatherModel) ([]GatherRecord, error) {
func queryElasticsearch(esClient *elasticsearch.Client, gatherModel *GatherModel) ([]*GatherRecord, error) {
    var buf bytes.Buffer
    now := time.Now()
    start := now.Add(-time.Duration(gatherModel.DaysWindow) * 24 * time.Hour)
@@ -283,7 +326,7 @@
    }
    // 解析聚合结果
    var records []GatherRecord
    var records []*GatherRecord
    if aggs, ok := result["aggregations"].(map[string]interface{}); ok {
        if orgBuckets, ok := aggs["orgs"].(map[string]interface{})["buckets"].([]interface{}); ok {
            for _, orgBucket := range orgBuckets {
@@ -316,7 +359,7 @@
                                                        documentNumber := person.(map[string]interface{})["key"].(string)
                                                        // 构建 GatherRecord 结构体
                                                        record := GatherRecord{
                                                        record := &GatherRecord{
                                                            PicDate:        timestamp,
                                                            DocumentNumber: documentNumber,
                                                            CommunityId:    communityId,
@@ -354,7 +397,7 @@
    Time        string
}
func analyzeAndAggregate(records []GatherRecord) (map[GatherLocationTime]set.StringSet, error) {
func analyzeAndAggregate(records []*GatherRecord) (map[GatherLocationTime]set.StringSet, error) {
    aggregation := make(map[GatherLocationTime]set.StringSet)
    domainIds := set.NewStringSet()
    for _, record := range records {