db/key_person_base.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
db/task_results.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
models/gather_model.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
service/domain.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
service/key_person.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
service/task_results.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
db/key_person_base.go
New file @@ -0,0 +1,165 @@ package db import ( "fmt" "gorm.io/gorm" ) // KeyPersonBase 重点人员底库 type KeyPersonBase struct { BaseModel Name string `json:"name" gorm:"type:varchar(255);"` //底库名称 Color string `json:"color" gorm:"type:varchar(255);"` //底库颜色 Level string `json:"level" gorm:"type:varchar(255);"` //底库等级 TagName string `json:"tagName" gorm:"type:varchar(255)"` //标签名称 TagType string `json:"tagType" gorm:"type:varchar(255)"` //标签类型 } func (m *KeyPersonBase) TableName() string { return "key_person_base" } type KeyPersonBaseSearch struct { KeyPersonBase Orm *gorm.DB PageNum int PageSize int Order string TagTypes []string } func NewKeyPersonBaseSearch() *KeyPersonBaseSearch { return &KeyPersonBaseSearch{ Orm: GetDB(), PageNum: 1, PageSize: 10, } } func (slf *KeyPersonBaseSearch) SetPage(page, size int) *KeyPersonBaseSearch { slf.PageNum, slf.PageSize = page, size return slf } func (slf *KeyPersonBaseSearch) SetOrder(order string) *KeyPersonBaseSearch { slf.Order = order return slf } func (slf *KeyPersonBaseSearch) SetID(id string) *KeyPersonBaseSearch { slf.ID = id return slf } func (slf *KeyPersonBaseSearch) Set(id string) *KeyPersonBaseSearch { slf.ID = id return slf } func (slf *KeyPersonBaseSearch) SetTagTypes(ids []string) *KeyPersonBaseSearch { slf.TagTypes = ids return slf } func (slf *KeyPersonBaseSearch) build() *gorm.DB { var db = slf.Orm.Table(slf.TableName()) if slf.Order != "" { db = db.Order(slf.Order) } if slf.ID != "" { db = db.Where("id = ?", slf.ID) } if slf.Name != "" { db = db.Where("name = ?", slf.Name) } if len(slf.TagTypes) != 0 { db = db.Where("tag_type in ?", slf.TagTypes) } return db } func (slf *KeyPersonBaseSearch) First() (*KeyPersonBase, error) { var ( record = new(KeyPersonBase) db = slf.build() ) if err := db.First(record).Error; err != nil { return record, err } return record, nil } func (slf *KeyPersonBaseSearch) Find() ([]*KeyPersonBase, int64, error) { var ( records = make([]*KeyPersonBase, 0) total int64 db = slf.build() ) if err := db.Count(&total).Error; err != nil { return records, total, fmt.Errorf("find count err: %v", err) } if slf.PageNum*slf.PageSize > 0 { db = db.Offset((slf.PageNum - 1) * slf.PageSize).Limit(slf.PageSize) } if err := db.Find(&records).Error; err != nil { return records, total, fmt.Errorf("find records err: %v", err) } return records, total, nil } func (slf *KeyPersonBaseSearch) FindAll() ([]*KeyPersonBase, error) { var ( records = make([]*KeyPersonBase, 0) db = slf.build() ) if err := db.Find(&records).Error; err != nil { return records, fmt.Errorf("find records err: %v", err) } return records, nil } func (slf *KeyPersonBaseSearch) Count() int64 { var ( count int64 db = slf.build() ) if err := db.Count(&count).Error; err != nil { return count } return count } func (slf *KeyPersonBaseSearch) Create(record *KeyPersonBase) error { var db = slf.build() if err := db.Create(record).Error; err != nil { return fmt.Errorf("create err: %v, record: %+v", err, record) } return nil } func (slf *KeyPersonBaseSearch) Update(record *KeyPersonBase) error { var db = slf.build() if err := db.Updates(record).Error; err != nil { return fmt.Errorf("update err: %v, record: %+v", err, record) } return nil } func (slf *KeyPersonBaseSearch) Delete() error { var db = slf.build() return db.Delete(&KeyPersonBase{}).Error } db/task_results.go
New file @@ -0,0 +1,185 @@ package db import ( "fmt" "gorm.io/gorm" ) type ModelTaskResults struct { BaseModel Title string `json:"name" gorm:"type:varchar(255)"` //预警名称,暂时用任务名称 Event string `json:"event" gorm:"type:varchar(255)"` //预警事件 ModelID string `json:"modelID" gorm:"type:varchar(255)"` //模型ID ModelTaskID string `json:"modelTaskID" gorm:"type:varchar(255)"` //模型任务ID CommunityId string `json:"communityID" gorm:"index;column:community_id;type:varchar(299);not null;"` //小区ID OrgID string `json:"orgID" gorm:"index;column:org_id;type:varchar(299);not null;"` //派出所 domain unit ID ObjectIds string `json:"objectIds" gorm:"type:text"` //事件对象,可以是人,多个用逗号分隔 Location string `json:"location" gorm:"type:varchar(255)"` //发生地点 } func (m *ModelTaskResults) TableName() string { return "model_task_results" } type ModelTaskResultsSearch struct { ModelTaskResults Orm *gorm.DB PageNum int PageSize int Order string Keyword string } func NewModelTaskResultsSearch() *ModelTaskResultsSearch { return &ModelTaskResultsSearch{ Orm: GetDB(), PageNum: 1, PageSize: 10, } } func (slf *ModelTaskResultsSearch) SetOrm(tx *gorm.DB) *ModelTaskResultsSearch { slf.Orm = tx return slf } func (slf *ModelTaskResultsSearch) SetPage(page, size int) *ModelTaskResultsSearch { slf.PageNum, slf.PageSize = page, size return slf } func (slf *ModelTaskResultsSearch) SetOrder(order string) *ModelTaskResultsSearch { slf.Order = order return slf } func (slf *ModelTaskResultsSearch) SetID(id string) *ModelTaskResultsSearch { slf.ID = id return slf } func (slf *ModelTaskResultsSearch) SetKeyword(kw string) *ModelTaskResultsSearch { slf.Keyword = kw return slf } func (slf *ModelTaskResultsSearch) build() *gorm.DB { var db = slf.Orm.Table(slf.TableName()) if slf.Order != "" { db = db.Order(slf.Order) } if slf.ID != "" { db = db.Where("id = ?", slf.ID) } if slf.Keyword != "" { kw := "%" + slf.Keyword + "%" db = db.Where("name like ?", kw) } return db } func (slf *ModelTaskResultsSearch) First() (*ModelTaskResults, error) { var ( record = new(ModelTaskResults) db = slf.build() ) if err := db.First(record).Error; err != nil { return record, err } return record, nil } func (slf *ModelTaskResultsSearch) Find() ([]*ModelTaskResults, int64, error) { var ( records = make([]*ModelTaskResults, 0) total int64 db = slf.build() ) if err := db.Count(&total).Error; err != nil { return records, total, fmt.Errorf("find count err: %v", err) } if slf.PageNum*slf.PageSize > 0 { db = db.Offset((slf.PageNum - 1) * slf.PageSize).Limit(slf.PageSize) } if err := db.Find(&records).Error; err != nil { return records, total, fmt.Errorf("find records err: %v", err) } return records, total, nil } func (slf *ModelTaskResultsSearch) FindAll() ([]*ModelTaskResults, error) { var ( records = make([]*ModelTaskResults, 0) db = slf.build() ) if err := db.Find(&records).Error; err != nil { return records, fmt.Errorf("find records err: %v", err) } return records, nil } func (slf *ModelTaskResultsSearch) Count() int64 { var ( count int64 db = slf.build() ) if err := db.Count(&count).Error; err != nil { return count } return count } func (slf *ModelTaskResultsSearch) Create(record *ModelTaskResults) error { var db = slf.build() if err := db.Create(record).Error; err != nil { return fmt.Errorf("create err: %v, record: %+v", err, record) } return nil } func (slf *ModelTaskResultsSearch) BatchCreate(record []*ModelTaskResults) error { var db = slf.build() if err := db.Create(record).Error; err != nil { return fmt.Errorf("create err: %v, record: %+v", err, record) } return nil } func (slf *ModelTaskResultsSearch) Save(record *ModelTaskResults) error { var db = slf.build() if err := db.Omit("CreatedAt").Save(record).Error; err != nil { return fmt.Errorf("save err: %v, record: %+v", err, record) } return nil } func (slf *ModelTaskResultsSearch) Update(record *ModelTaskResults) error { var db = slf.build() if err := db.Updates(record).Error; err != nil { return fmt.Errorf("update err: %v, record: %+v", err, record) } return nil } func (slf *ModelTaskResultsSearch) Delete() error { var db = slf.build() return db.Delete(&ModelTaskResults{}).Error } models/gather_model.go
@@ -9,6 +9,7 @@ "log" "model-engine/config" "model-engine/db" "model-engine/pkg/set" "model-engine/service" "strings" "time" @@ -25,6 +26,7 @@ AppearInterval int `gorm:"type:int;" json:"appearInterval"` //出现间隔,单位为秒 DaysWindow int `gorm:"type:int;" json:"daysWindow" ` //近几天内 Threshold int `gorm:"type:int;" json:"threshold" ` //达几次 Task *db.ModelTask } func (m *GatherModel) Init(task *db.ModelTask) error { @@ -34,6 +36,7 @@ return err } m.Task = task m.OrgIds = orgIds m.AreaIds = areaIds m.Building = task.Building @@ -53,6 +56,7 @@ PicDate string `json:"picDate"` DocumentNumber string CommunityId string `json:"communityId"` OrgId string `json:"orgId"` Building string `json:"building"` Floor string `json:"floor"` GatherPersons int `gorm:"type:int;" json:"gatherPersons"` //聚集人数 @@ -74,11 +78,31 @@ log.Fatalf("Failed to analyze and aggregate data: %v", err) } // Print or process the aggregation results as needed for location, persons := range aggregation { fmt.Printf("Gathering detected at %s with %d unique persons\n", location, len(persons)) if len(aggregation) == 0 { return nil } return nil tagTypes := strings.Split(m.Task.PersonType, ",") results := make([]*db.ModelTaskResults, 0, len(aggregation)) _, typeNames, err := service.GetPersonTypeNameByTypes(tagTypes) if err != nil { return err } event := strings.Join(typeNames, ",") for location, persons := range aggregation { result := &db.ModelTaskResults{ Title: m.Task.Name, Event: fmt.Sprintf("%s/%d人", event, len(persons)), ModelID: m.Task.ModelID, ModelTaskID: m.Task.ID, CommunityId: location.CommunityId, OrgID: location.OrgId, ObjectIds: strings.Join(persons.Elements(), ","), Location: location.Location, } results = append(results, result) } return service.SaveTaskResults(results) } func (m *GatherModel) Shutdown() error { @@ -151,46 +175,54 @@ }, }, "aggs": map[string]interface{}{ "gather_events": map[string]interface{}{ "date_histogram": map[string]interface{}{ "field": "picDate", "interval": fmt.Sprintf("%ds", gatherModel.AppearInterval), "min_doc_count": 1, "orgs": map[string]interface{}{ // 先聚合orgId "terms": map[string]interface{}{ "field": "orgId", // 聚合orgId "size": 10000, }, "aggs": map[string]interface{}{ "community": map[string]interface{}{ "community": map[string]interface{}{ // 在orgId聚合下聚合communityId "terms": map[string]interface{}{ "field": "communityId", // 聚合小区id "field": "communityId", // 聚合communityId "size": 10000, }, "aggs": map[string]interface{}{ "location": map[string]interface{}{ "location": map[string]interface{}{ // 在communityId下聚合building "terms": map[string]interface{}{ "field": "cameraLocation.building", // 聚合楼栋 "size": 10000, }, "aggs": map[string]interface{}{ "floor": map[string]interface{}{ "floor": map[string]interface{}{ // 在building下聚合floor "terms": map[string]interface{}{ "field": "cameraLocation.floor", // 聚合楼层 "size": 10000, }, "aggs": map[string]interface{}{ "people": map[string]interface{}{ "terms": map[string]interface{}{ "field": "documentNumber", // 按人员唯一标识聚合 "size": 10000, "gather_events": map[string]interface{}{ // 在floor下聚合gather_events "date_histogram": map[string]interface{}{ "field": "picDate", "interval": fmt.Sprintf("%ds", gatherModel.AppearInterval), "min_doc_count": 1, }, }, "filter_gather": map[string]interface{}{ "bucket_selector": map[string]interface{}{ "buckets_path": map[string]interface{}{ "personCount": "people._bucket_count", // 统计人数 "aggs": map[string]interface{}{ "people": map[string]interface{}{ "terms": map[string]interface{}{ "field": "documentNumber", // 按人员唯一标识聚合 "size": 10000, }, }, "script": map[string]interface{}{ "source": "params.personCount >= params.gatherPersons", // 聚集人数过滤 "params": map[string]interface{}{ "gatherPersons": gatherModel.GatherPersons, "filter_gather": map[string]interface{}{ "bucket_selector": map[string]interface{}{ "buckets_path": map[string]interface{}{ "personCount": "people._bucket_count", // 统计人数 }, "script": map[string]interface{}{ "source": "params.personCount >= params.gatherPersons", // 聚集人数过滤 "params": map[string]interface{}{ "gatherPersons": gatherModel.GatherPersons, }, }, }, }, }, @@ -237,44 +269,50 @@ // 解析聚合结果 var records []GatherRecord if aggs, ok := result["aggregations"].(map[string]interface{}); ok { if gatherEvents, ok := aggs["gather_events"].(map[string]interface{}); ok { if buckets, ok := gatherEvents["buckets"].([]interface{}); ok { for _, bucket := range buckets { key := int64(bucket.(map[string]interface{})["key"].(float64)) / 1000 // 将毫秒转换为秒 timestamp := time.Unix(key, 0).Format("2006-01-02T15:04:05") if orgBuckets, ok := aggs["orgs"].(map[string]interface{})["buckets"].([]interface{}); ok { for _, orgBucket := range orgBuckets { orgId := orgBucket.(map[string]interface{})["key"].(string) // 解析按小区、楼栋和楼层的聚合结果 if communityBuckets, ok := bucket.(map[string]interface{})["community"].(map[string]interface{})["buckets"].([]interface{}); ok { for _, communityBucket := range communityBuckets { communityId := communityBucket.(map[string]interface{})["key"].(string) // 解析按communityId的聚合结果 if communityBuckets, ok := orgBucket.(map[string]interface{})["community"].(map[string]interface{})["buckets"].([]interface{}); ok { for _, communityBucket := range communityBuckets { communityId := communityBucket.(map[string]interface{})["key"].(string) // 解析按楼栋和楼层的聚合结果 if locationBuckets, ok := communityBucket.(map[string]interface{})["location"].(map[string]interface{})["buckets"].([]interface{}); ok { for _, locationBucket := range locationBuckets { building := locationBucket.(map[string]interface{})["key"].(string) // 解析按building的聚合结果 if locationBuckets, ok := communityBucket.(map[string]interface{})["location"].(map[string]interface{})["buckets"].([]interface{}); ok { for _, locationBucket := range locationBuckets { building := locationBucket.(map[string]interface{})["key"].(string) // 解析楼层 if floorBuckets, ok := locationBucket.(map[string]interface{})["floor"].(map[string]interface{})["buckets"].([]interface{}); ok { for _, floorBucket := range floorBuckets { floor := floorBucket.(map[string]interface{})["key"].(string) // 解析按floor的聚合结果 if floorBuckets, ok := locationBucket.(map[string]interface{})["floor"].(map[string]interface{})["buckets"].([]interface{}); ok { for _, floorBucket := range floorBuckets { floor := floorBucket.(map[string]interface{})["key"].(string) // 解析人员 if peopleBuckets, ok := floorBucket.(map[string]interface{})["people"].(map[string]interface{})["buckets"].([]interface{}); ok { for _, person := range peopleBuckets { documentNumber := person.(map[string]interface{})["key"].(string) // 解析聚合的事件 if gatherEvents, ok := floorBucket.(map[string]interface{})["gather_events"].(map[string]interface{})["buckets"].([]interface{}); ok { for _, eventBucket := range gatherEvents { key := int64(eventBucket.(map[string]interface{})["key"].(float64)) / 1000 // 将毫秒转换为秒 timestamp := time.Unix(key, 0).Format("2006-01-02T15:04:05") // 构建 GatherRecord 结构体 record := GatherRecord{ PicDate: timestamp, DocumentNumber: documentNumber, CommunityId: communityId, Building: building, Floor: floor, AppearInterval: gatherModel.AppearInterval, GatherPersons: gatherModel.GatherPersons, // 解析人员 if peopleBuckets, ok := eventBucket.(map[string]interface{})["people"].(map[string]interface{})["buckets"].([]interface{}); ok { for _, person := range peopleBuckets { documentNumber := person.(map[string]interface{})["key"].(string) // 构建 GatherRecord 结构体 record := GatherRecord{ PicDate: timestamp, DocumentNumber: documentNumber, CommunityId: communityId, Building: building, Floor: floor, OrgId: orgId, AppearInterval: gatherModel.AppearInterval, GatherPersons: gatherModel.GatherPersons, } records = append(records, record) } records = append(records, record) } } } @@ -291,15 +329,42 @@ return records, nil } func analyzeAndAggregate(records []GatherRecord) (map[string][]string, error) { // Implement logic to aggregate and analyze data based on GatherModel parameters // This is a placeholder for the actual implementation aggregation := make(map[string][]string) type GatherLocation struct { CommunityId string OrgId string Building string Floor string Location string } // Example logic: func analyzeAndAggregate(records []GatherRecord) (map[GatherLocation]set.StringSet, error) { aggregation := make(map[GatherLocation]set.StringSet) domainIds := set.NewStringSet() for _, record := range records { key := fmt.Sprintf("%s%s%s", record.CommunityId, record.Building, record.Floor) aggregation[key] = append(aggregation[key], record.DocumentNumber) domainIds.Add(record.CommunityId) } domains, err := service.GetUnitsMapByIds(domainIds.Elements()) if err != nil { return nil, err } for _, record := range records { if domains[record.CommunityId] == nil { continue } location := GatherLocation{ CommunityId: record.CommunityId, OrgId: record.OrgId, Building: record.Building, Floor: record.Floor, Location: fmt.Sprintf("%s%s%s", domains[record.CommunityId].Name, record.Building, record.Floor), } if aggregation[location] == nil { aggregation[location] = set.NewStringSet() } aggregation[location].Add(record.DocumentNumber) } return aggregation, nil service/domain.go
@@ -78,3 +78,16 @@ return list, nil } func GetUnitsMapByIds(ids []string) (m map[string]*db.DomainUnit, err error) { list := make([]*db.DomainUnit, 0) err = db.GetDB().Model(&db.DomainUnit{}).Where("id in ?", ids).Find(&list).Error if err != nil { return nil, err } m = make(map[string]*db.DomainUnit, len(list)) for _, v := range list { m[v.ID] = v } return } service/key_person.go
New file @@ -0,0 +1,23 @@ package service import "model-engine/db" func GetPersonTypeNameByTypes(types []string) (m map[string]string, names []string, err error) { m = make(map[string]string) records, err := db.NewKeyPersonBaseSearch().SetTagTypes(types).FindAll() if err != nil { return } for _, record := range records { m[record.TagType] = record.TagName } for _, t := range types { names = append(names, m[t]) } return } service/task_results.go
New file @@ -0,0 +1,7 @@ package service import "model-engine/db" func SaveTaskResults(results []*db.ModelTaskResults) error { return db.NewModelTaskResultsSearch().BatchCreate(results) }