From 2332cb0d6829a421b4d1826bb1825f9df6afdd74 Mon Sep 17 00:00:00 2001
From: zhaoqingang <zhaoqg0118@163.com>
Date: 星期一, 10 二月 2025 10:33:16 +0800
Subject: [PATCH] 测试 场景分析 bug
---
models/gather_model.go | 348 +++++++++++++++++++++++++++++++++++++++++++--------------
1 files changed, 262 insertions(+), 86 deletions(-)
diff --git a/models/gather_model.go b/models/gather_model.go
index 66864cf..59fe8a4 100644
--- a/models/gather_model.go
+++ b/models/gather_model.go
@@ -4,47 +4,91 @@
"bytes"
"context"
"encoding/json"
+ "errors"
"fmt"
- "github.com/elastic/go-elasticsearch/v6"
"log"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/elastic/go-elasticsearch/v6"
+
"model-engine/config"
"model-engine/db"
+ "model-engine/pkg/logger"
+ "model-engine/pkg/set"
"model-engine/service"
- "strings"
- "time"
)
type GatherModel struct {
OrgIds []interface{} `json:"-"`
AreaIds []interface{} `json:"-"`
- Building string `gorm:"type:varchar(255)" json:"building"` //妤兼爧
- Floor string `gorm:"type:varchar(255)" json:"floor"` //妤煎眰
- AlarmType db.AlarmType `gorm:"type:varchar(255);" json:"alarmType"` //棰勮鏂瑰紡
- PersonType string `gorm:"type:varchar(255);" json:"personType"` //浜哄憳绫诲瀷
- GatherPersons int `gorm:"type:int;" json:"gatherPersons"` //鑱氶泦浜烘暟
- AppearInterval int `gorm:"type:int;" json:"appearInterval"` //鍑虹幇闂撮殧锛屽崟浣嶄负绉�
- DaysWindow int `gorm:"type:int;" json:"daysWindow" ` //杩戝嚑澶╁唴
- Threshold int `gorm:"type:int;" json:"threshold" ` //杈惧嚑娆�
+ Building string `gorm:"type:varchar(255)" json:"building"` // 妤兼爧
+ Floor string `gorm:"type:varchar(255)" json:"floor"` // 妤煎眰
+ AlarmType db.AlarmType `gorm:"type:varchar(255);" json:"alarmType"` // 棰勮鏂瑰紡
+ PersonType string `gorm:"type:varchar(255);" json:"personType"` // 浜哄憳绫诲瀷
+ GatherPersons int `gorm:"type:int;" json:"gatherPersons"` // 鑱氶泦浜烘暟
+ AppearInterval int `gorm:"type:int;" json:"appearInterval"` // 锛屽崟浣嶄负绉�
+ DaysWindow int `gorm:"type:int;" json:"daysWindow" ` // 杩戝嚑澶╁唴
+ Threshold int `gorm:"type:int;" json:"threshold" ` // 杈惧嚑娆�
+ Task *db.ModelTask
+}
+
+type ProcessedRecord struct {
+ UniqueKey string // 鍞竴鏍囪瘑
+ Timestamp time.Time // 璁板綍鐨勬椂闂存埑
}
func (m *GatherModel) Init(task *db.ModelTask) error {
-
+ if len(task.DomainUnitIds) == 0 {
+ return errors.New("empty domain set")
+ }
orgIds, areaIds, err := service.GetOrgIdsAndAreaIdsByDomainUnitIds(task.DomainUnitIds)
if err != nil {
return err
}
+ m.Task = task
m.OrgIds = orgIds
m.AreaIds = areaIds
m.Building = task.Building
m.Floor = task.Floor
m.AlarmType = task.AlarmType
m.PersonType = task.PersonType
- m.GatherPersons = task.GatherPersons
- m.AppearInterval = task.AppearInterval
- m.DaysWindow = task.DaysWindow
- m.Threshold = task.Threshold
- fmt.Println("GatherModel init finish ...")
+
+ for _, v := range task.Rules {
+ if v.Alias == "gatherPersons" {
+ if val, ok := v.Value.(float64); ok {
+ m.GatherPersons = int(val)
+ }
+ }
+
+ if v.Alias == "appearInterval" {
+ if val, ok := v.Value.(float64); ok {
+ m.AppearInterval = int(val)
+ }
+ }
+
+ if v.Alias == "daysWindow" {
+ if val, ok := v.Value.(float64); ok {
+ m.DaysWindow = int(val)
+ }
+ }
+
+ if v.Alias == "threshold" {
+ if val, ok := v.Value.(float64); ok {
+ m.Threshold = int(val)
+ }
+ }
+ }
+
+ logger.Debugf("GatherModel init finish ...task id:%s, name:%s, rule:%+v\n", task.ID, task.Name, m)
+
+ if m.GatherPersons == 0 || m.AppearInterval == 0 || m.DaysWindow == 0 || m.Threshold == 0 {
+ logger.Warnf("invalid parameters. task id:%s, name:%s\n", task.ID, task.Name)
+ return errors.New("invalid parameters")
+ }
+
return nil
}
@@ -53,13 +97,27 @@
PicDate string `json:"picDate"`
DocumentNumber string
CommunityId string `json:"communityId"`
+ OrgId string `json:"orgId"`
Building string `json:"building"`
Floor string `json:"floor"`
- GatherPersons int `gorm:"type:int;" json:"gatherPersons"` //鑱氶泦浜烘暟
- AppearInterval int `gorm:"type:int;" json:"appearInterval"` //鍑虹幇闂撮殧锛屽崟浣嶄负绉�
+ GatherPersons int `gorm:"type:int;" json:"gatherPersons"` // 鑱氶泦浜烘暟
+ AppearInterval int `gorm:"type:int;" json:"appearInterval"` // 鍑虹幇闂撮殧锛屽崟浣嶄负绉�
}
+var (
+ processed sync.Map // 瀛樺偍宸插鐞嗚褰�
+ cleanupThreshold = time.Now().Add(-100 * time.Hour) // 瀹氫箟涓�涓椂闂寸獥鍙o紝鍋囪鍙繚瀛樻渶杩�100灏忔椂鐨勮褰�
+)
+
func (m *GatherModel) Run() error {
+ // 娓呯悊杩囨湡鐨勮褰�
+ processed.Range(func(key, value any) bool {
+ if value.(ProcessedRecord).Timestamp.Before(cleanupThreshold) {
+ processed.Delete(key)
+ }
+ return true
+ })
+
records, err := queryElasticsearch(db.GetEsClient(), m)
if err != nil {
log.Fatalf("Failed to query Elasticsearch: %v", err)
@@ -69,15 +127,71 @@
return nil
}
- aggregation, err := analyzeAndAggregate(records)
+ newRecords := make([]*GatherRecord, 0)
+
+ // 鑱氬悎閫昏緫
+ for _, record := range records {
+ // 鐢熸垚鍞竴鏍囪瘑
+ uniqueKey := fmt.Sprintf("%s-%s", record.DocumentNumber, record.PicDate)
+
+ // 濡傛灉宸茬粡澶勭悊杩囷紝璺宠繃
+ if _, exists := processed.Load(uniqueKey); exists {
+ continue
+ }
+
+ // 娣诲姞鍒板凡澶勭悊璁板綍
+ processed.Store(uniqueKey, ProcessedRecord{
+ UniqueKey: uniqueKey,
+ Timestamp: time.Now(),
+ })
+ newRecords = append(newRecords, record)
+ }
+ if len(newRecords) == 0 {
+ return nil
+ }
+
+ aggregation, err := analyzeAndAggregate(newRecords)
if err != nil {
log.Fatalf("Failed to analyze and aggregate data: %v", err)
}
- // Print or process the aggregation results as needed
- for location, persons := range aggregation {
- fmt.Printf("Gathering detected at %s with %d unique persons\n", location, len(persons))
+ if len(aggregation) == 0 {
+ return nil
}
+
+ tagTypes := strings.Split(m.Task.PersonType, ",")
+ results := make([]*db.ModelTaskResults, 0, len(aggregation))
+ _, typeNames, err := service.GetPersonTypeNameByTypes(tagTypes)
+ if err != nil {
+ return err
+ }
+ event := strings.Join(typeNames, ",")
+ for lt, persons := range aggregation {
+ if persons.Size() == 0 {
+ continue
+ }
+ personIds := persons.Elements()
+ result := &db.ModelTaskResults{
+ Title: m.Task.Name,
+ Event: fmt.Sprintf("%s/%d浜�", event, len(persons)),
+ ModelID: m.Task.ModelID,
+ ModelTaskID: m.Task.ID,
+ CommunityId: lt.CommunityId,
+ OrgID: lt.OrgId,
+ ObjectIds: strings.Join(personIds, ","),
+ Location: lt.Location,
+ Building: lt.Building,
+ Floor: lt.Floor,
+ PicDate: lt.Time,
+ FirstPersonID: personIds[0],
+ }
+ results = append(results, result)
+ }
+ return service.SaveTaskResults(results)
+}
+
+func (m *GatherModel) KeepAlive() error {
+ db.GetDB().Model(m.Task).Where("id = ?", m.Task.ID).Update("last_run_time", time.Now())
return nil
}
@@ -87,7 +201,7 @@
return nil
}
-func queryElasticsearch(esClient *elasticsearch.Client, gatherModel *GatherModel) ([]GatherRecord, error) {
+func queryElasticsearch(esClient *elasticsearch.Client, gatherModel *GatherModel) ([]*GatherRecord, error) {
var buf bytes.Buffer
now := time.Now()
start := now.Add(-time.Duration(gatherModel.DaysWindow) * 24 * time.Hour)
@@ -151,46 +265,67 @@
},
},
"aggs": map[string]interface{}{
- "gather_events": map[string]interface{}{
- "date_histogram": map[string]interface{}{
- "field": "picDate",
- "interval": fmt.Sprintf("%ds", gatherModel.AppearInterval),
- "min_doc_count": 1,
+ "orgs": map[string]interface{}{ // 鍏堣仛鍚坥rgId
+ "terms": map[string]interface{}{
+ "field": "orgId", // 鑱氬悎orgId
+ "size": 10000,
},
"aggs": map[string]interface{}{
- "community": map[string]interface{}{
+ "community": map[string]interface{}{ // 鍦╫rgId鑱氬悎涓嬭仛鍚坈ommunityId
"terms": map[string]interface{}{
- "field": "communityId", // 鑱氬悎灏忓尯id
+ "field": "communityId", // 鑱氬悎communityId
"size": 10000,
},
"aggs": map[string]interface{}{
- "location": map[string]interface{}{
+ "location": map[string]interface{}{ // 鍦╟ommunityId涓嬭仛鍚坆uilding
"terms": map[string]interface{}{
"field": "cameraLocation.building", // 鑱氬悎妤兼爧
"size": 10000,
},
"aggs": map[string]interface{}{
- "floor": map[string]interface{}{
+ "floor": map[string]interface{}{ // 鍦╞uilding涓嬭仛鍚坒loor
"terms": map[string]interface{}{
"field": "cameraLocation.floor", // 鑱氬悎妤煎眰
"size": 10000,
},
"aggs": map[string]interface{}{
- "people": map[string]interface{}{
- "terms": map[string]interface{}{
- "field": "documentNumber", // 鎸変汉鍛樺敮涓�鏍囪瘑鑱氬悎
- "size": 10000,
+ "gather_events": map[string]interface{}{ // 鍦╢loor涓嬭仛鍚坓ather_events
+ "date_histogram": map[string]interface{}{
+ "field": "picDate",
+ "interval": fmt.Sprintf("%ds", gatherModel.AppearInterval),
+ "min_doc_count": 1,
},
- },
- "filter_gather": map[string]interface{}{
- "bucket_selector": map[string]interface{}{
- "buckets_path": map[string]interface{}{
- "personCount": "people._bucket_count", // 缁熻浜烘暟
+ "aggs": map[string]interface{}{
+ "people": map[string]interface{}{
+ "terms": map[string]interface{}{
+ "field": "documentNumber", // 鎸変汉鍛樺敮涓�鏍囪瘑鑱氬悎
+ "size": 10000,
+ },
},
- "script": map[string]interface{}{
- "source": "params.personCount >= params.gatherPersons", // 鑱氶泦浜烘暟杩囨护
- "params": map[string]interface{}{
- "gatherPersons": gatherModel.GatherPersons,
+ "filter_gather": map[string]interface{}{
+ "bucket_selector": map[string]interface{}{
+ "buckets_path": map[string]interface{}{
+ "personCount": "people._bucket_count", // 缁熻浜烘暟
+ },
+ "script": map[string]interface{}{
+ "source": "params.personCount >= params.gatherPersons", // 鑱氶泦浜烘暟杩囨护
+ "params": map[string]interface{}{
+ "gatherPersons": gatherModel.GatherPersons,
+ },
+ },
+ },
+ },
+ "frequency_filter": map[string]interface{}{ // 娣诲姞棰戠巼杩囨护
+ "bucket_selector": map[string]interface{}{
+ "buckets_path": map[string]interface{}{
+ "eventCount": "_count", // 鑱氬悎浜嬩欢娆℃暟
+ },
+ "script": map[string]interface{}{
+ "source": "params.eventCount >= params.threshold", // 绛涢�夐鐜囪揪鍒伴槇鍊肩殑浜嬩欢
+ "params": map[string]interface{}{
+ "threshold": gatherModel.Threshold,
+ },
+ },
},
},
},
@@ -235,46 +370,52 @@
}
// 瑙f瀽鑱氬悎缁撴灉
- var records []GatherRecord
+ var records []*GatherRecord
if aggs, ok := result["aggregations"].(map[string]interface{}); ok {
- if gatherEvents, ok := aggs["gather_events"].(map[string]interface{}); ok {
- if buckets, ok := gatherEvents["buckets"].([]interface{}); ok {
- for _, bucket := range buckets {
- key := int64(bucket.(map[string]interface{})["key"].(float64)) / 1000 // 灏嗘绉掕浆鎹负绉�
- timestamp := time.Unix(key, 0).Format("2006-01-02T15:04:05")
+ if orgBuckets, ok := aggs["orgs"].(map[string]interface{})["buckets"].([]interface{}); ok {
+ for _, orgBucket := range orgBuckets {
+ orgId := orgBucket.(map[string]interface{})["key"].(string)
- // 瑙f瀽鎸夊皬鍖恒�佹ゼ鏍嬪拰妤煎眰鐨勮仛鍚堢粨鏋�
- if communityBuckets, ok := bucket.(map[string]interface{})["community"].(map[string]interface{})["buckets"].([]interface{}); ok {
- for _, communityBucket := range communityBuckets {
- communityId := communityBucket.(map[string]interface{})["key"].(string)
+ // 瑙f瀽鎸塩ommunityId鐨勮仛鍚堢粨鏋�
+ if communityBuckets, ok := orgBucket.(map[string]interface{})["community"].(map[string]interface{})["buckets"].([]interface{}); ok {
+ for _, communityBucket := range communityBuckets {
+ communityId := communityBucket.(map[string]interface{})["key"].(string)
- // 瑙f瀽鎸夋ゼ鏍嬪拰妤煎眰鐨勮仛鍚堢粨鏋�
- if locationBuckets, ok := communityBucket.(map[string]interface{})["location"].(map[string]interface{})["buckets"].([]interface{}); ok {
- for _, locationBucket := range locationBuckets {
- building := locationBucket.(map[string]interface{})["key"].(string)
+ // 瑙f瀽鎸塨uilding鐨勮仛鍚堢粨鏋�
+ if locationBuckets, ok := communityBucket.(map[string]interface{})["location"].(map[string]interface{})["buckets"].([]interface{}); ok {
+ for _, locationBucket := range locationBuckets {
+ building := locationBucket.(map[string]interface{})["key"].(string)
- // 瑙f瀽妤煎眰
- if floorBuckets, ok := locationBucket.(map[string]interface{})["floor"].(map[string]interface{})["buckets"].([]interface{}); ok {
- for _, floorBucket := range floorBuckets {
- floor := floorBucket.(map[string]interface{})["key"].(string)
+ // 瑙f瀽鎸塮loor鐨勮仛鍚堢粨鏋�
+ if floorBuckets, ok := locationBucket.(map[string]interface{})["floor"].(map[string]interface{})["buckets"].([]interface{}); ok {
+ for _, floorBucket := range floorBuckets {
+ floor := floorBucket.(map[string]interface{})["key"].(string)
- // 瑙f瀽浜哄憳
- if peopleBuckets, ok := floorBucket.(map[string]interface{})["people"].(map[string]interface{})["buckets"].([]interface{}); ok {
- for _, person := range peopleBuckets {
- documentNumber := person.(map[string]interface{})["key"].(string)
+ // 瑙f瀽鑱氬悎鐨勪簨浠�
+ if gatherEvents, ok := floorBucket.(map[string]interface{})["gather_events"].(map[string]interface{})["buckets"].([]interface{}); ok {
+ for _, eventBucket := range gatherEvents {
+ key := int64(eventBucket.(map[string]interface{})["key"].(float64)) / 1000 // 灏嗘绉掕浆鎹负绉�
+ timestamp := time.Unix(key, 0).UTC().Format("2006-01-02 15:04:05")
- // 鏋勫缓 GatherRecord 缁撴瀯浣�
- record := GatherRecord{
- PicDate: timestamp,
- DocumentNumber: documentNumber,
- CommunityId: communityId,
- Building: building,
- Floor: floor,
- AppearInterval: gatherModel.AppearInterval,
- GatherPersons: gatherModel.GatherPersons,
+ // 瑙f瀽浜哄憳
+ if peopleBuckets, ok := eventBucket.(map[string]interface{})["people"].(map[string]interface{})["buckets"].([]interface{}); ok {
+ for _, person := range peopleBuckets {
+ documentNumber := person.(map[string]interface{})["key"].(string)
+
+ // 鏋勫缓 GatherRecord 缁撴瀯浣�
+ record := &GatherRecord{
+ PicDate: timestamp,
+ DocumentNumber: documentNumber,
+ CommunityId: communityId,
+ Building: building,
+ Floor: floor,
+ OrgId: orgId,
+ AppearInterval: gatherModel.AppearInterval,
+ GatherPersons: gatherModel.GatherPersons,
+ }
+
+ records = append(records, record)
}
-
- records = append(records, record)
}
}
}
@@ -291,15 +432,50 @@
return records, nil
}
-func analyzeAndAggregate(records []GatherRecord) (map[string][]string, error) {
- // Implement logic to aggregate and analyze data based on GatherModel parameters
- // This is a placeholder for the actual implementation
- aggregation := make(map[string][]string)
+type GatherLocationTime struct {
+ CommunityId string
+ OrgId string
+ Building string
+ Floor string
+ Location string
+ Time string
+}
- // Example logic:
+func analyzeAndAggregate(records []*GatherRecord) (map[GatherLocationTime]set.StringSet, error) {
+ if len(records) == 0 {
+ return nil, nil
+ }
+ aggregation := make(map[GatherLocationTime]set.StringSet)
+ domainIds := set.NewStringSet()
for _, record := range records {
- key := fmt.Sprintf("%s%s%s", record.CommunityId, record.Building, record.Floor)
- aggregation[key] = append(aggregation[key], record.DocumentNumber)
+ domainIds.Add(record.CommunityId)
+ }
+
+ domains, err := service.GetUnitsMapByIds(domainIds.Elements())
+ if err != nil {
+ return nil, err
+ }
+
+ for _, record := range records {
+ if record.DocumentNumber == "" {
+ continue
+ }
+ if domains[record.CommunityId] == nil {
+ continue
+ }
+
+ location := GatherLocationTime{
+ CommunityId: record.CommunityId,
+ OrgId: record.OrgId,
+ Building: record.Building,
+ Floor: record.Floor,
+ Location: fmt.Sprintf("%s%s%s", domains[record.CommunityId].Name, record.Building, record.Floor),
+ Time: record.PicDate,
+ }
+ if aggregation[location] == nil {
+ aggregation[location] = set.NewStringSet()
+ }
+ aggregation[location].Add(record.DocumentNumber)
}
return aggregation, nil
--
Gitblit v1.8.0