From 6b59a711b9af0825858c408cdba95102b8b51cb3 Mon Sep 17 00:00:00 2001
From: zhangqian <zhangqian@123.com>
Date: 星期五, 13 十二月 2024 15:46:44 +0800
Subject: [PATCH] 结果表增加联合唯一索引,防止重复报警

---
 models/gather_model.go |  272 ++++++++++++++++++++++++++++++++++++++++-------------
 1 files changed, 203 insertions(+), 69 deletions(-)

diff --git a/models/gather_model.go b/models/gather_model.go
index 66864cf..59187ce 100644
--- a/models/gather_model.go
+++ b/models/gather_model.go
@@ -4,11 +4,13 @@
 	"bytes"
 	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"github.com/elastic/go-elasticsearch/v6"
 	"log"
 	"model-engine/config"
 	"model-engine/db"
+	"model-engine/pkg/set"
 	"model-engine/service"
 	"strings"
 	"time"
@@ -25,15 +27,24 @@
 	AppearInterval int           `gorm:"type:int;" json:"appearInterval"`      //出现间隔，单位为秒
 	DaysWindow     int           `gorm:"type:int;" json:"daysWindow" `         //近几天内
 	Threshold      int           `gorm:"type:int;" json:"threshold" `          //达几次
+	Task           *db.ModelTask
+}
+
+type ProcessedRecord struct {
+	UniqueKey string    // 唯一标识
+	Timestamp time.Time // 记录的时间戳
 }
 
 func (m *GatherModel) Init(task *db.ModelTask) error {
-
+	if len(task.DomainUnitIds) == 0 {
+		return errors.New("empty domain set")
+	}
 	orgIds, areaIds, err := service.GetOrgIdsAndAreaIdsByDomainUnitIds(task.DomainUnitIds)
 	if err != nil {
 		return err
 	}
 
+	m.Task = task
 	m.OrgIds = orgIds
 	m.AreaIds = areaIds
 	m.Building = task.Building
@@ -53,13 +64,29 @@
 	PicDate        string `json:"picDate"`
 	DocumentNumber string
 	CommunityId    string `json:"communityId"`
+	OrgId          string `json:"orgId"`
 	Building       string `json:"building"`
 	Floor          string `json:"floor"`
 	GatherPersons  int    `gorm:"type:int;" json:"gatherPersons"`  //聚集人数
 	AppearInterval int    `gorm:"type:int;" json:"appearInterval"` //出现间隔，单位为秒
 }
 
