From 3c033759200ad7c02dd59521b1aebbbdc35b98fa Mon Sep 17 00:00:00 2001 From: zhaoqingang <zhaoqg0118@163.com> Date: 星期六, 08 二月 2025 16:18:50 +0800 Subject: [PATCH] 长场景人员分析模型 --- models/gather_model.go | 100 +++++++++++++++++++++++++++++++++++-------------- 1 files changed, 71 insertions(+), 29 deletions(-) diff --git a/models/gather_model.go b/models/gather_model.go index 59187ce..59fe8a4 100644 --- a/models/gather_model.go +++ b/models/gather_model.go @@ -6,27 +6,31 @@ "encoding/json" "errors" "fmt" - "github.com/elastic/go-elasticsearch/v6" "log" + "strings" + "sync" + "time" + + "github.com/elastic/go-elasticsearch/v6" + "model-engine/config" "model-engine/db" + "model-engine/pkg/logger" "model-engine/pkg/set" "model-engine/service" - "strings" - "time" ) type GatherModel struct { OrgIds []interface{} `json:"-"` AreaIds []interface{} `json:"-"` - Building string `gorm:"type:varchar(255)" json:"building"` //妤兼爧 - Floor string `gorm:"type:varchar(255)" json:"floor"` //妤煎眰 - AlarmType db.AlarmType `gorm:"type:varchar(255);" json:"alarmType"` //棰勮鏂瑰紡 - PersonType string `gorm:"type:varchar(255);" json:"personType"` //浜哄憳绫诲瀷 - GatherPersons int `gorm:"type:int;" json:"gatherPersons"` //鑱氶泦浜烘暟 - AppearInterval int `gorm:"type:int;" json:"appearInterval"` //鍑虹幇闂撮殧锛屽崟浣嶄负绉� - DaysWindow int `gorm:"type:int;" json:"daysWindow" ` //杩戝嚑澶╁唴 - Threshold int `gorm:"type:int;" json:"threshold" ` //杈惧嚑娆� + Building string `gorm:"type:varchar(255)" json:"building"` // 妤兼爧 + Floor string `gorm:"type:varchar(255)" json:"floor"` // 妤煎眰 + AlarmType db.AlarmType `gorm:"type:varchar(255);" json:"alarmType"` // 棰勮鏂瑰紡 + PersonType string `gorm:"type:varchar(255);" json:"personType"` // 浜哄憳绫诲瀷 + GatherPersons int `gorm:"type:int;" json:"gatherPersons"` // 鑱氶泦浜烘暟 + AppearInterval int `gorm:"type:int;" json:"appearInterval"` // 锛屽崟浣嶄负绉� + DaysWindow int `gorm:"type:int;" json:"daysWindow" ` // 杩戝嚑澶╁唴 + Threshold int `gorm:"type:int;" json:"threshold" ` // 杈惧嚑娆� Task *db.ModelTask } @@ -51,11 +55,40 @@ m.Floor = task.Floor m.AlarmType = task.AlarmType m.PersonType = task.PersonType - m.GatherPersons = task.GatherPersons - m.AppearInterval = task.AppearInterval - m.DaysWindow = task.DaysWindow - m.Threshold = task.Threshold - fmt.Println("GatherModel init finish ...") + + for _, v := range task.Rules { + if v.Alias == "gatherPersons" { + if val, ok := v.Value.(float64); ok { + m.GatherPersons = int(val) + } + } + + if v.Alias == "appearInterval" { + if val, ok := v.Value.(float64); ok { + m.AppearInterval = int(val) + } + } + + if v.Alias == "daysWindow" { + if val, ok := v.Value.(float64); ok { + m.DaysWindow = int(val) + } + } + + if v.Alias == "threshold" { + if val, ok := v.Value.(float64); ok { + m.Threshold = int(val) + } + } + } + + logger.Debugf("GatherModel init finish ...task id:%s, name:%s, rule:%+v\n", task.ID, task.Name, m) + + if m.GatherPersons == 0 || m.AppearInterval == 0 || m.DaysWindow == 0 || m.Threshold == 0 { + logger.Warnf("invalid parameters. task id:%s, name:%s\n", task.ID, task.Name) + return errors.New("invalid parameters") + } + return nil } @@ -67,25 +100,23 @@ OrgId string `json:"orgId"` Building string `json:"building"` Floor string `json:"floor"` - GatherPersons int `gorm:"type:int;" json:"gatherPersons"` //鑱氶泦浜烘暟 - AppearInterval int `gorm:"type:int;" json:"appearInterval"` //鍑虹幇闂撮殧锛屽崟浣嶄负绉� + GatherPersons int `gorm:"type:int;" json:"gatherPersons"` // 鑱氶泦浜烘暟 + AppearInterval int `gorm:"type:int;" json:"appearInterval"` // 鍑虹幇闂撮殧锛屽崟浣嶄负绉� } var ( - processed map[string]ProcessedRecord // 瀛樺偍宸插鐞嗚褰� + processed sync.Map // 瀛樺偍宸插鐞嗚褰� cleanupThreshold = time.Now().Add(-100 * time.Hour) // 瀹氫箟涓�涓椂闂寸獥鍙o紝鍋囪鍙繚瀛樻渶杩�100灏忔椂鐨勮褰� ) -func init() { - processed = make(map[string]ProcessedRecord) -} func (m *GatherModel) Run() error { // 娓呯悊杩囨湡鐨勮褰� - for key, record := range processed { - if record.Timestamp.Before(cleanupThreshold) { - delete(processed, key) + processed.Range(func(key, value any) bool { + if value.(ProcessedRecord).Timestamp.Before(cleanupThreshold) { + processed.Delete(key) } - } + return true + }) records, err := queryElasticsearch(db.GetEsClient(), m) if err != nil { @@ -104,16 +135,19 @@ uniqueKey := fmt.Sprintf("%s-%s", record.DocumentNumber, record.PicDate) // 濡傛灉宸茬粡澶勭悊杩囷紝璺宠繃 - if _, exists := processed[uniqueKey]; exists { + if _, exists := processed.Load(uniqueKey); exists { continue } // 娣诲姞鍒板凡澶勭悊璁板綍 - processed[uniqueKey] = ProcessedRecord{ + processed.Store(uniqueKey, ProcessedRecord{ UniqueKey: uniqueKey, Timestamp: time.Now(), - } + }) newRecords = append(newRecords, record) + } + if len(newRecords) == 0 { + return nil } aggregation, err := analyzeAndAggregate(newRecords) @@ -154,6 +188,11 @@ results = append(results, result) } return service.SaveTaskResults(results) +} + +func (m *GatherModel) KeepAlive() error { + db.GetDB().Model(m.Task).Where("id = ?", m.Task.ID).Update("last_run_time", time.Now()) + return nil } func (m *GatherModel) Shutdown() error { @@ -403,6 +442,9 @@ } func analyzeAndAggregate(records []*GatherRecord) (map[GatherLocationTime]set.StringSet, error) { + if len(records) == 0 { + return nil, nil + } aggregation := make(map[GatherLocationTime]set.StringSet) domainIds := set.NewStringSet() for _, record := range records { -- Gitblit v1.8.0