From 6b59a711b9af0825858c408cdba95102b8b51cb3 Mon Sep 17 00:00:00 2001 From: zhangqian <zhangqian@123.com> Date: 星期五, 13 十二月 2024 15:46:44 +0800 Subject: [PATCH] 结果表增加联合唯一索引,防止重复报警 --- models/gather_model.go | 272 ++++++++++++++++++++++++++++++++++++++++------------- 1 files changed, 203 insertions(+), 69 deletions(-) diff --git a/models/gather_model.go b/models/gather_model.go index 66864cf..59187ce 100644 --- a/models/gather_model.go +++ b/models/gather_model.go @@ -4,11 +4,13 @@ "bytes" "context" "encoding/json" + "errors" "fmt" "github.com/elastic/go-elasticsearch/v6" "log" "model-engine/config" "model-engine/db" + "model-engine/pkg/set" "model-engine/service" "strings" "time" @@ -25,15 +27,24 @@ AppearInterval int `gorm:"type:int;" json:"appearInterval"` //鍑虹幇闂撮殧锛屽崟浣嶄负绉� DaysWindow int `gorm:"type:int;" json:"daysWindow" ` //杩戝嚑澶╁唴 Threshold int `gorm:"type:int;" json:"threshold" ` //杈惧嚑娆� + Task *db.ModelTask +} + +type ProcessedRecord struct { + UniqueKey string // 鍞竴鏍囪瘑 + Timestamp time.Time // 璁板綍鐨勬椂闂存埑 } func (m *GatherModel) Init(task *db.ModelTask) error { - + if len(task.DomainUnitIds) == 0 { + return errors.New("empty domain set") + } orgIds, areaIds, err := service.GetOrgIdsAndAreaIdsByDomainUnitIds(task.DomainUnitIds) if err != nil { return err } + m.Task = task m.OrgIds = orgIds m.AreaIds = areaIds m.Building = task.Building @@ -53,13 +64,29 @@ PicDate string `json:"picDate"` DocumentNumber string CommunityId string `json:"communityId"` + OrgId string `json:"orgId"` Building string `json:"building"` Floor string `json:"floor"` GatherPersons int `gorm:"type:int;" json:"gatherPersons"` //鑱氶泦浜烘暟 AppearInterval int `gorm:"type:int;" json:"appearInterval"` //鍑虹幇闂撮殧锛屽崟浣嶄负绉� } +var ( + processed map[string]ProcessedRecord // 瀛樺偍宸插鐞嗚褰� + cleanupThreshold = time.Now().Add(-100 * time.Hour) // 瀹氫箟涓�涓椂闂寸獥鍙o紝鍋囪鍙繚瀛樻渶杩�100灏忔椂鐨勮褰� +) + +func init() { + processed = make(map[string]ProcessedRecord) +} func (m *GatherModel) Run() error { + // 娓呯悊杩囨湡鐨勮褰� + for key, record := range processed { + if record.Timestamp.Before(cleanupThreshold) { + delete(processed, key) + } + } + records, err := queryElasticsearch(db.GetEsClient(), m) if err != nil { log.Fatalf("Failed to query Elasticsearch: %v", err) @@ -69,16 +96,64 @@ return nil } - aggregation, err := analyzeAndAggregate(records) + newRecords := make([]*GatherRecord, 0) + + // 鑱氬悎閫昏緫 + for _, record := range records { + // 鐢熸垚鍞竴鏍囪瘑 + uniqueKey := fmt.Sprintf("%s-%s", record.DocumentNumber, record.PicDate) + + // 濡傛灉宸茬粡澶勭悊杩囷紝璺宠繃 + if _, exists := processed[uniqueKey]; exists { + continue + } + + // 娣诲姞鍒板凡澶勭悊璁板綍 + processed[uniqueKey] = ProcessedRecord{ + UniqueKey: uniqueKey, + Timestamp: time.Now(), + } + newRecords = append(newRecords, record) + } + + aggregation, err := analyzeAndAggregate(newRecords) if err != nil { log.Fatalf("Failed to analyze and aggregate data: %v", err) } - // Print or process the aggregation results as needed - for location, persons := range aggregation { - fmt.Printf("Gathering detected at %s with %d unique persons\n", location, len(persons)) + if len(aggregation) == 0 { + return nil } - return nil + + tagTypes := strings.Split(m.Task.PersonType, ",") + results := make([]*db.ModelTaskResults, 0, len(aggregation)) + _, typeNames, err := service.GetPersonTypeNameByTypes(tagTypes) + if err != nil { + return err + } + event := strings.Join(typeNames, ",") + for lt, persons := range aggregation { + if persons.Size() == 0 { + continue + } + personIds := persons.Elements() + result := &db.ModelTaskResults{ + Title: m.Task.Name, + Event: fmt.Sprintf("%s/%d浜�", event, len(persons)), + ModelID: m.Task.ModelID, + ModelTaskID: m.Task.ID, + CommunityId: lt.CommunityId, + OrgID: lt.OrgId, + ObjectIds: strings.Join(personIds, ","), + Location: lt.Location, + Building: lt.Building, + Floor: lt.Floor, + PicDate: lt.Time, + FirstPersonID: personIds[0], + } + results = append(results, result) + } + return service.SaveTaskResults(results) } func (m *GatherModel) Shutdown() error { @@ -87,7 +162,7 @@ return nil } -func queryElasticsearch(esClient *elasticsearch.Client, gatherModel *GatherModel) ([]GatherRecord, error) { +func queryElasticsearch(esClient *elasticsearch.Client, gatherModel *GatherModel) ([]*GatherRecord, error) { var buf bytes.Buffer now := time.Now() start := now.Add(-time.Duration(gatherModel.DaysWindow) * 24 * time.Hour) @@ -151,46 +226,67 @@ }, }, "aggs": map[string]interface{}{ - "gather_events": map[string]interface{}{ - "date_histogram": map[string]interface{}{ - "field": "picDate", - "interval": fmt.Sprintf("%ds", gatherModel.AppearInterval), - "min_doc_count": 1, + "orgs": map[string]interface{}{ // 鍏堣仛鍚坥rgId + "terms": map[string]interface{}{ + "field": "orgId", // 鑱氬悎orgId + "size": 10000, }, "aggs": map[string]interface{}{ - "community": map[string]interface{}{ + "community": map[string]interface{}{ // 鍦╫rgId鑱氬悎涓嬭仛鍚坈ommunityId "terms": map[string]interface{}{ - "field": "communityId", // 鑱氬悎灏忓尯id + "field": "communityId", // 鑱氬悎communityId "size": 10000, }, "aggs": map[string]interface{}{ - "location": map[string]interface{}{ + "location": map[string]interface{}{ // 鍦╟ommunityId涓嬭仛鍚坆uilding "terms": map[string]interface{}{ "field": "cameraLocation.building", // 鑱氬悎妤兼爧 "size": 10000, }, "aggs": map[string]interface{}{ - "floor": map[string]interface{}{ + "floor": map[string]interface{}{ // 鍦╞uilding涓嬭仛鍚坒loor "terms": map[string]interface{}{ "field": "cameraLocation.floor", // 鑱氬悎妤煎眰 "size": 10000, }, "aggs": map[string]interface{}{ - "people": map[string]interface{}{ - "terms": map[string]interface{}{ - "field": "documentNumber", // 鎸変汉鍛樺敮涓�鏍囪瘑鑱氬悎 - "size": 10000, + "gather_events": map[string]interface{}{ // 鍦╢loor涓嬭仛鍚坓ather_events + "date_histogram": map[string]interface{}{ + "field": "picDate", + "interval": fmt.Sprintf("%ds", gatherModel.AppearInterval), + "min_doc_count": 1, }, - }, - "filter_gather": map[string]interface{}{ - "bucket_selector": map[string]interface{}{ - "buckets_path": map[string]interface{}{ - "personCount": "people._bucket_count", // 缁熻浜烘暟 + "aggs": map[string]interface{}{ + "people": map[string]interface{}{ + "terms": map[string]interface{}{ + "field": "documentNumber", // 鎸変汉鍛樺敮涓�鏍囪瘑鑱氬悎 + "size": 10000, + }, }, - "script": map[string]interface{}{ - "source": "params.personCount >= params.gatherPersons", // 鑱氶泦浜烘暟杩囨护 - "params": map[string]interface{}{ - "gatherPersons": gatherModel.GatherPersons, + "filter_gather": map[string]interface{}{ + "bucket_selector": map[string]interface{}{ + "buckets_path": map[string]interface{}{ + "personCount": "people._bucket_count", // 缁熻浜烘暟 + }, + "script": map[string]interface{}{ + "source": "params.personCount >= params.gatherPersons", // 鑱氶泦浜烘暟杩囨护 + "params": map[string]interface{}{ + "gatherPersons": gatherModel.GatherPersons, + }, + }, + }, + }, + "frequency_filter": map[string]interface{}{ // 娣诲姞棰戠巼杩囨护 + "bucket_selector": map[string]interface{}{ + "buckets_path": map[string]interface{}{ + "eventCount": "_count", // 鑱氬悎浜嬩欢娆℃暟 + }, + "script": map[string]interface{}{ + "source": "params.eventCount >= params.threshold", // 绛涢�夐鐜囪揪鍒伴槇鍊肩殑浜嬩欢 + "params": map[string]interface{}{ + "threshold": gatherModel.Threshold, + }, + }, }, }, }, @@ -235,46 +331,52 @@ } // 瑙f瀽鑱氬悎缁撴灉 - var records []GatherRecord + var records []*GatherRecord if aggs, ok := result["aggregations"].(map[string]interface{}); ok { - if gatherEvents, ok := aggs["gather_events"].(map[string]interface{}); ok { - if buckets, ok := gatherEvents["buckets"].([]interface{}); ok { - for _, bucket := range buckets { - key := int64(bucket.(map[string]interface{})["key"].(float64)) / 1000 // 灏嗘绉掕浆鎹负绉� - timestamp := time.Unix(key, 0).Format("2006-01-02T15:04:05") + if orgBuckets, ok := aggs["orgs"].(map[string]interface{})["buckets"].([]interface{}); ok { + for _, orgBucket := range orgBuckets { + orgId := orgBucket.(map[string]interface{})["key"].(string) - // 瑙f瀽鎸夊皬鍖恒�佹ゼ鏍嬪拰妤煎眰鐨勮仛鍚堢粨鏋� - if communityBuckets, ok := bucket.(map[string]interface{})["community"].(map[string]interface{})["buckets"].([]interface{}); ok { - for _, communityBucket := range communityBuckets { - communityId := communityBucket.(map[string]interface{})["key"].(string) + // 瑙f瀽鎸塩ommunityId鐨勮仛鍚堢粨鏋� + if communityBuckets, ok := orgBucket.(map[string]interface{})["community"].(map[string]interface{})["buckets"].([]interface{}); ok { + for _, communityBucket := range communityBuckets { + communityId := communityBucket.(map[string]interface{})["key"].(string) - // 瑙f瀽鎸夋ゼ鏍嬪拰妤煎眰鐨勮仛鍚堢粨鏋� - if locationBuckets, ok := communityBucket.(map[string]interface{})["location"].(map[string]interface{})["buckets"].([]interface{}); ok { - for _, locationBucket := range locationBuckets { - building := locationBucket.(map[string]interface{})["key"].(string) + // 瑙f瀽鎸塨uilding鐨勮仛鍚堢粨鏋� + if locationBuckets, ok := communityBucket.(map[string]interface{})["location"].(map[string]interface{})["buckets"].([]interface{}); ok { + for _, locationBucket := range locationBuckets { + building := locationBucket.(map[string]interface{})["key"].(string) - // 瑙f瀽妤煎眰 - if floorBuckets, ok := locationBucket.(map[string]interface{})["floor"].(map[string]interface{})["buckets"].([]interface{}); ok { - for _, floorBucket := range floorBuckets { - floor := floorBucket.(map[string]interface{})["key"].(string) + // 瑙f瀽鎸塮loor鐨勮仛鍚堢粨鏋� + if floorBuckets, ok := locationBucket.(map[string]interface{})["floor"].(map[string]interface{})["buckets"].([]interface{}); ok { + for _, floorBucket := range floorBuckets { + floor := floorBucket.(map[string]interface{})["key"].(string) - // 瑙f瀽浜哄憳 - if peopleBuckets, ok := floorBucket.(map[string]interface{})["people"].(map[string]interface{})["buckets"].([]interface{}); ok { - for _, person := range peopleBuckets { - documentNumber := person.(map[string]interface{})["key"].(string) + // 瑙f瀽鑱氬悎鐨勪簨浠� + if gatherEvents, ok := floorBucket.(map[string]interface{})["gather_events"].(map[string]interface{})["buckets"].([]interface{}); ok { + for _, eventBucket := range gatherEvents { + key := int64(eventBucket.(map[string]interface{})["key"].(float64)) / 1000 // 灏嗘绉掕浆鎹负绉� + timestamp := time.Unix(key, 0).UTC().Format("2006-01-02 15:04:05") - // 鏋勫缓 GatherRecord 缁撴瀯浣� - record := GatherRecord{ - PicDate: timestamp, - DocumentNumber: documentNumber, - CommunityId: communityId, - Building: building, - Floor: floor, - AppearInterval: gatherModel.AppearInterval, - GatherPersons: gatherModel.GatherPersons, + // 瑙f瀽浜哄憳 + if peopleBuckets, ok := eventBucket.(map[string]interface{})["people"].(map[string]interface{})["buckets"].([]interface{}); ok { + for _, person := range peopleBuckets { + documentNumber := person.(map[string]interface{})["key"].(string) + + // 鏋勫缓 GatherRecord 缁撴瀯浣� + record := &GatherRecord{ + PicDate: timestamp, + DocumentNumber: documentNumber, + CommunityId: communityId, + Building: building, + Floor: floor, + OrgId: orgId, + AppearInterval: gatherModel.AppearInterval, + GatherPersons: gatherModel.GatherPersons, + } + + records = append(records, record) } - - records = append(records, record) } } } @@ -291,15 +393,47 @@ return records, nil } -func analyzeAndAggregate(records []GatherRecord) (map[string][]string, error) { - // Implement logic to aggregate and analyze data based on GatherModel parameters - // This is a placeholder for the actual implementation - aggregation := make(map[string][]string) +type GatherLocationTime struct { + CommunityId string + OrgId string + Building string + Floor string + Location string + Time string +} - // Example logic: +func analyzeAndAggregate(records []*GatherRecord) (map[GatherLocationTime]set.StringSet, error) { + aggregation := make(map[GatherLocationTime]set.StringSet) + domainIds := set.NewStringSet() for _, record := range records { - key := fmt.Sprintf("%s%s%s", record.CommunityId, record.Building, record.Floor) - aggregation[key] = append(aggregation[key], record.DocumentNumber) + domainIds.Add(record.CommunityId) + } + + domains, err := service.GetUnitsMapByIds(domainIds.Elements()) + if err != nil { + return nil, err + } + + for _, record := range records { + if record.DocumentNumber == "" { + continue + } + if domains[record.CommunityId] == nil { + continue + } + + location := GatherLocationTime{ + CommunityId: record.CommunityId, + OrgId: record.OrgId, + Building: record.Building, + Floor: record.Floor, + Location: fmt.Sprintf("%s%s%s", domains[record.CommunityId].Name, record.Building, record.Floor), + Time: record.PicDate, + } + if aggregation[location] == nil { + aggregation[location] = set.NewStringSet() + } + aggregation[location].Add(record.DocumentNumber) } return aggregation, nil -- Gitblit v1.8.0