+var (
+	processed        map[string]ProcessedRecord         // 存储已处理记录
+	cleanupThreshold = time.Now().Add(-100 * time.Hour) // 定义一个时间窗口，假设只保存最近100小时的记录
+)
+
+func init() {
+	processed = make(map[string]ProcessedRecord)
+}
 func (m *GatherModel) Run() error {
+	// 清理过期的记录
+	for key, record := range processed {
+		if record.Timestamp.Before(cleanupThreshold) {
+			delete(processed, key)
+		}
+	}
+
 	records, err := queryElasticsearch(db.GetEsClient(), m)
 	if err != nil {
 		log.Fatalf("Failed to query Elasticsearch: %v", err)
@@ -69,16 +96,64 @@
 		return nil
 	}
 
-	aggregation, err := analyzeAndAggregate(records)
+	newRecords := make([]*GatherRecord, 0)
+
+	// 聚合逻辑
+	for _, record := range records {
+		// 生成唯一标识
+		uniqueKey := fmt.Sprintf("%s-%s", record.DocumentNumber, record.PicDate)
+
+		// 如果已经处理过，跳过
+		if _, exists := processed[uniqueKey]; exists {
+			continue
+		}
+
+		// 添加到已处理记录
+		processed[uniqueKey] = ProcessedRecord{
+			UniqueKey: uniqueKey,
+			Timestamp: time.Now(),
+		}
+		newRecords = append(newRecords, record)
+	}
+
+	aggregation, err := analyzeAndAggregate(newRecords)
 	if err != nil {
 		log.Fatalf("Failed to analyze and aggregate data: %v", err)
 	}
 
-	// Print or process the aggregation results as needed
-	for location, persons := range aggregation {
-		fmt.Printf("Gathering detected at %s with %d unique persons\n", location, len(persons))
+	if len(aggregation) == 0 {
+		return nil
 	}
-	return nil
+
+	tagTypes := strings.Split(m.Task.PersonType, ",")
+	results := make([]*db.ModelTaskResults, 0, len(aggregation))
+	_, typeNames, err := service.GetPersonTypeNameByTypes(tagTypes)
+	if err != nil {
+		return err
+	}
+	event := strings.Join(typeNames, ",")
+	for lt, persons := range aggregation {
+		if persons.Size() == 0 {
+			continue
+		}
+		personIds := persons.Elements()
+		result := &db.ModelTaskResults{
+			Title:         m.Task.Name,
+			Event:         fmt.Sprintf("%s/%d人", event, len(persons)),
+			ModelID:       m.Task.ModelID,
+			ModelTaskID:   m.Task.ID,
+			CommunityId:   lt.CommunityId,
+			OrgID:         lt.OrgId,
+			ObjectIds:     strings.Join(personIds, ","),
+			Location:      lt.Location,
+			Building:      lt.Building,
+			Floor:         lt.Floor,
+			PicDate:       lt.Time,
+			FirstPersonID: personIds[0],
+		}
+		results = append(results, result)
+	}
+	return service.SaveTaskResults(results)
 }
 
 func (m *GatherModel) Shutdown() error {
@@ -87,7 +162,7 @@
 	return nil
 }
 
-func queryElasticsearch(esClient *elasticsearch.Client, gatherModel *GatherModel) ([]GatherRecord, error) {
+func queryElasticsearch(esClient *elasticsearch.Client, gatherModel *GatherModel) ([]*GatherRecord, error) {
 	var buf bytes.Buffer
 	now := time.Now()
 	start := now.Add(-time.Duration(gatherModel.DaysWindow) * 24 * time.Hour)
@@ -151,46 +226,67 @@
 			},
 		},
 		"aggs": map[string]interface{}{
-			"gather_events": map[string]interface{}{
-				"date_histogram": map[string]interface{}{
-					"field":         "picDate",
-					"interval":      fmt.Sprintf("%ds", gatherModel.AppearInterval),
-					"min_doc_count": 1,
+			"orgs": map[string]interface{}{ // 先聚合orgId
+				"terms": map[string]interface{}{
+					"field": "orgId", // 聚合orgId
+					"size":  10000,
 				},
 				"aggs": map[string]interface{}{
-					"community": map[string]interface{}{
+					"community": map[string]interface{}{ // 在orgId聚合下聚合communityId
 						"terms": map[string]interface{}{
-							"field": "communityId", // 鑱氬悎灏忓尯id
+							"field": "communityId", // 聚合communityId
 							"size":  10000,
 						},
 						"aggs": map[string]interface{}{
-							"location": map[string]interface{}{
+							"location": map[string]interface{}{ // 在communityId下聚合building
 								"terms": map[string]interface{}{
 									"field": "cameraLocation.building", // 聚合楼栋
 									"size":  10000,
 								},
 								"aggs": map[string]interface{}{
-									"floor": map[string]interface{}{
+									"floor": map[string]interface{}{ // 在building下聚合floor
 										"terms": map[string]interface{}{
 											"field": "cameraLocation.floor", // 聚合楼层
 											"size":  10000,
 										},
 										"aggs": map[string]interface{}{
-											"people": map[string]interface{}{
-												"terms": map[string]interface{}{
-													"field": "documentNumber", // 鎸変汉鍛樺敮涓�鏍囪瘑鑱氬悎
-													"size":  10000,
+											"gather_events": map[string]interface{}{ // 在floor下聚合gather_events
+												"date_histogram": map[string]interface{}{
+													"field":         "picDate",
+													"interval":      fmt.Sprintf("%ds", gatherModel.AppearInterval),
+													"min_doc_count": 1,
 												},
-											},
-											"filter_gather": map[string]interface{}{
-												"bucket_selector": map[string]interface{}{
-													"buckets_path": map[string]interface{}{
-														"personCount": "people._bucket_count", // 缁熻浜烘暟
+												"aggs": map[string]interface{}{
+													"people": map[string]interface{}{
+														"terms": map[string]interface{}{
+															"field": "documentNumber", // 按人员唯一标识聚合
+															"size":  10000,
+														},
 													},
-													"script": map[string]interface{}{
-														"source": "params.personCount >= params.gatherPersons", // 鑱氶泦浜烘暟杩囨护
-														"params": map[string]interface{}{
-															"gatherPersons": gatherModel.GatherPersons,
+													"filter_gather": map[string]interface{}{
+														"bucket_selector": map[string]interface{}{
+															"buckets_path": map[string]interface{}{
+																"personCount": "people._bucket_count", // 统计人数
+															},
+															"script": map[string]interface{}{
+																"source": "params.personCount >= params.gatherPersons", // 聚集人数过滤
+																"params": map[string]interface{}{
+																	"gatherPersons": gatherModel.GatherPersons,
+																},
+															},
+														},
+													},
+													"frequency_filter": map[string]interface{}{ // 添加频率过滤
+														"bucket_selector": map[string]interface{}{
+															"buckets_path": map[string]interface{}{
+																"eventCount": "_count", // 聚合事件次数
+															},
+															"script": map[string]interface{}{
+																"source": "params.eventCount >= params.threshold", // 筛选频率达到阈值的事件
+																"params": map[string]interface{}{
+																	"threshold": gatherModel.Threshold,
+																},
+															},
 														},
 													},
 												},
@@ -235,46 +331,52 @@
 	}
 
 	// 解析聚合结果
-	var records []GatherRecord
+	var records []*GatherRecord
 	if aggs, ok := result["aggregations"].(map[string]interface{}); ok {
-		if gatherEvents, ok := aggs["gather_events"].(map[string]interface{}); ok {
-			if buckets, ok := gatherEvents["buckets"].([]interface{}); ok {
-				for _, bucket := range buckets {
-					key := int64(bucket.(map[string]interface{})["key"].(float64)) / 1000 // 灏嗘绉掕浆鎹负绉�
-					timestamp := time.Unix(key, 0).Format("2006-01-02T15:04:05")
+		if orgBuckets, ok := aggs["orgs"].(map[string]interface{})["buckets"].([]interface{}); ok {
+			for _, orgBucket := range orgBuckets {
+				orgId := orgBucket.(map[string]interface{})["key"].(string)
 
-					// 瑙f瀽鎸夊皬鍖恒�佹ゼ鏍嬪拰妤煎眰鐨勮仛鍚堢粨鏋�
-					if communityBuckets, ok := bucket.(map[string]interface{})["community"].(map[string]interface{})["buckets"].([]interface{}); ok {
-						for _, communityBucket := range communityBuckets {
-							communityId := communityBucket.(map[string]interface{})["key"].(string)
+				// 解析按communityId的聚合结果
+				if communityBuckets, ok := orgBucket.(map[string]interface{})["community"].(map[string]interface{})["buckets"].([]interface{}); ok {
+					for _, communityBucket := range communityBuckets {
+						communityId := communityBucket.(map[string]interface{})["key"].(string)
 
-							// 瑙f瀽鎸夋ゼ鏍嬪拰妤煎眰鐨勮仛鍚堢粨鏋�
-							if locationBuckets, ok := communityBucket.(map[string]interface{})["location"].(map[string]interface{})["buckets"].([]interface{}); ok {
-								for _, locationBucket := range locationBuckets {
-									building := locationBucket.(map[string]interface{})["key"].(string)
+						// 解析按building的聚合结果
+						if locationBuckets, ok := communityBucket.(map[string]interface{})["location"].(map[string]interface{})["buckets"].([]interface{}); ok {
+							for _, locationBucket := range locationBuckets {
+								building := locationBucket.(map[string]interface{})["key"].(string)
 
-									// 瑙f瀽妤煎眰
-									if floorBuckets, ok := locationBucket.(map[string]interface{})["floor"].(map[string]interface{})["buckets"].([]interface{}); ok {
-										for _, floorBucket := range floorBuckets {
-											floor := floorBucket.(map[string]interface{})["key"].(string)
+								// 解析按floor的聚合结果
+								if floorBuckets, ok := locationBucket.(map[string]interface{})["floor"].(map[string]interface{})["buckets"].([]interface{}); ok {
+									for _, floorBucket := range floorBuckets {
+										floor := floorBucket.(map[string]interface{})["key"].(string)
 
-											// 瑙f瀽浜哄憳
-											if peopleBuckets, ok := floorBucket.(map[string]interface{})["people"].(map[string]interface{})["buckets"].([]interface{}); ok {
-												for _, person := range peopleBuckets {
-													documentNumber := person.(map[string]interface{})["key"].(string)
+										// 解析聚合的事件
+										if gatherEvents, ok := floorBucket.(map[string]interface{})["gather_events"].(map[string]interface{})["buckets"].([]interface{}); ok {
+											for _, eventBucket := range gatherEvents {
+												key := int64(eventBucket.(map[string]interface{})["key"].(float64)) / 1000 // 将毫秒转换为秒
+												timestamp := time.Unix(key, 0).UTC().Format("2006-01-02 15:04:05")
 
-													// 鏋勫缓 GatherRecord 缁撴瀯浣�
-													record := GatherRecord{
-														PicDate:        timestamp,
-														DocumentNumber: documentNumber,
-														CommunityId:    communityId,
-														Building:       building,
-														Floor:          floor,
-														AppearInterval: gatherModel.AppearInterval,
-														GatherPersons:  gatherModel.GatherPersons,
+												// 解析人员
+												if peopleBuckets, ok := eventBucket.(map[string]interface{})["people"].(map[string]interface{})["buckets"].([]interface{}); ok {
+													for _, person := range peopleBuckets {
+														documentNumber := person.(map[string]interface{})["key"].(string)
+
+														// 构建 GatherRecord 结构体
+														record := &GatherRecord{
+															PicDate:        timestamp,
+															DocumentNumber: documentNumber,
+															CommunityId:    communityId,
+															Building:       building,
+															Floor:          floor,
+															OrgId:          orgId,
+															AppearInterval: gatherModel.AppearInterval,
+															GatherPersons:  gatherModel.GatherPersons,
+														}
+
+														records = append(records, record)
 													}
-
-													records = append(records, record)
 												}
 											}
 										}
@@ -291,15 +393,47 @@
 	return records, nil
 }
 
-func analyzeAndAggregate(records []GatherRecord) (map[string][]string, error) {
-	// Implement logic to aggregate and analyze data based on GatherModel parameters
-	// This is a placeholder for the actual implementation
-	aggregation := make(map[string][]string)
+type GatherLocationTime struct {
+	CommunityId string
+	OrgId       string
+	Building    string
+	Floor       string
+	Location    string
+	Time        string
+}
 
-	// Example logic:
+func analyzeAndAggregate(records []*GatherRecord) (map[GatherLocationTime]set.StringSet, error) {
+	aggregation := make(map[GatherLocationTime]set.StringSet)
+	domainIds := set.NewStringSet()
 	for _, record := range records {
-		key := fmt.Sprintf("%s%s%s", record.CommunityId, record.Building, record.Floor)
-		aggregation[key] = append(aggregation[key], record.DocumentNumber)
+		domainIds.Add(record.CommunityId)
+	}
+
+	domains, err := service.GetUnitsMapByIds(domainIds.Elements())
+	if err != nil {
+		return nil, err
+	}
+
+	for _, record := range records {
+		if record.DocumentNumber == "" {
+			continue
+		}
+		if domains[record.CommunityId] == nil {
+			continue
+		}
+
+		location := GatherLocationTime{
+			CommunityId: record.CommunityId,
+			OrgId:       record.OrgId,
+			Building:    record.Building,
+			Floor:       record.Floor,
+			Location:    fmt.Sprintf("%s%s%s", domains[record.CommunityId].Name, record.Building, record.Floor),
+			Time:        record.PicDate,
+		}
+		if aggregation[location] == nil {
+			aggregation[location] = set.NewStringSet()
+		}
+		aggregation[location].Add(record.DocumentNumber)
 	}
 
 	return aggregation, nil

--
Gitblit v1.8.0