| | |
| | | return err |
| | | } |
| | | event := strings.Join(typeNames, ",") |
| | | for location, persons := range aggregation { |
| | | for lt, persons := range aggregation { |
| | | result := &db.ModelTaskResults{ |
| | | Title: m.Task.Name, |
| | | Event: fmt.Sprintf("%s/%d人", event, len(persons)), |
| | | ModelID: m.Task.ModelID, |
| | | ModelTaskID: m.Task.ID, |
| | | CommunityId: location.CommunityId, |
| | | OrgID: location.OrgId, |
| | | CommunityId: lt.CommunityId, |
| | | OrgID: lt.OrgId, |
| | | ObjectIds: strings.Join(persons.Elements(), ","), |
| | | Location: location.Location, |
| | | Building: location.Building, |
| | | Floor: location.Floor, |
| | | Location: lt.Location, |
| | | Building: lt.Building, |
| | | Floor: lt.Floor, |
| | | PicDate: lt.Time, |
| | | } |
| | | results = append(results, result) |
| | | } |
| | |
| | | }, |
| | | }, |
| | | }, |
| | | "frequency_filter": map[string]interface{}{ // 添加频率过滤 |
| | | "bucket_selector": map[string]interface{}{ |
| | | "buckets_path": map[string]interface{}{ |
| | | "eventCount": "_count", // 聚合事件次数 |
| | | }, |
| | | "script": map[string]interface{}{ |
| | | "source": "params.eventCount >= params.threshold", // 筛选频率达到阈值的事件 |
| | | "params": map[string]interface{}{ |
| | | "threshold": gatherModel.Threshold, |
| | | }, |
| | | }, |
| | | }, |
| | | }, |
| | | }, |
| | | }, |
| | | }, |
| | |
| | | if gatherEvents, ok := floorBucket.(map[string]interface{})["gather_events"].(map[string]interface{})["buckets"].([]interface{}); ok { |
| | | for _, eventBucket := range gatherEvents { |
| | | key := int64(eventBucket.(map[string]interface{})["key"].(float64)) / 1000 // 将毫秒转换为秒 |
| | | timestamp := time.Unix(key, 0).Format("2006-01-02T15:04:05") |
| | | timestamp := time.Unix(key, 0).Format("2006-01-02 15:04:05") |
| | | |
| | | // 解析人员 |
| | | if peopleBuckets, ok := eventBucket.(map[string]interface{})["people"].(map[string]interface{})["buckets"].([]interface{}); ok { |
| | |
| | | return records, nil |
| | | } |
| | | |
| | | type GatherLocation struct { |
| | | type GatherLocationTime struct { |
| | | CommunityId string |
| | | OrgId string |
| | | Building string |
| | | Floor string |
| | | Location string |
| | | Time string |
| | | } |
| | | |
| | | func analyzeAndAggregate(records []GatherRecord) (map[GatherLocation]set.StringSet, error) { |
| | | aggregation := make(map[GatherLocation]set.StringSet) |
| | | func analyzeAndAggregate(records []GatherRecord) (map[GatherLocationTime]set.StringSet, error) { |
| | | aggregation := make(map[GatherLocationTime]set.StringSet) |
| | | domainIds := set.NewStringSet() |
| | | for _, record := range records { |
| | | domainIds.Add(record.CommunityId) |
| | |
| | | continue |
| | | } |
| | | |
| | | location := GatherLocation{ |
| | | location := GatherLocationTime{ |
| | | CommunityId: record.CommunityId, |
| | | OrgId: record.OrgId, |
| | | Building: record.Building, |
| | | Floor: record.Floor, |
| | | Location: fmt.Sprintf("%s%s%s", domains[record.CommunityId].Name, record.Building, record.Floor), |
| | | Time: record.PicDate, |
| | | } |
| | | if aggregation[location] == nil { |
| | | aggregation[location] = set.NewStringSet() |