package models import ( "bytes" "context" "encoding/json" "fmt" "github.com/elastic/go-elasticsearch/v6" "log" "model-engine/config" "model-engine/db" "model-engine/pkg/set" "model-engine/service" "strings" "time" ) type GatherModel struct { OrgIds []interface{} `json:"-"` AreaIds []interface{} `json:"-"` Building string `gorm:"type:varchar(255)" json:"building"` //楼栋 Floor string `gorm:"type:varchar(255)" json:"floor"` //楼层 AlarmType db.AlarmType `gorm:"type:varchar(255);" json:"alarmType"` //预警方式 PersonType string `gorm:"type:varchar(255);" json:"personType"` //人员类型 GatherPersons int `gorm:"type:int;" json:"gatherPersons"` //聚集人数 AppearInterval int `gorm:"type:int;" json:"appearInterval"` //出现间隔,单位为秒 DaysWindow int `gorm:"type:int;" json:"daysWindow" ` //近几天内 Threshold int `gorm:"type:int;" json:"threshold" ` //达几次 Task *db.ModelTask } func (m *GatherModel) Init(task *db.ModelTask) error { orgIds, areaIds, err := service.GetOrgIdsAndAreaIdsByDomainUnitIds(task.DomainUnitIds) if err != nil { return err } m.Task = task m.OrgIds = orgIds m.AreaIds = areaIds m.Building = task.Building m.Floor = task.Floor m.AlarmType = task.AlarmType m.PersonType = task.PersonType m.GatherPersons = task.GatherPersons m.AppearInterval = task.AppearInterval m.DaysWindow = task.DaysWindow m.Threshold = task.Threshold fmt.Println("GatherModel init finish ...") return nil } type GatherRecord struct { IDCard string `json:"idCard"` PicDate string `json:"picDate"` DocumentNumber string CommunityId string `json:"communityId"` OrgId string `json:"orgId"` Building string `json:"building"` Floor string `json:"floor"` GatherPersons int `gorm:"type:int;" json:"gatherPersons"` //聚集人数 AppearInterval int `gorm:"type:int;" json:"appearInterval"` //出现间隔,单位为秒 } func (m *GatherModel) Run() error { records, err := queryElasticsearch(db.GetEsClient(), m) if err != nil { log.Fatalf("Failed to query Elasticsearch: %v", err) } if len(records) == 0 { return nil } aggregation, err := analyzeAndAggregate(records) if err != nil { log.Fatalf("Failed to analyze and aggregate data: %v", err) } if len(aggregation) == 0 { return nil } tagTypes := strings.Split(m.Task.PersonType, ",") results := make([]*db.ModelTaskResults, 0, len(aggregation)) _, typeNames, err := service.GetPersonTypeNameByTypes(tagTypes) if err != nil { return err } event := strings.Join(typeNames, ",") for location, persons := range aggregation { result := &db.ModelTaskResults{ Title: m.Task.Name, Event: fmt.Sprintf("%s/%d人", event, len(persons)), ModelID: m.Task.ModelID, ModelTaskID: m.Task.ID, CommunityId: location.CommunityId, OrgID: location.OrgId, ObjectIds: strings.Join(persons.Elements(), ","), Location: location.Location, Building: location.Building, Floor: location.Floor, } results = append(results, result) } return service.SaveTaskResults(results) } func (m *GatherModel) Shutdown() error { // 清理资源 fmt.Println("Shutting down GatherModel Model") return nil } func queryElasticsearch(esClient *elasticsearch.Client, gatherModel *GatherModel) ([]GatherRecord, error) { var buf bytes.Buffer now := time.Now() start := now.Add(-time.Duration(gatherModel.DaysWindow) * 24 * time.Hour) // 构建过滤条件 var filters []map[string]interface{} if len(gatherModel.OrgIds) > 0 || len(gatherModel.AreaIds) > 0 { // 获取数据权限过滤条件 authFilters := GetDomainFilters(gatherModel.OrgIds, gatherModel.AreaIds) filters = append(filters, authFilters...) } // 地址过滤 if gatherModel.Building != "" || gatherModel.Floor != "" { var addrParams map[string]interface{} if gatherModel.Floor != "" { addrParams = map[string]interface{}{"bool": map[string]interface{}{ "must": []interface{}{ map[string]interface{}{ "term": map[string]interface{}{ "cameraLocation.building": gatherModel.Building, }}, map[string]interface{}{ "term": map[string]interface{}{ "cameraLocation.floor": gatherModel.Floor, }}, }, }} } else if gatherModel.Building != "" { addrParams = map[string]interface{}{ "term": map[string]interface{}{ "cameraLocation.building": gatherModel.Building, }} } filters = append(filters, addrParams) } // 重点人员过滤 if len(gatherModel.PersonType) > 0 { filters = append(filters, map[string]interface{}{ "terms": map[string]interface{}{ "keyPersonType": strings.Split(gatherModel.PersonType, ","), }, }) } // 时间范围 filters = append(filters, map[string]interface{}{ "range": map[string]interface{}{ "picDate": map[string]interface{}{ "gte": start.Format(time.DateTime), "lt": now.Format(time.DateTime), }, }, }) query := map[string]interface{}{ "query": map[string]interface{}{ "bool": map[string]interface{}{ "filter": filters, }, }, "aggs": map[string]interface{}{ "orgs": map[string]interface{}{ // 先聚合orgId "terms": map[string]interface{}{ "field": "orgId", // 聚合orgId "size": 10000, }, "aggs": map[string]interface{}{ "community": map[string]interface{}{ // 在orgId聚合下聚合communityId "terms": map[string]interface{}{ "field": "communityId", // 聚合communityId "size": 10000, }, "aggs": map[string]interface{}{ "location": map[string]interface{}{ // 在communityId下聚合building "terms": map[string]interface{}{ "field": "cameraLocation.building", // 聚合楼栋 "size": 10000, }, "aggs": map[string]interface{}{ "floor": map[string]interface{}{ // 在building下聚合floor "terms": map[string]interface{}{ "field": "cameraLocation.floor", // 聚合楼层 "size": 10000, }, "aggs": map[string]interface{}{ "gather_events": map[string]interface{}{ // 在floor下聚合gather_events "date_histogram": map[string]interface{}{ "field": "picDate", "interval": fmt.Sprintf("%ds", gatherModel.AppearInterval), "min_doc_count": 1, }, "aggs": map[string]interface{}{ "people": map[string]interface{}{ "terms": map[string]interface{}{ "field": "documentNumber", // 按人员唯一标识聚合 "size": 10000, }, }, "filter_gather": map[string]interface{}{ "bucket_selector": map[string]interface{}{ "buckets_path": map[string]interface{}{ "personCount": "people._bucket_count", // 统计人数 }, "script": map[string]interface{}{ "source": "params.personCount >= params.gatherPersons", // 聚集人数过滤 "params": map[string]interface{}{ "gatherPersons": gatherModel.GatherPersons, }, }, }, }, "frequency_filter": map[string]interface{}{ // 添加频率过滤 "bucket_selector": map[string]interface{}{ "buckets_path": map[string]interface{}{ "eventCount": "_count", // 聚合事件次数 }, "script": map[string]interface{}{ "source": "params.eventCount >= params.threshold", // 筛选频率达到阈值的事件 "params": map[string]interface{}{ "threshold": gatherModel.Threshold, }, }, }, }, }, }, }, }, }, }, }, }, }, }, }, "size": 0, } if err := json.NewEncoder(&buf).Encode(query); err != nil { return nil, fmt.Errorf("error encoding query: %s", err) } res, err := esClient.Search( esClient.Search.WithContext(context.Background()), esClient.Search.WithIndex(config.EsInfo.EsIndex.AiOcean.IndexName), esClient.Search.WithDocumentType(config.EsInfo.EsIndex.AiOcean.IndexType), esClient.Search.WithBody(&buf), esClient.Search.WithTrackTotalHits(true), esClient.Search.WithPretty(), ) if err != nil { return nil, fmt.Errorf("error getting response: %s", err) } defer res.Body.Close() // Check for a successful status code (2xx range) if res.IsError() { return nil, fmt.Errorf("error getting response: %s", res.String()) } var result map[string]interface{} if err := json.NewDecoder(res.Body).Decode(&result); err != nil { return nil, fmt.Errorf("error parsing response body: %s", err) } // 解析聚合结果 var records []GatherRecord if aggs, ok := result["aggregations"].(map[string]interface{}); ok { if orgBuckets, ok := aggs["orgs"].(map[string]interface{})["buckets"].([]interface{}); ok { for _, orgBucket := range orgBuckets { orgId := orgBucket.(map[string]interface{})["key"].(string) // 解析按communityId的聚合结果 if communityBuckets, ok := orgBucket.(map[string]interface{})["community"].(map[string]interface{})["buckets"].([]interface{}); ok { for _, communityBucket := range communityBuckets { communityId := communityBucket.(map[string]interface{})["key"].(string) // 解析按building的聚合结果 if locationBuckets, ok := communityBucket.(map[string]interface{})["location"].(map[string]interface{})["buckets"].([]interface{}); ok { for _, locationBucket := range locationBuckets { building := locationBucket.(map[string]interface{})["key"].(string) // 解析按floor的聚合结果 if floorBuckets, ok := locationBucket.(map[string]interface{})["floor"].(map[string]interface{})["buckets"].([]interface{}); ok { for _, floorBucket := range floorBuckets { floor := floorBucket.(map[string]interface{})["key"].(string) // 解析聚合的事件 if gatherEvents, ok := floorBucket.(map[string]interface{})["gather_events"].(map[string]interface{})["buckets"].([]interface{}); ok { for _, eventBucket := range gatherEvents { key := int64(eventBucket.(map[string]interface{})["key"].(float64)) / 1000 // 将毫秒转换为秒 timestamp := time.Unix(key, 0).Format("2006-01-02 15:04:05") // 解析人员 if peopleBuckets, ok := eventBucket.(map[string]interface{})["people"].(map[string]interface{})["buckets"].([]interface{}); ok { for _, person := range peopleBuckets { documentNumber := person.(map[string]interface{})["key"].(string) // 构建 GatherRecord 结构体 record := GatherRecord{ PicDate: timestamp, DocumentNumber: documentNumber, CommunityId: communityId, Building: building, Floor: floor, OrgId: orgId, AppearInterval: gatherModel.AppearInterval, GatherPersons: gatherModel.GatherPersons, } records = append(records, record) } } } } } } } } } } } } } return records, nil } type GatherLocation struct { CommunityId string OrgId string Building string Floor string Location string } func analyzeAndAggregate(records []GatherRecord) (map[GatherLocation]set.StringSet, error) { aggregation := make(map[GatherLocation]set.StringSet) domainIds := set.NewStringSet() for _, record := range records { domainIds.Add(record.CommunityId) } domains, err := service.GetUnitsMapByIds(domainIds.Elements()) if err != nil { return nil, err } for _, record := range records { if domains[record.CommunityId] == nil { continue } location := GatherLocation{ CommunityId: record.CommunityId, OrgId: record.OrgId, Building: record.Building, Floor: record.Floor, Location: fmt.Sprintf("%s%s%s", domains[record.CommunityId].Name, record.Building, record.Floor), } if aggregation[location] == nil { aggregation[location] = set.NewStringSet() } aggregation[location].Add(record.DocumentNumber) } return aggregation, nil }