From fca319958029fa924308e50cb61202d7d6ff5008 Mon Sep 17 00:00:00 2001 From: zhaoqingang <zhaoqg0118@163.com> Date: 星期三, 19 二月 2025 13:33:37 +0800 Subject: [PATCH] 暂停聚集模型 --- models/gather_model.go | 177 +++++++++++++++++++++++++++++++++++++++++++++------------- 1 files changed, 137 insertions(+), 40 deletions(-) diff --git a/models/gather_model.go b/models/gather_model.go index 909c01f..d0c2756 100644 --- a/models/gather_model.go +++ b/models/gather_model.go @@ -4,33 +4,45 @@ "bytes" "context" "encoding/json" + "errors" "fmt" - "github.com/elastic/go-elasticsearch/v6" "log" + "strings" + "sync" + "time" + + "github.com/elastic/go-elasticsearch/v6" + "model-engine/config" "model-engine/db" + "model-engine/pkg/logger" "model-engine/pkg/set" "model-engine/service" - "strings" - "time" ) type GatherModel struct { OrgIds []interface{} `json:"-"` AreaIds []interface{} `json:"-"` - Building string `gorm:"type:varchar(255)" json:"building"` //妤兼爧 - Floor string `gorm:"type:varchar(255)" json:"floor"` //妤煎眰 - AlarmType db.AlarmType `gorm:"type:varchar(255);" json:"alarmType"` //棰勮鏂瑰紡 - PersonType string `gorm:"type:varchar(255);" json:"personType"` //浜哄憳绫诲瀷 - GatherPersons int `gorm:"type:int;" json:"gatherPersons"` //鑱氶泦浜烘暟 - AppearInterval int `gorm:"type:int;" json:"appearInterval"` //鍑虹幇闂撮殧锛屽崟浣嶄负绉� - DaysWindow int `gorm:"type:int;" json:"daysWindow" ` //杩戝嚑澶╁唴 - Threshold int `gorm:"type:int;" json:"threshold" ` //杈惧嚑娆� + Building string `gorm:"type:varchar(255)" json:"building"` // 妤兼爧 + Floor string `gorm:"type:varchar(255)" json:"floor"` // 妤煎眰 + AlarmType db.AlarmType `gorm:"type:varchar(255);" json:"alarmType"` // 棰勮鏂瑰紡 + PersonType string `gorm:"type:varchar(255);" json:"personType"` // 浜哄憳绫诲瀷 + GatherPersons int `gorm:"type:int;" json:"gatherPersons"` // 鑱氶泦浜烘暟 + AppearInterval int `gorm:"type:int;" json:"appearInterval"` // 锛屽崟浣嶄负绉� + DaysWindow int `gorm:"type:int;" json:"daysWindow" ` // 杩戝嚑澶╁唴 + Threshold int `gorm:"type:int;" json:"threshold" ` // 杈惧嚑娆� Task *db.ModelTask } -func (m *GatherModel) Init(task *db.ModelTask) error { +type ProcessedRecord struct { + UniqueKey string // 鍞竴鏍囪瘑 + Timestamp time.Time // 璁板綍鐨勬椂闂存埑 +} +func (m *GatherModel) Init(task *db.ModelTask) error { + if len(task.DomainUnitIds) == 0 { + return errors.New("empty domain set") + } orgIds, areaIds, err := service.GetOrgIdsAndAreaIdsByDomainUnitIds(task.DomainUnitIds) if err != nil { return err @@ -43,11 +55,40 @@ m.Floor = task.Floor m.AlarmType = task.AlarmType m.PersonType = task.PersonType - m.GatherPersons = task.GatherPersons - m.AppearInterval = task.AppearInterval - m.DaysWindow = task.DaysWindow - m.Threshold = task.Threshold - fmt.Println("GatherModel init finish ...") + + for _, v := range task.Rules { + if v.Alias == "gatherPersons" { + if val, ok := v.Value.(float64); ok { + m.GatherPersons = int(val) + } + } + + if v.Alias == "appearInterval" { + if val, ok := v.Value.(float64); ok { + m.AppearInterval = int(val) + } + } + + if v.Alias == "daysWindow" { + if val, ok := v.Value.(float64); ok { + m.DaysWindow = int(val) + } + } + + if v.Alias == "threshold" { + if val, ok := v.Value.(float64); ok { + m.Threshold = int(val) + } + } + } + + logger.Debugf("GatherModel init finish ...task id:%s, name:%s, rule:%+v\n", task.ID, task.Name, m) + + if m.GatherPersons == 0 || m.AppearInterval == 0 || m.DaysWindow == 0 || m.Threshold == 0 { + logger.Warnf("invalid parameters. task id:%s, name:%s\n", task.ID, task.Name) + return errors.New("invalid parameters") + } + return nil } @@ -59,11 +100,24 @@ OrgId string `json:"orgId"` Building string `json:"building"` Floor string `json:"floor"` - GatherPersons int `gorm:"type:int;" json:"gatherPersons"` //鑱氶泦浜烘暟 - AppearInterval int `gorm:"type:int;" json:"appearInterval"` //鍑虹幇闂撮殧锛屽崟浣嶄负绉� + GatherPersons int `gorm:"type:int;" json:"gatherPersons"` // 鑱氶泦浜烘暟 + AppearInterval int `gorm:"type:int;" json:"appearInterval"` // 鍑虹幇闂撮殧锛屽崟浣嶄负绉� } +var ( + processed sync.Map // 瀛樺偍宸插鐞嗚褰� + cleanupThreshold = time.Now().Add(-100 * time.Hour) // 瀹氫箟涓�涓椂闂寸獥鍙o紝鍋囪鍙繚瀛樻渶杩�100灏忔椂鐨勮褰� +) + func (m *GatherModel) Run() error { + // 娓呯悊杩囨湡鐨勮褰� + processed.Range(func(key, value any) bool { + if value.(ProcessedRecord).Timestamp.Before(cleanupThreshold) { + processed.Delete(key) + } + return true + }) + records, err := queryElasticsearch(db.GetEsClient(), m) if err != nil { log.Fatalf("Failed to query Elasticsearch: %v", err) @@ -73,7 +127,30 @@ return nil } - aggregation, err := analyzeAndAggregate(records) + newRecords := make([]*GatherRecord, 0) + + // 鑱氬悎閫昏緫 + for _, record := range records { + // 鐢熸垚鍞竴鏍囪瘑 + uniqueKey := fmt.Sprintf("%s-%s", record.DocumentNumber, record.PicDate) + + // 濡傛灉宸茬粡澶勭悊杩囷紝璺宠繃 + if _, exists := processed.Load(uniqueKey); exists { + continue + } + + // 娣诲姞鍒板凡澶勭悊璁板綍 + processed.Store(uniqueKey, ProcessedRecord{ + UniqueKey: uniqueKey, + Timestamp: time.Now(), + }) + newRecords = append(newRecords, record) + } + if len(newRecords) == 0 { + return nil + } + + aggregation, err := analyzeAndAggregate(newRecords) if err != nil { log.Fatalf("Failed to analyze and aggregate data: %v", err) } @@ -89,22 +166,34 @@ return err } event := strings.Join(typeNames, ",") - for location, persons := range aggregation { - result := &db.ModelTaskResults{ - Title: m.Task.Name, - Event: fmt.Sprintf("%s/%d浜�", event, len(persons)), - ModelID: m.Task.ModelID, - ModelTaskID: m.Task.ID, - CommunityId: location.CommunityId, - OrgID: location.OrgId, - ObjectIds: strings.Join(persons.Elements(), ","), - Location: location.Location, - Building: location.Building, - Floor: location.Floor, + for lt, persons := range aggregation { + if persons.Size() == 0 { + continue } + personIds := persons.Elements() + result := &db.ModelTaskResults{ + Title: m.Task.Name, + Event: fmt.Sprintf("%s/%d浜�", event, len(persons)), + ModelID: m.Task.ModelID, + ModelTaskID: m.Task.ID, + CommunityId: lt.CommunityId, + OrgID: lt.OrgId, + ObjectIds: strings.Join(personIds, ","), + Location: lt.Location, + Building: lt.Building, + Floor: lt.Floor, + PicDate: lt.Time, + FirstPersonID: personIds[0], + } + results = append(results, result) } return service.SaveTaskResults(results) +} + +func (m *GatherModel) KeepAlive() error { + db.GetDB().Model(m.Task).Where("id = ?", m.Task.ID).Update("last_run_time", time.Now()) + return nil } func (m *GatherModel) Shutdown() error { @@ -113,7 +202,7 @@ return nil } -func queryElasticsearch(esClient *elasticsearch.Client, gatherModel *GatherModel) ([]GatherRecord, error) { +func queryElasticsearch(esClient *elasticsearch.Client, gatherModel *GatherModel) ([]*GatherRecord, error) { var buf bytes.Buffer now := time.Now() start := now.Add(-time.Duration(gatherModel.DaysWindow) * 24 * time.Hour) @@ -282,7 +371,7 @@ } // 瑙f瀽鑱氬悎缁撴灉 - var records []GatherRecord + var records []*GatherRecord if aggs, ok := result["aggregations"].(map[string]interface{}); ok { if orgBuckets, ok := aggs["orgs"].(map[string]interface{})["buckets"].([]interface{}); ok { for _, orgBucket := range orgBuckets { @@ -307,7 +396,7 @@ if gatherEvents, ok := floorBucket.(map[string]interface{})["gather_events"].(map[string]interface{})["buckets"].([]interface{}); ok { for _, eventBucket := range gatherEvents { key := int64(eventBucket.(map[string]interface{})["key"].(float64)) / 1000 // 灏嗘绉掕浆鎹负绉� - timestamp := time.Unix(key, 0).Format("2006-01-02 15:04:05") + timestamp := time.Unix(key, 0).UTC().Format("2006-01-02 15:04:05") // 瑙f瀽浜哄憳 if peopleBuckets, ok := eventBucket.(map[string]interface{})["people"].(map[string]interface{})["buckets"].([]interface{}); ok { @@ -315,7 +404,7 @@ documentNumber := person.(map[string]interface{})["key"].(string) // 鏋勫缓 GatherRecord 缁撴瀯浣� - record := GatherRecord{ + record := &GatherRecord{ PicDate: timestamp, DocumentNumber: documentNumber, CommunityId: communityId, @@ -344,16 +433,20 @@ return records, nil } -type GatherLocation struct { +type GatherLocationTime struct { CommunityId string OrgId string Building string Floor string Location string + Time string } -func analyzeAndAggregate(records []GatherRecord) (map[GatherLocation]set.StringSet, error) { - aggregation := make(map[GatherLocation]set.StringSet) +func analyzeAndAggregate(records []*GatherRecord) (map[GatherLocationTime]set.StringSet, error) { + if len(records) == 0 { + return nil, nil + } + aggregation := make(map[GatherLocationTime]set.StringSet) domainIds := set.NewStringSet() for _, record := range records { domainIds.Add(record.CommunityId) @@ -365,16 +458,20 @@ } for _, record := range records { + if record.DocumentNumber == "" { + continue + } if domains[record.CommunityId] == nil { continue } - location := GatherLocation{ + location := GatherLocationTime{ CommunityId: record.CommunityId, OrgId: record.OrgId, Building: record.Building, Floor: record.Floor, Location: fmt.Sprintf("%s%s%s", domains[record.CommunityId].Name, record.Building, record.Floor), + Time: record.PicDate, } if aggregation[location] == nil { aggregation[location] = set.NewStringSet() -- Gitblit v1.8.0