From 973c3b1684d0d019aa74113b1983fe29f20d2a49 Mon Sep 17 00:00:00 2001 From: zhangqian <zhangqian@123.com> Date: 星期四, 12 十二月 2024 17:03:40 +0800 Subject: [PATCH] 模型任务执行预警结果存表 --- service/task_results.go | 7 models/gather_model.go | 195 +++++++++++++------ db/key_person_base.go | 165 ++++++++++++++++ db/task_results.go | 185 ++++++++++++++++++ service/domain.go | 13 + service/key_person.go | 23 ++ 6 files changed, 523 insertions(+), 65 deletions(-) diff --git a/db/key_person_base.go b/db/key_person_base.go new file mode 100644 index 0000000..1f7bf57 --- /dev/null +++ b/db/key_person_base.go @@ -0,0 +1,165 @@ +package db + +import ( + "fmt" + "gorm.io/gorm" +) + +// KeyPersonBase 閲嶇偣浜哄憳搴曞簱 +type KeyPersonBase struct { + BaseModel + Name string `json:"name" gorm:"type:varchar(255);"` //搴曞簱鍚嶇О + Color string `json:"color" gorm:"type:varchar(255);"` //搴曞簱棰滆壊 + Level string `json:"level" gorm:"type:varchar(255);"` //搴曞簱绛夌骇 + TagName string `json:"tagName" gorm:"type:varchar(255)"` //鏍囩鍚嶇О + TagType string `json:"tagType" gorm:"type:varchar(255)"` //鏍囩绫诲瀷 +} + +func (m *KeyPersonBase) TableName() string { + return "key_person_base" +} + +type KeyPersonBaseSearch struct { + KeyPersonBase + Orm *gorm.DB + PageNum int + PageSize int + Order string + TagTypes []string +} + +func NewKeyPersonBaseSearch() *KeyPersonBaseSearch { + return &KeyPersonBaseSearch{ + Orm: GetDB(), + PageNum: 1, + PageSize: 10, + } +} +func (slf *KeyPersonBaseSearch) SetPage(page, size int) *KeyPersonBaseSearch { + slf.PageNum, slf.PageSize = page, size + return slf +} + +func (slf *KeyPersonBaseSearch) SetOrder(order string) *KeyPersonBaseSearch { + slf.Order = order + return slf +} + +func (slf *KeyPersonBaseSearch) SetID(id string) *KeyPersonBaseSearch { + slf.ID = id + return slf +} + +func (slf *KeyPersonBaseSearch) Set(id string) *KeyPersonBaseSearch { + slf.ID = id + return slf +} + +func (slf *KeyPersonBaseSearch) SetTagTypes(ids []string) *KeyPersonBaseSearch { + slf.TagTypes = ids + return slf +} + +func (slf *KeyPersonBaseSearch) build() *gorm.DB { + var db = slf.Orm.Table(slf.TableName()) + if slf.Order != "" { + db = db.Order(slf.Order) + } + + if slf.ID != "" { + db = db.Where("id = ?", slf.ID) + } + + if slf.Name != "" { + db = db.Where("name = ?", slf.Name) + } + + if len(slf.TagTypes) != 0 { + db = db.Where("tag_type in ?", slf.TagTypes) + } + + return db +} + +func (slf *KeyPersonBaseSearch) First() (*KeyPersonBase, error) { + var ( + record = new(KeyPersonBase) + db = slf.build() + ) + + if err := db.First(record).Error; err != nil { + return record, err + } + + return record, nil +} + +func (slf *KeyPersonBaseSearch) Find() ([]*KeyPersonBase, int64, error) { + var ( + records = make([]*KeyPersonBase, 0) + total int64 + db = slf.build() + ) + + if err := db.Count(&total).Error; err != nil { + return records, total, fmt.Errorf("find count err: %v", err) + } + if slf.PageNum*slf.PageSize > 0 { + db = db.Offset((slf.PageNum - 1) * slf.PageSize).Limit(slf.PageSize) + } + if err := db.Find(&records).Error; err != nil { + return records, total, fmt.Errorf("find records err: %v", err) + } + + return records, total, nil +} + +func (slf *KeyPersonBaseSearch) FindAll() ([]*KeyPersonBase, error) { + var ( + records = make([]*KeyPersonBase, 0) + db = slf.build() + ) + if err := db.Find(&records).Error; err != nil { + return records, fmt.Errorf("find records err: %v", err) + } + + return records, nil +} + +func (slf *KeyPersonBaseSearch) Count() int64 { + var ( + count int64 + db = slf.build() + ) + + if err := db.Count(&count).Error; err != nil { + return count + } + + return count +} + +func (slf *KeyPersonBaseSearch) Create(record *KeyPersonBase) error { + var db = slf.build() + + if err := db.Create(record).Error; err != nil { + return fmt.Errorf("create err: %v, record: %+v", err, record) + } + + return nil +} + +func (slf *KeyPersonBaseSearch) Update(record *KeyPersonBase) error { + var db = slf.build() + + if err := db.Updates(record).Error; err != nil { + return fmt.Errorf("update err: %v, record: %+v", err, record) + } + + return nil +} + +func (slf *KeyPersonBaseSearch) Delete() error { + var db = slf.build() + return db.Delete(&KeyPersonBase{}).Error +} diff --git a/db/task_results.go b/db/task_results.go new file mode 100644 index 0000000..3e656bd --- /dev/null +++ b/db/task_results.go @@ -0,0 +1,185 @@ +package db + +import ( + "fmt" + "gorm.io/gorm" +) + +type ModelTaskResults struct { + BaseModel + Title string `json:"name" gorm:"type:varchar(255)"` //棰勮鍚嶇О,鏆傛椂鐢ㄤ换鍔″悕绉� + Event string `json:"event" gorm:"type:varchar(255)"` //棰勮浜嬩欢 + ModelID string `json:"modelID" gorm:"type:varchar(255)"` //妯″瀷ID + ModelTaskID string `json:"modelTaskID" gorm:"type:varchar(255)"` //妯″瀷浠诲姟ID + CommunityId string `json:"communityID" gorm:"index;column:community_id;type:varchar(299);not null;"` //灏忓尯ID + OrgID string `json:"orgID" gorm:"index;column:org_id;type:varchar(299);not null;"` //娲惧嚭鎵� domain unit ID + ObjectIds string `json:"objectIds" gorm:"type:text"` //浜嬩欢瀵硅薄锛屽彲浠ユ槸浜猴紝澶氫釜鐢ㄩ�楀彿鍒嗛殧 + Location string `json:"location" gorm:"type:varchar(255)"` //鍙戠敓鍦扮偣 +} + +func (m *ModelTaskResults) TableName() string { + return "model_task_results" +} + +type ModelTaskResultsSearch struct { + ModelTaskResults + Orm *gorm.DB + PageNum int + PageSize int + Order string + Keyword string +} + +func NewModelTaskResultsSearch() *ModelTaskResultsSearch { + return &ModelTaskResultsSearch{ + Orm: GetDB(), + PageNum: 1, + PageSize: 10, + } +} + +func (slf *ModelTaskResultsSearch) SetOrm(tx *gorm.DB) *ModelTaskResultsSearch { + slf.Orm = tx + return slf +} + +func (slf *ModelTaskResultsSearch) SetPage(page, size int) *ModelTaskResultsSearch { + slf.PageNum, slf.PageSize = page, size + return slf +} + +func (slf *ModelTaskResultsSearch) SetOrder(order string) *ModelTaskResultsSearch { + slf.Order = order + return slf +} + +func (slf *ModelTaskResultsSearch) SetID(id string) *ModelTaskResultsSearch { + slf.ID = id + return slf +} + +func (slf *ModelTaskResultsSearch) SetKeyword(kw string) *ModelTaskResultsSearch { + slf.Keyword = kw + return slf +} + +func (slf *ModelTaskResultsSearch) build() *gorm.DB { + var db = slf.Orm.Table(slf.TableName()) + if slf.Order != "" { + db = db.Order(slf.Order) + } + + if slf.ID != "" { + db = db.Where("id = ?", slf.ID) + } + + if slf.Keyword != "" { + kw := "%" + slf.Keyword + "%" + db = db.Where("name like ?", kw) + } + + return db +} + +func (slf *ModelTaskResultsSearch) First() (*ModelTaskResults, error) { + var ( + record = new(ModelTaskResults) + db = slf.build() + ) + + if err := db.First(record).Error; err != nil { + return record, err + } + + return record, nil +} + +func (slf *ModelTaskResultsSearch) Find() ([]*ModelTaskResults, int64, error) { + var ( + records = make([]*ModelTaskResults, 0) + total int64 + db = slf.build() + ) + + if err := db.Count(&total).Error; err != nil { + return records, total, fmt.Errorf("find count err: %v", err) + } + if slf.PageNum*slf.PageSize > 0 { + db = db.Offset((slf.PageNum - 1) * slf.PageSize).Limit(slf.PageSize) + } + if err := db.Find(&records).Error; err != nil { + return records, total, fmt.Errorf("find records err: %v", err) + } + + return records, total, nil +} + +func (slf *ModelTaskResultsSearch) FindAll() ([]*ModelTaskResults, error) { + var ( + records = make([]*ModelTaskResults, 0) + db = slf.build() + ) + if err := db.Find(&records).Error; err != nil { + return records, fmt.Errorf("find records err: %v", err) + } + + return records, nil +} + +func (slf *ModelTaskResultsSearch) Count() int64 { + var ( + count int64 + db = slf.build() + ) + + if err := db.Count(&count).Error; err != nil { + return count + } + + return count +} + +func (slf *ModelTaskResultsSearch) Create(record *ModelTaskResults) error { + var db = slf.build() + + if err := db.Create(record).Error; err != nil { + return fmt.Errorf("create err: %v, record: %+v", err, record) + } + + return nil +} + +func (slf *ModelTaskResultsSearch) BatchCreate(record []*ModelTaskResults) error { + var db = slf.build() + + if err := db.Create(record).Error; err != nil { + return fmt.Errorf("create err: %v, record: %+v", err, record) + } + + return nil +} + +func (slf *ModelTaskResultsSearch) Save(record *ModelTaskResults) error { + var db = slf.build() + + if err := db.Omit("CreatedAt").Save(record).Error; err != nil { + return fmt.Errorf("save err: %v, record: %+v", err, record) + } + + return nil +} + +func (slf *ModelTaskResultsSearch) Update(record *ModelTaskResults) error { + var db = slf.build() + + if err := db.Updates(record).Error; err != nil { + return fmt.Errorf("update err: %v, record: %+v", err, record) + } + + return nil +} + +func (slf *ModelTaskResultsSearch) Delete() error { + var db = slf.build() + return db.Delete(&ModelTaskResults{}).Error +} diff --git a/models/gather_model.go b/models/gather_model.go index 66864cf..54e7626 100644 --- a/models/gather_model.go +++ b/models/gather_model.go @@ -9,6 +9,7 @@ "log" "model-engine/config" "model-engine/db" + "model-engine/pkg/set" "model-engine/service" "strings" "time" @@ -25,6 +26,7 @@ AppearInterval int `gorm:"type:int;" json:"appearInterval"` //鍑虹幇闂撮殧锛屽崟浣嶄负绉� DaysWindow int `gorm:"type:int;" json:"daysWindow" ` //杩戝嚑澶╁唴 Threshold int `gorm:"type:int;" json:"threshold" ` //杈惧嚑娆� + Task *db.ModelTask } func (m *GatherModel) Init(task *db.ModelTask) error { @@ -34,6 +36,7 @@ return err } + m.Task = task m.OrgIds = orgIds m.AreaIds = areaIds m.Building = task.Building @@ -53,6 +56,7 @@ PicDate string `json:"picDate"` DocumentNumber string CommunityId string `json:"communityId"` + OrgId string `json:"orgId"` Building string `json:"building"` Floor string `json:"floor"` GatherPersons int `gorm:"type:int;" json:"gatherPersons"` //鑱氶泦浜烘暟 @@ -74,11 +78,31 @@ log.Fatalf("Failed to analyze and aggregate data: %v", err) } - // Print or process the aggregation results as needed - for location, persons := range aggregation { - fmt.Printf("Gathering detected at %s with %d unique persons\n", location, len(persons)) + if len(aggregation) == 0 { + return nil } - return nil + + tagTypes := strings.Split(m.Task.PersonType, ",") + results := make([]*db.ModelTaskResults, 0, len(aggregation)) + _, typeNames, err := service.GetPersonTypeNameByTypes(tagTypes) + if err != nil { + return err + } + event := strings.Join(typeNames, ",") + for location, persons := range aggregation { + result := &db.ModelTaskResults{ + Title: m.Task.Name, + Event: fmt.Sprintf("%s/%d浜�", event, len(persons)), + ModelID: m.Task.ModelID, + ModelTaskID: m.Task.ID, + CommunityId: location.CommunityId, + OrgID: location.OrgId, + ObjectIds: strings.Join(persons.Elements(), ","), + Location: location.Location, + } + results = append(results, result) + } + return service.SaveTaskResults(results) } func (m *GatherModel) Shutdown() error { @@ -151,46 +175,54 @@ }, }, "aggs": map[string]interface{}{ - "gather_events": map[string]interface{}{ - "date_histogram": map[string]interface{}{ - "field": "picDate", - "interval": fmt.Sprintf("%ds", gatherModel.AppearInterval), - "min_doc_count": 1, + "orgs": map[string]interface{}{ // 鍏堣仛鍚坥rgId + "terms": map[string]interface{}{ + "field": "orgId", // 鑱氬悎orgId + "size": 10000, }, "aggs": map[string]interface{}{ - "community": map[string]interface{}{ + "community": map[string]interface{}{ // 鍦╫rgId鑱氬悎涓嬭仛鍚坈ommunityId "terms": map[string]interface{}{ - "field": "communityId", // 鑱氬悎灏忓尯id + "field": "communityId", // 鑱氬悎communityId "size": 10000, }, "aggs": map[string]interface{}{ - "location": map[string]interface{}{ + "location": map[string]interface{}{ // 鍦╟ommunityId涓嬭仛鍚坆uilding "terms": map[string]interface{}{ "field": "cameraLocation.building", // 鑱氬悎妤兼爧 "size": 10000, }, "aggs": map[string]interface{}{ - "floor": map[string]interface{}{ + "floor": map[string]interface{}{ // 鍦╞uilding涓嬭仛鍚坒loor "terms": map[string]interface{}{ "field": "cameraLocation.floor", // 鑱氬悎妤煎眰 "size": 10000, }, "aggs": map[string]interface{}{ - "people": map[string]interface{}{ - "terms": map[string]interface{}{ - "field": "documentNumber", // 鎸変汉鍛樺敮涓�鏍囪瘑鑱氬悎 - "size": 10000, + "gather_events": map[string]interface{}{ // 鍦╢loor涓嬭仛鍚坓ather_events + "date_histogram": map[string]interface{}{ + "field": "picDate", + "interval": fmt.Sprintf("%ds", gatherModel.AppearInterval), + "min_doc_count": 1, }, - }, - "filter_gather": map[string]interface{}{ - "bucket_selector": map[string]interface{}{ - "buckets_path": map[string]interface{}{ - "personCount": "people._bucket_count", // 缁熻浜烘暟 + "aggs": map[string]interface{}{ + "people": map[string]interface{}{ + "terms": map[string]interface{}{ + "field": "documentNumber", // 鎸変汉鍛樺敮涓�鏍囪瘑鑱氬悎 + "size": 10000, + }, }, - "script": map[string]interface{}{ - "source": "params.personCount >= params.gatherPersons", // 鑱氶泦浜烘暟杩囨护 - "params": map[string]interface{}{ - "gatherPersons": gatherModel.GatherPersons, + "filter_gather": map[string]interface{}{ + "bucket_selector": map[string]interface{}{ + "buckets_path": map[string]interface{}{ + "personCount": "people._bucket_count", // 缁熻浜烘暟 + }, + "script": map[string]interface{}{ + "source": "params.personCount >= params.gatherPersons", // 鑱氶泦浜烘暟杩囨护 + "params": map[string]interface{}{ + "gatherPersons": gatherModel.GatherPersons, + }, + }, }, }, }, @@ -237,44 +269,50 @@ // 瑙f瀽鑱氬悎缁撴灉 var records []GatherRecord if aggs, ok := result["aggregations"].(map[string]interface{}); ok { - if gatherEvents, ok := aggs["gather_events"].(map[string]interface{}); ok { - if buckets, ok := gatherEvents["buckets"].([]interface{}); ok { - for _, bucket := range buckets { - key := int64(bucket.(map[string]interface{})["key"].(float64)) / 1000 // 灏嗘绉掕浆鎹负绉� - timestamp := time.Unix(key, 0).Format("2006-01-02T15:04:05") + if orgBuckets, ok := aggs["orgs"].(map[string]interface{})["buckets"].([]interface{}); ok { + for _, orgBucket := range orgBuckets { + orgId := orgBucket.(map[string]interface{})["key"].(string) - // 瑙f瀽鎸夊皬鍖恒�佹ゼ鏍嬪拰妤煎眰鐨勮仛鍚堢粨鏋� - if communityBuckets, ok := bucket.(map[string]interface{})["community"].(map[string]interface{})["buckets"].([]interface{}); ok { - for _, communityBucket := range communityBuckets { - communityId := communityBucket.(map[string]interface{})["key"].(string) + // 瑙f瀽鎸塩ommunityId鐨勮仛鍚堢粨鏋� + if communityBuckets, ok := orgBucket.(map[string]interface{})["community"].(map[string]interface{})["buckets"].([]interface{}); ok { + for _, communityBucket := range communityBuckets { + communityId := communityBucket.(map[string]interface{})["key"].(string) - // 瑙f瀽鎸夋ゼ鏍嬪拰妤煎眰鐨勮仛鍚堢粨鏋� - if locationBuckets, ok := communityBucket.(map[string]interface{})["location"].(map[string]interface{})["buckets"].([]interface{}); ok { - for _, locationBucket := range locationBuckets { - building := locationBucket.(map[string]interface{})["key"].(string) + // 瑙f瀽鎸塨uilding鐨勮仛鍚堢粨鏋� + if locationBuckets, ok := communityBucket.(map[string]interface{})["location"].(map[string]interface{})["buckets"].([]interface{}); ok { + for _, locationBucket := range locationBuckets { + building := locationBucket.(map[string]interface{})["key"].(string) - // 瑙f瀽妤煎眰 - if floorBuckets, ok := locationBucket.(map[string]interface{})["floor"].(map[string]interface{})["buckets"].([]interface{}); ok { - for _, floorBucket := range floorBuckets { - floor := floorBucket.(map[string]interface{})["key"].(string) + // 瑙f瀽鎸塮loor鐨勮仛鍚堢粨鏋� + if floorBuckets, ok := locationBucket.(map[string]interface{})["floor"].(map[string]interface{})["buckets"].([]interface{}); ok { + for _, floorBucket := range floorBuckets { + floor := floorBucket.(map[string]interface{})["key"].(string) - // 瑙f瀽浜哄憳 - if peopleBuckets, ok := floorBucket.(map[string]interface{})["people"].(map[string]interface{})["buckets"].([]interface{}); ok { - for _, person := range peopleBuckets { - documentNumber := person.(map[string]interface{})["key"].(string) + // 瑙f瀽鑱氬悎鐨勪簨浠� + if gatherEvents, ok := floorBucket.(map[string]interface{})["gather_events"].(map[string]interface{})["buckets"].([]interface{}); ok { + for _, eventBucket := range gatherEvents { + key := int64(eventBucket.(map[string]interface{})["key"].(float64)) / 1000 // 灏嗘绉掕浆鎹负绉� + timestamp := time.Unix(key, 0).Format("2006-01-02T15:04:05") - // 鏋勫缓 GatherRecord 缁撴瀯浣� - record := GatherRecord{ - PicDate: timestamp, - DocumentNumber: documentNumber, - CommunityId: communityId, - Building: building, - Floor: floor, - AppearInterval: gatherModel.AppearInterval, - GatherPersons: gatherModel.GatherPersons, + // 瑙f瀽浜哄憳 + if peopleBuckets, ok := eventBucket.(map[string]interface{})["people"].(map[string]interface{})["buckets"].([]interface{}); ok { + for _, person := range peopleBuckets { + documentNumber := person.(map[string]interface{})["key"].(string) + + // 鏋勫缓 GatherRecord 缁撴瀯浣� + record := GatherRecord{ + PicDate: timestamp, + DocumentNumber: documentNumber, + CommunityId: communityId, + Building: building, + Floor: floor, + OrgId: orgId, + AppearInterval: gatherModel.AppearInterval, + GatherPersons: gatherModel.GatherPersons, + } + + records = append(records, record) } - - records = append(records, record) } } } @@ -291,15 +329,42 @@ return records, nil } -func analyzeAndAggregate(records []GatherRecord) (map[string][]string, error) { - // Implement logic to aggregate and analyze data based on GatherModel parameters - // This is a placeholder for the actual implementation - aggregation := make(map[string][]string) +type GatherLocation struct { + CommunityId string + OrgId string + Building string + Floor string + Location string +} - // Example logic: +func analyzeAndAggregate(records []GatherRecord) (map[GatherLocation]set.StringSet, error) { + aggregation := make(map[GatherLocation]set.StringSet) + domainIds := set.NewStringSet() for _, record := range records { - key := fmt.Sprintf("%s%s%s", record.CommunityId, record.Building, record.Floor) - aggregation[key] = append(aggregation[key], record.DocumentNumber) + domainIds.Add(record.CommunityId) + } + + domains, err := service.GetUnitsMapByIds(domainIds.Elements()) + if err != nil { + return nil, err + } + + for _, record := range records { + if domains[record.CommunityId] == nil { + continue + } + + location := GatherLocation{ + CommunityId: record.CommunityId, + OrgId: record.OrgId, + Building: record.Building, + Floor: record.Floor, + Location: fmt.Sprintf("%s%s%s", domains[record.CommunityId].Name, record.Building, record.Floor), + } + if aggregation[location] == nil { + aggregation[location] = set.NewStringSet() + } + aggregation[location].Add(record.DocumentNumber) } return aggregation, nil diff --git a/service/domain.go b/service/domain.go index 851b054..def0191 100644 --- a/service/domain.go +++ b/service/domain.go @@ -78,3 +78,16 @@ return list, nil } + +func GetUnitsMapByIds(ids []string) (m map[string]*db.DomainUnit, err error) { + list := make([]*db.DomainUnit, 0) + err = db.GetDB().Model(&db.DomainUnit{}).Where("id in ?", ids).Find(&list).Error + if err != nil { + return nil, err + } + m = make(map[string]*db.DomainUnit, len(list)) + for _, v := range list { + m[v.ID] = v + } + return +} diff --git a/service/key_person.go b/service/key_person.go new file mode 100644 index 0000000..bbe996a --- /dev/null +++ b/service/key_person.go @@ -0,0 +1,23 @@ +package service + +import "model-engine/db" + +func GetPersonTypeNameByTypes(types []string) (m map[string]string, names []string, err error) { + m = make(map[string]string) + + records, err := db.NewKeyPersonBaseSearch().SetTagTypes(types).FindAll() + if err != nil { + return + } + + for _, record := range records { + m[record.TagType] = record.TagName + } + + for _, t := range types { + names = append(names, m[t]) + } + + return + +} diff --git a/service/task_results.go b/service/task_results.go new file mode 100644 index 0000000..bdeef8f --- /dev/null +++ b/service/task_results.go @@ -0,0 +1,7 @@ +package service + +import "model-engine/db" + +func SaveTaskResults(results []*db.ModelTaskResults) error { + return db.NewModelTaskResultsSearch().BatchCreate(results) +} -- Gitblit v1.8.0