| | |
| | | "model-engine/pkg/set" |
| | | "model-engine/service" |
| | | "strings" |
| | | "sync" |
| | | "time" |
| | | ) |
| | | |
| | |
| | | } |
| | | |
| | | var ( |
| | | processed map[string]ProcessedRecord // 存储已处理记录 |
| | | processed sync.Map // 存储已处理记录 |
| | | cleanupThreshold = time.Now().Add(-100 * time.Hour) // 定义一个时间窗口,假设只保存最近100小时的记录 |
| | | ) |
| | | |
| | | func init() { |
| | | processed = make(map[string]ProcessedRecord) |
| | | } |
| | | func (m *GatherModel) Run() error { |
| | | // 清理过期的记录 |
| | | for key, record := range processed { |
| | | if record.Timestamp.Before(cleanupThreshold) { |
| | | delete(processed, key) |
| | | processed.Range(func(key, value any) bool { |
| | | if value.(ProcessedRecord).Timestamp.Before(cleanupThreshold) { |
| | | processed.Delete(key) |
| | | } |
| | | } |
| | | return true |
| | | }) |
| | | |
| | | records, err := queryElasticsearch(db.GetEsClient(), m) |
| | | if err != nil { |
| | |
| | | uniqueKey := fmt.Sprintf("%s-%s", record.DocumentNumber, record.PicDate) |
| | | |
| | | // 如果已经处理过,跳过 |
| | | if _, exists := processed[uniqueKey]; exists { |
| | | if _, exists := processed.Load(uniqueKey); exists { |
| | | continue |
| | | } |
| | | |
| | | // 添加到已处理记录 |
| | | processed[uniqueKey] = ProcessedRecord{ |
| | | processed.Store(uniqueKey, ProcessedRecord{ |
| | | UniqueKey: uniqueKey, |
| | | Timestamp: time.Now(), |
| | | } |
| | | }) |
| | | newRecords = append(newRecords, record) |
| | | } |
| | | if len(newRecords) == 0 { |
| | | return nil |
| | | } |
| | | |
| | | aggregation, err := analyzeAndAggregate(newRecords) |
| | |
| | | } |
| | | event := strings.Join(typeNames, ",") |
| | | for lt, persons := range aggregation { |
| | | if persons.Size() == 0 { |
| | | continue |
| | | } |
| | | personIds := persons.Elements() |
| | | result := &db.ModelTaskResults{ |
| | | Title: m.Task.Name, |
| | | Event: fmt.Sprintf("%s/%d人", event, len(persons)), |
| | | ModelID: m.Task.ModelID, |
| | | ModelTaskID: m.Task.ID, |
| | | CommunityId: lt.CommunityId, |
| | | OrgID: lt.OrgId, |
| | | ObjectIds: strings.Join(persons.Elements(), ","), |
| | | Location: lt.Location, |
| | | Building: lt.Building, |
| | | Floor: lt.Floor, |
| | | PicDate: lt.Time, |
| | | Title: m.Task.Name, |
| | | Event: fmt.Sprintf("%s/%d人", event, len(persons)), |
| | | ModelID: m.Task.ModelID, |
| | | ModelTaskID: m.Task.ID, |
| | | CommunityId: lt.CommunityId, |
| | | OrgID: lt.OrgId, |
| | | ObjectIds: strings.Join(personIds, ","), |
| | | Location: lt.Location, |
| | | Building: lt.Building, |
| | | Floor: lt.Floor, |
| | | PicDate: lt.Time, |
| | | FirstPersonID: personIds[0], |
| | | } |
| | | results = append(results, result) |
| | | } |
| | |
| | | } |
| | | |
| | | func analyzeAndAggregate(records []*GatherRecord) (map[GatherLocationTime]set.StringSet, error) { |
| | | if len(records) == 0 { |
| | | return nil, nil |
| | | } |
| | | aggregation := make(map[GatherLocationTime]set.StringSet) |
| | | domainIds := set.NewStringSet() |
| | | for _, record := range records { |