From 2332cb0d6829a421b4d1826bb1825f9df6afdd74 Mon Sep 17 00:00:00 2001
From: zhaoqingang <zhaoqg0118@163.com>
Date: 星期一, 10 二月 2025 10:33:16 +0800
Subject: [PATCH] 测试 场景分析 bug
---
models/gather_model.go | 100 +++++++++++++++++++++++++++++++++++--------------
1 files changed, 71 insertions(+), 29 deletions(-)
diff --git a/models/gather_model.go b/models/gather_model.go
index 59187ce..59fe8a4 100644
--- a/models/gather_model.go
+++ b/models/gather_model.go
@@ -6,27 +6,31 @@
"encoding/json"
"errors"
"fmt"
- "github.com/elastic/go-elasticsearch/v6"
"log"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/elastic/go-elasticsearch/v6"
+
"model-engine/config"
"model-engine/db"
+ "model-engine/pkg/logger"
"model-engine/pkg/set"
"model-engine/service"
- "strings"
- "time"
)
type GatherModel struct {
OrgIds []interface{} `json:"-"`
AreaIds []interface{} `json:"-"`
- Building string `gorm:"type:varchar(255)" json:"building"` //妤兼爧
- Floor string `gorm:"type:varchar(255)" json:"floor"` //妤煎眰
- AlarmType db.AlarmType `gorm:"type:varchar(255);" json:"alarmType"` //棰勮鏂瑰紡
- PersonType string `gorm:"type:varchar(255);" json:"personType"` //浜哄憳绫诲瀷
- GatherPersons int `gorm:"type:int;" json:"gatherPersons"` //鑱氶泦浜烘暟
- AppearInterval int `gorm:"type:int;" json:"appearInterval"` //鍑虹幇闂撮殧锛屽崟浣嶄负绉�
- DaysWindow int `gorm:"type:int;" json:"daysWindow" ` //杩戝嚑澶╁唴
- Threshold int `gorm:"type:int;" json:"threshold" ` //杈惧嚑娆�
+ Building string `gorm:"type:varchar(255)" json:"building"` // 妤兼爧
+ Floor string `gorm:"type:varchar(255)" json:"floor"` // 妤煎眰
+ AlarmType db.AlarmType `gorm:"type:varchar(255);" json:"alarmType"` // 棰勮鏂瑰紡
+ PersonType string `gorm:"type:varchar(255);" json:"personType"` // 浜哄憳绫诲瀷
+ GatherPersons int `gorm:"type:int;" json:"gatherPersons"` // 鑱氶泦浜烘暟
+ AppearInterval int `gorm:"type:int;" json:"appearInterval"` // 锛屽崟浣嶄负绉�
+ DaysWindow int `gorm:"type:int;" json:"daysWindow" ` // 杩戝嚑澶╁唴
+ Threshold int `gorm:"type:int;" json:"threshold" ` // 杈惧嚑娆�
Task *db.ModelTask
}
@@ -51,11 +55,40 @@
m.Floor = task.Floor
m.AlarmType = task.AlarmType
m.PersonType = task.PersonType
- m.GatherPersons = task.GatherPersons
- m.AppearInterval = task.AppearInterval
- m.DaysWindow = task.DaysWindow
- m.Threshold = task.Threshold
- fmt.Println("GatherModel init finish ...")
+
+ for _, v := range task.Rules {
+ if v.Alias == "gatherPersons" {
+ if val, ok := v.Value.(float64); ok {
+ m.GatherPersons = int(val)
+ }
+ }
+
+ if v.Alias == "appearInterval" {
+ if val, ok := v.Value.(float64); ok {
+ m.AppearInterval = int(val)
+ }
+ }
+
+ if v.Alias == "daysWindow" {
+ if val, ok := v.Value.(float64); ok {
+ m.DaysWindow = int(val)
+ }
+ }
+
+ if v.Alias == "threshold" {
+ if val, ok := v.Value.(float64); ok {
+ m.Threshold = int(val)
+ }
+ }
+ }
+
+ logger.Debugf("GatherModel init finish ...task id:%s, name:%s, rule:%+v\n", task.ID, task.Name, m)
+
+ if m.GatherPersons == 0 || m.AppearInterval == 0 || m.DaysWindow == 0 || m.Threshold == 0 {
+ logger.Warnf("invalid parameters. task id:%s, name:%s\n", task.ID, task.Name)
+ return errors.New("invalid parameters")
+ }
+
return nil
}
@@ -67,25 +100,23 @@
OrgId string `json:"orgId"`
Building string `json:"building"`
Floor string `json:"floor"`
- GatherPersons int `gorm:"type:int;" json:"gatherPersons"` //鑱氶泦浜烘暟
- AppearInterval int `gorm:"type:int;" json:"appearInterval"` //鍑虹幇闂撮殧锛屽崟浣嶄负绉�
+ GatherPersons int `gorm:"type:int;" json:"gatherPersons"` // 鑱氶泦浜烘暟
+ AppearInterval int `gorm:"type:int;" json:"appearInterval"` // 鍑虹幇闂撮殧锛屽崟浣嶄负绉�
}
var (
- processed map[string]ProcessedRecord // 瀛樺偍宸插鐞嗚褰�
+ processed sync.Map // 瀛樺偍宸插鐞嗚褰�
cleanupThreshold = time.Now().Add(-100 * time.Hour) // 瀹氫箟涓�涓椂闂寸獥鍙o紝鍋囪鍙繚瀛樻渶杩�100灏忔椂鐨勮褰�
)
-func init() {
- processed = make(map[string]ProcessedRecord)
-}
func (m *GatherModel) Run() error {
// 娓呯悊杩囨湡鐨勮褰�
- for key, record := range processed {
- if record.Timestamp.Before(cleanupThreshold) {
- delete(processed, key)
+ processed.Range(func(key, value any) bool {
+ if value.(ProcessedRecord).Timestamp.Before(cleanupThreshold) {
+ processed.Delete(key)
}
- }
+ return true
+ })
records, err := queryElasticsearch(db.GetEsClient(), m)
if err != nil {
@@ -104,16 +135,19 @@
uniqueKey := fmt.Sprintf("%s-%s", record.DocumentNumber, record.PicDate)
// 濡傛灉宸茬粡澶勭悊杩囷紝璺宠繃
- if _, exists := processed[uniqueKey]; exists {
+ if _, exists := processed.Load(uniqueKey); exists {
continue
}
// 娣诲姞鍒板凡澶勭悊璁板綍
- processed[uniqueKey] = ProcessedRecord{
+ processed.Store(uniqueKey, ProcessedRecord{
UniqueKey: uniqueKey,
Timestamp: time.Now(),
- }
+ })
newRecords = append(newRecords, record)
+ }
+ if len(newRecords) == 0 {
+ return nil
}
aggregation, err := analyzeAndAggregate(newRecords)
@@ -154,6 +188,11 @@
results = append(results, result)
}
return service.SaveTaskResults(results)
+}
+
+func (m *GatherModel) KeepAlive() error {
+ db.GetDB().Model(m.Task).Where("id = ?", m.Task.ID).Update("last_run_time", time.Now())
+ return nil
}
func (m *GatherModel) Shutdown() error {
@@ -403,6 +442,9 @@
}
func analyzeAndAggregate(records []*GatherRecord) (map[GatherLocationTime]set.StringSet, error) {
+ if len(records) == 0 {
+ return nil, nil
+ }
aggregation := make(map[GatherLocationTime]set.StringSet)
domainIds := set.NewStringSet()
for _, record := range records {
--
Gitblit v1.8.0