| | |
| | | "bytes" |
| | | "context" |
| | | "encoding/json" |
| | | "errors" |
| | | "fmt" |
| | | "github.com/elastic/go-elasticsearch/v6" |
| | | "log" |
| | |
| | | Task *db.ModelTask |
| | | } |
| | | |
| | | func (m *GatherModel) Init(task *db.ModelTask) error { |
// ProcessedRecord marks a gather record that has already been handled,
// so repeated Run passes can skip duplicates.
type ProcessedRecord struct {
	UniqueKey string    // dedup key, formatted "<DocumentNumber>-<PicDate>" (see Run)
	Timestamp time.Time // when this record was first processed
}
| | | |
| | | func (m *GatherModel) Init(task *db.ModelTask) error { |
| | | if len(task.DomainUnitIds) == 0 { |
| | | return errors.New("empty domain set") |
| | | } |
| | | orgIds, areaIds, err := service.GetOrgIdsAndAreaIdsByDomainUnitIds(task.DomainUnitIds) |
| | | if err != nil { |
| | | return err |
| | |
| | | AppearInterval int `gorm:"type:int;" json:"appearInterval"` //出现间隔,单位为秒 |
| | | } |
| | | |
| | | var ( |
| | | processed map[string]ProcessedRecord // 存储已处理记录 |
| | | cleanupThreshold = time.Now().Add(-100 * time.Hour) // 定义一个时间窗口,假设只保存最近100小时的记录 |
| | | ) |
| | | |
| | | func init() { |
| | | processed = make(map[string]ProcessedRecord) |
| | | } |
| | | func (m *GatherModel) Run() error { |
| | | // 清理过期的记录 |
| | | for key, record := range processed { |
| | | if record.Timestamp.Before(cleanupThreshold) { |
| | | delete(processed, key) |
| | | } |
| | | } |
| | | |
| | | records, err := queryElasticsearch(db.GetEsClient(), m) |
| | | if err != nil { |
| | | log.Fatalf("Failed to query Elasticsearch: %v", err) |
| | |
| | | return nil |
| | | } |
| | | |
| | | aggregation, err := analyzeAndAggregate(records) |
| | | newRecords := make([]*GatherRecord, 0) |
| | | |
| | | // 聚合逻辑 |
| | | for _, record := range records { |
| | | // 生成唯一标识 |
| | | uniqueKey := fmt.Sprintf("%s-%s", record.DocumentNumber, record.PicDate) |
| | | |
| | | // 如果已经处理过,跳过 |
| | | if _, exists := processed[uniqueKey]; exists { |
| | | continue |
| | | } |
| | | |
| | | // 添加到已处理记录 |
| | | processed[uniqueKey] = ProcessedRecord{ |
| | | UniqueKey: uniqueKey, |
| | | Timestamp: time.Now(), |
| | | } |
| | | newRecords = append(newRecords, record) |
| | | } |
| | | |
| | | aggregation, err := analyzeAndAggregate(newRecords) |
| | | if err != nil { |
| | | log.Fatalf("Failed to analyze and aggregate data: %v", err) |
| | | } |
| | |
| | | return nil |
| | | } |
| | | |
| | | func queryElasticsearch(esClient *elasticsearch.Client, gatherModel *GatherModel) ([]GatherRecord, error) { |
| | | func queryElasticsearch(esClient *elasticsearch.Client, gatherModel *GatherModel) ([]*GatherRecord, error) { |
| | | var buf bytes.Buffer |
| | | now := time.Now() |
| | | start := now.Add(-time.Duration(gatherModel.DaysWindow) * 24 * time.Hour) |
| | |
| | | } |
| | | |
| | | // 解析聚合结果 |
| | | var records []GatherRecord |
| | | var records []*GatherRecord |
| | | if aggs, ok := result["aggregations"].(map[string]interface{}); ok { |
| | | if orgBuckets, ok := aggs["orgs"].(map[string]interface{})["buckets"].([]interface{}); ok { |
| | | for _, orgBucket := range orgBuckets { |
| | |
| | | documentNumber := person.(map[string]interface{})["key"].(string) |
| | | |
| | | // 构建 GatherRecord 结构体 |
| | | record := GatherRecord{ |
| | | record := &GatherRecord{ |
| | | PicDate: timestamp, |
| | | DocumentNumber: documentNumber, |
| | | CommunityId: communityId, |
| | |
| | | Time string |
| | | } |
| | | |
| | | func analyzeAndAggregate(records []GatherRecord) (map[GatherLocationTime]set.StringSet, error) { |
| | | func analyzeAndAggregate(records []*GatherRecord) (map[GatherLocationTime]set.StringSet, error) { |
| | | aggregation := make(map[GatherLocationTime]set.StringSet) |
| | | domainIds := set.NewStringSet() |
| | | for _, record := range records { |