From 973c3b1684d0d019aa74113b1983fe29f20d2a49 Mon Sep 17 00:00:00 2001 From: zhangqian <zhangqian@123.com> Date: 星期四, 12 十二月 2024 17:03:40 +0800 Subject: [PATCH] 模型任务执行预警结果存表 --- models/gather_model.go | 195 ++++++++++++++++++++++++++++++++---------------- 1 files changed, 130 insertions(+), 65 deletions(-) diff --git a/models/gather_model.go b/models/gather_model.go index 66864cf..54e7626 100644 --- a/models/gather_model.go +++ b/models/gather_model.go @@ -9,6 +9,7 @@ "log" "model-engine/config" "model-engine/db" + "model-engine/pkg/set" "model-engine/service" "strings" "time" @@ -25,6 +26,7 @@ AppearInterval int `gorm:"type:int;" json:"appearInterval"` //鍑虹幇闂撮殧锛屽崟浣嶄负绉� DaysWindow int `gorm:"type:int;" json:"daysWindow" ` //杩戝嚑澶╁唴 Threshold int `gorm:"type:int;" json:"threshold" ` //杈惧嚑娆� + Task *db.ModelTask } func (m *GatherModel) Init(task *db.ModelTask) error { @@ -34,6 +36,7 @@ return err } + m.Task = task m.OrgIds = orgIds m.AreaIds = areaIds m.Building = task.Building @@ -53,6 +56,7 @@ PicDate string `json:"picDate"` DocumentNumber string CommunityId string `json:"communityId"` + OrgId string `json:"orgId"` Building string `json:"building"` Floor string `json:"floor"` GatherPersons int `gorm:"type:int;" json:"gatherPersons"` //鑱氶泦浜烘暟 @@ -74,11 +78,31 @@ log.Fatalf("Failed to analyze and aggregate data: %v", err) } - // Print or process the aggregation results as needed - for location, persons := range aggregation { - fmt.Printf("Gathering detected at %s with %d unique persons\n", location, len(persons)) + if len(aggregation) == 0 { + return nil } - return nil + + tagTypes := strings.Split(m.Task.PersonType, ",") + results := make([]*db.ModelTaskResults, 0, len(aggregation)) + _, typeNames, err := service.GetPersonTypeNameByTypes(tagTypes) + if err != nil { + return err + } + event := strings.Join(typeNames, ",") + for location, persons := range aggregation { + result := &db.ModelTaskResults{ + Title: m.Task.Name, + Event: fmt.Sprintf("%s/%d浜�", event, len(persons)), + ModelID: m.Task.ModelID, + ModelTaskID: m.Task.ID, + CommunityId: location.CommunityId, + OrgID: location.OrgId, + ObjectIds: strings.Join(persons.Elements(), ","), + Location: location.Location, + } + results = append(results, result) + } + return service.SaveTaskResults(results) } func (m *GatherModel) Shutdown() error { @@ -151,46 +175,54 @@ }, }, "aggs": map[string]interface{}{ - "gather_events": map[string]interface{}{ - "date_histogram": map[string]interface{}{ - "field": "picDate", - "interval": fmt.Sprintf("%ds", gatherModel.AppearInterval), - "min_doc_count": 1, + "orgs": map[string]interface{}{ // 鍏堣仛鍚坥rgId + "terms": map[string]interface{}{ + "field": "orgId", // 鑱氬悎orgId + "size": 10000, }, "aggs": map[string]interface{}{ - "community": map[string]interface{}{ + "community": map[string]interface{}{ // 鍦╫rgId鑱氬悎涓嬭仛鍚坈ommunityId "terms": map[string]interface{}{ - "field": "communityId", // 鑱氬悎灏忓尯id + "field": "communityId", // 鑱氬悎communityId "size": 10000, }, "aggs": map[string]interface{}{ - "location": map[string]interface{}{ + "location": map[string]interface{}{ // 鍦╟ommunityId涓嬭仛鍚坆uilding "terms": map[string]interface{}{ "field": "cameraLocation.building", // 鑱氬悎妤兼爧 "size": 10000, }, "aggs": map[string]interface{}{ - "floor": map[string]interface{}{ + "floor": map[string]interface{}{ // 鍦╞uilding涓嬭仛鍚坒loor "terms": map[string]interface{}{ "field": "cameraLocation.floor", // 鑱氬悎妤煎眰 "size": 10000, }, "aggs": map[string]interface{}{ - "people": map[string]interface{}{ - "terms": map[string]interface{}{ - "field": "documentNumber", // 鎸変汉鍛樺敮涓�鏍囪瘑鑱氬悎 - "size": 10000, + "gather_events": map[string]interface{}{ // 鍦╢loor涓嬭仛鍚坓ather_events + "date_histogram": map[string]interface{}{ + "field": "picDate", + "interval": fmt.Sprintf("%ds", gatherModel.AppearInterval), + "min_doc_count": 1, }, - }, - "filter_gather": map[string]interface{}{ - "bucket_selector": map[string]interface{}{ - "buckets_path": map[string]interface{}{ - "personCount": "people._bucket_count", // 缁熻浜烘暟 + "aggs": map[string]interface{}{ + "people": map[string]interface{}{ + "terms": map[string]interface{}{ + "field": "documentNumber", // 鎸変汉鍛樺敮涓�鏍囪瘑鑱氬悎 + "size": 10000, + }, }, - "script": map[string]interface{}{ - "source": "params.personCount >= params.gatherPersons", // 鑱氶泦浜烘暟杩囨护 - "params": map[string]interface{}{ - "gatherPersons": gatherModel.GatherPersons, + "filter_gather": map[string]interface{}{ + "bucket_selector": map[string]interface{}{ + "buckets_path": map[string]interface{}{ + "personCount": "people._bucket_count", // 缁熻浜烘暟 + }, + "script": map[string]interface{}{ + "source": "params.personCount >= params.gatherPersons", // 鑱氶泦浜烘暟杩囨护 + "params": map[string]interface{}{ + "gatherPersons": gatherModel.GatherPersons, + }, + }, }, }, }, @@ -237,44 +269,50 @@ // 瑙f瀽鑱氬悎缁撴灉 var records []GatherRecord if aggs, ok := result["aggregations"].(map[string]interface{}); ok { - if gatherEvents, ok := aggs["gather_events"].(map[string]interface{}); ok { - if buckets, ok := gatherEvents["buckets"].([]interface{}); ok { - for _, bucket := range buckets { - key := int64(bucket.(map[string]interface{})["key"].(float64)) / 1000 // 灏嗘绉掕浆鎹负绉� - timestamp := time.Unix(key, 0).Format("2006-01-02T15:04:05") + if orgBuckets, ok := aggs["orgs"].(map[string]interface{})["buckets"].([]interface{}); ok { + for _, orgBucket := range orgBuckets { + orgId := orgBucket.(map[string]interface{})["key"].(string) - // 瑙f瀽鎸夊皬鍖恒�佹ゼ鏍嬪拰妤煎眰鐨勮仛鍚堢粨鏋� - if communityBuckets, ok := bucket.(map[string]interface{})["community"].(map[string]interface{})["buckets"].([]interface{}); ok { - for _, communityBucket := range communityBuckets { - communityId := communityBucket.(map[string]interface{})["key"].(string) + // 瑙f瀽鎸塩ommunityId鐨勮仛鍚堢粨鏋� + if communityBuckets, ok := orgBucket.(map[string]interface{})["community"].(map[string]interface{})["buckets"].([]interface{}); ok { + for _, communityBucket := range communityBuckets { + communityId := communityBucket.(map[string]interface{})["key"].(string) - // 瑙f瀽鎸夋ゼ鏍嬪拰妤煎眰鐨勮仛鍚堢粨鏋� - if locationBuckets, ok := communityBucket.(map[string]interface{})["location"].(map[string]interface{})["buckets"].([]interface{}); ok { - for _, locationBucket := range locationBuckets { - building := locationBucket.(map[string]interface{})["key"].(string) + // 瑙f瀽鎸塨uilding鐨勮仛鍚堢粨鏋� + if locationBuckets, ok := communityBucket.(map[string]interface{})["location"].(map[string]interface{})["buckets"].([]interface{}); ok { + for _, locationBucket := range locationBuckets { + building := locationBucket.(map[string]interface{})["key"].(string) - // 瑙f瀽妤煎眰 - if floorBuckets, ok := locationBucket.(map[string]interface{})["floor"].(map[string]interface{})["buckets"].([]interface{}); ok { - for _, floorBucket := range floorBuckets { - floor := floorBucket.(map[string]interface{})["key"].(string) + // 瑙f瀽鎸塮loor鐨勮仛鍚堢粨鏋� + if floorBuckets, ok := locationBucket.(map[string]interface{})["floor"].(map[string]interface{})["buckets"].([]interface{}); ok { + for _, floorBucket := range floorBuckets { + floor := floorBucket.(map[string]interface{})["key"].(string) - // 瑙f瀽浜哄憳 - if peopleBuckets, ok := floorBucket.(map[string]interface{})["people"].(map[string]interface{})["buckets"].([]interface{}); ok { - for _, person := range peopleBuckets { - documentNumber := person.(map[string]interface{})["key"].(string) + // 瑙f瀽鑱氬悎鐨勪簨浠� + if gatherEvents, ok := floorBucket.(map[string]interface{})["gather_events"].(map[string]interface{})["buckets"].([]interface{}); ok { + for _, eventBucket := range gatherEvents { + key := int64(eventBucket.(map[string]interface{})["key"].(float64)) / 1000 // 灏嗘绉掕浆鎹负绉� + timestamp := time.Unix(key, 0).Format("2006-01-02T15:04:05") - // 鏋勫缓 GatherRecord 缁撴瀯浣� - record := GatherRecord{ - PicDate: timestamp, - DocumentNumber: documentNumber, - CommunityId: communityId, - Building: building, - Floor: floor, - AppearInterval: gatherModel.AppearInterval, - GatherPersons: gatherModel.GatherPersons, + // 瑙f瀽浜哄憳 + if peopleBuckets, ok := eventBucket.(map[string]interface{})["people"].(map[string]interface{})["buckets"].([]interface{}); ok { + for _, person := range peopleBuckets { + documentNumber := person.(map[string]interface{})["key"].(string) + + // 鏋勫缓 GatherRecord 缁撴瀯浣� + record := GatherRecord{ + PicDate: timestamp, + DocumentNumber: documentNumber, + CommunityId: communityId, + Building: building, + Floor: floor, + OrgId: orgId, + AppearInterval: gatherModel.AppearInterval, + GatherPersons: gatherModel.GatherPersons, + } + + records = append(records, record) } - - records = append(records, record) } } } @@ -291,15 +329,42 @@ return records, nil } -func analyzeAndAggregate(records []GatherRecord) (map[string][]string, error) { - // Implement logic to aggregate and analyze data based on GatherModel parameters - // This is a placeholder for the actual implementation - aggregation := make(map[string][]string) +type GatherLocation struct { + CommunityId string + OrgId string + Building string + Floor string + Location string +} - // Example logic: +func analyzeAndAggregate(records []GatherRecord) (map[GatherLocation]set.StringSet, error) { + aggregation := make(map[GatherLocation]set.StringSet) + domainIds := set.NewStringSet() for _, record := range records { - key := fmt.Sprintf("%s%s%s", record.CommunityId, record.Building, record.Floor) - aggregation[key] = append(aggregation[key], record.DocumentNumber) + domainIds.Add(record.CommunityId) + } + + domains, err := service.GetUnitsMapByIds(domainIds.Elements()) + if err != nil { + return nil, err + } + + for _, record := range records { + if domains[record.CommunityId] == nil { + continue + } + + location := GatherLocation{ + CommunityId: record.CommunityId, + OrgId: record.OrgId, + Building: record.Building, + Floor: record.Floor, + Location: fmt.Sprintf("%s%s%s", domains[record.CommunityId].Name, record.Building, record.Floor), + } + if aggregation[location] == nil { + aggregation[location] = set.NewStringSet() + } + aggregation[location].Add(record.DocumentNumber) } return aggregation, nil -- Gitblit v1.8.0