zhangqian
2024-12-12 973c3b1684d0d019aa74113b1983fe29f20d2a49
模型任务执行预警结果存表
4个文件已添加
2个文件已修改
588 ■■■■ 已修改文件
db/key_person_base.go 165 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
db/task_results.go 185 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
models/gather_model.go 195 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service/domain.go 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service/key_person.go 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service/task_results.go 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
db/key_person_base.go
New file
@@ -0,0 +1,165 @@
package db
import (
    "fmt"
    "gorm.io/gorm"
)
// KeyPersonBase is a key-person base library record; TableName maps it to the
// "key_person_base" table.
type KeyPersonBase struct {
    BaseModel
    Name    string `json:"name" gorm:"type:varchar(255);"`   // base library name
    Color   string `json:"color" gorm:"type:varchar(255);"`  // base library color
    Level   string `json:"level" gorm:"type:varchar(255);"`  // base library level
    TagName string `json:"tagName" gorm:"type:varchar(255)"` // tag name
    TagType string `json:"tagType" gorm:"type:varchar(255)"` // tag type
}
// TableName returns the fixed table name "key_person_base".
func (m *KeyPersonBase) TableName() string {
    return "key_person_base"
}
// KeyPersonBaseSearch is a chainable query builder for KeyPersonBase rows.
// The embedded KeyPersonBase carries filter values (build reads ID and Name).
type KeyPersonBaseSearch struct {
    KeyPersonBase
    Orm      *gorm.DB // handle every query is built from
    PageNum  int      // 1-based page number used by Find
    PageSize int      // rows per page used by Find
    Order    string   // passed to Order by build when non-empty
    TagTypes []string // when non-empty, build adds a "tag_type in ?" filter
}
func NewKeyPersonBaseSearch() *KeyPersonBaseSearch {
    return &KeyPersonBaseSearch{
        Orm:      GetDB(),
        PageNum:  1,
        PageSize: 10,
    }
}
func (slf *KeyPersonBaseSearch) SetPage(page, size int) *KeyPersonBaseSearch {
    slf.PageNum, slf.PageSize = page, size
    return slf
}
// SetOrder stores the ordering expression that build passes to Order and
// returns the receiver for chaining.
func (slf *KeyPersonBaseSearch) SetOrder(order string) *KeyPersonBaseSearch {
    slf.Order = order
    return slf
}
// SetID stores the ID that build uses for its "id = ?" filter and returns the
// receiver for chaining.
func (slf *KeyPersonBaseSearch) SetID(id string) *KeyPersonBaseSearch {
    slf.ID = id
    return slf
}
// Set stores id in the ID filter and returns the receiver for chaining.
// NOTE(review): the body is identical to SetID, while build also filters on
// Name and no setter for Name is visible here — confirm whether this method
// was meant to set a different field.
func (slf *KeyPersonBaseSearch) Set(id string) *KeyPersonBaseSearch {
    slf.ID = id
    return slf
}
// SetTagTypes stores the tag types that build matches with "tag_type in ?"
// and returns the receiver for chaining. Despite the parameter name, the
// values are compared against the tag_type column.
func (slf *KeyPersonBaseSearch) SetTagTypes(ids []string) *KeyPersonBaseSearch {
    slf.TagTypes = ids
    return slf
}
func (slf *KeyPersonBaseSearch) build() *gorm.DB {
    var db = slf.Orm.Table(slf.TableName())
    if slf.Order != "" {
        db = db.Order(slf.Order)
    }
    if slf.ID != "" {
        db = db.Where("id = ?", slf.ID)
    }
    if slf.Name != "" {
        db = db.Where("name = ?", slf.Name)
    }
    if len(slf.TagTypes) != 0 {
        db = db.Where("tag_type in ?", slf.TagTypes)
    }
    return db
}
func (slf *KeyPersonBaseSearch) First() (*KeyPersonBase, error) {
    var (
        record = new(KeyPersonBase)
        db     = slf.build()
    )
    if err := db.First(record).Error; err != nil {
        return record, err
    }
    return record, nil
}
func (slf *KeyPersonBaseSearch) Find() ([]*KeyPersonBase, int64, error) {
    var (
        records = make([]*KeyPersonBase, 0)
        total   int64
        db      = slf.build()
    )
    if err := db.Count(&total).Error; err != nil {
        return records, total, fmt.Errorf("find count err: %v", err)
    }
    if slf.PageNum*slf.PageSize > 0 {
        db = db.Offset((slf.PageNum - 1) * slf.PageSize).Limit(slf.PageSize)
    }
    if err := db.Find(&records).Error; err != nil {
        return records, total, fmt.Errorf("find records err: %v", err)
    }
    return records, total, nil
}
func (slf *KeyPersonBaseSearch) FindAll() ([]*KeyPersonBase, error) {
    var (
        records = make([]*KeyPersonBase, 0)
        db      = slf.build()
    )
    if err := db.Find(&records).Error; err != nil {
        return records, fmt.Errorf("find records err: %v", err)
    }
    return records, nil
}
func (slf *KeyPersonBaseSearch) Count() int64 {
    var (
        count int64
        db    = slf.build()
    )
    if err := db.Count(&count).Error; err != nil {
        return count
    }
    return count
}
func (slf *KeyPersonBaseSearch) Create(record *KeyPersonBase) error {
    var db = slf.build()
    if err := db.Create(record).Error; err != nil {
        return fmt.Errorf("create err: %v, record: %+v", err, record)
    }
    return nil
}
func (slf *KeyPersonBaseSearch) Update(record *KeyPersonBase) error {
    var db = slf.build()
    if err := db.Updates(record).Error; err != nil {
        return fmt.Errorf("update err: %v, record: %+v", err, record)
    }
    return nil
}
func (slf *KeyPersonBaseSearch) Delete() error {
    var db = slf.build()
    return db.Delete(&KeyPersonBase{}).Error
}
db/task_results.go
New file
@@ -0,0 +1,185 @@
package db
import (
    "fmt"
    "gorm.io/gorm"
)
// ModelTaskResults is one alert produced by a model task run; TableName maps
// it to the "model_task_results" table.
type ModelTaskResults struct {
    BaseModel
    Title       string `json:"name" gorm:"type:varchar(255)"`                                            // alert name; the task name is used for now
    Event       string `json:"event" gorm:"type:varchar(255)"`                                           // alert event
    ModelID     string `json:"modelID" gorm:"type:varchar(255)"`                                         // model ID
    ModelTaskID string `json:"modelTaskID" gorm:"type:varchar(255)"`                                     // model task ID
    CommunityId string `json:"communityID" gorm:"index;column:community_id;type:varchar(299);not null;"` // community ID
    OrgID       string `json:"orgID" gorm:"index;column:org_id;type:varchar(299);not null;"`             // police station domain unit ID
    ObjectIds   string `json:"objectIds" gorm:"type:text"`                                               // event objects (may be persons); multiple values are comma-separated
    Location    string `json:"location" gorm:"type:varchar(255)"`                                        // place of occurrence
}
// TableName returns the fixed table name "model_task_results".
func (m *ModelTaskResults) TableName() string {
    return "model_task_results"
}
// ModelTaskResultsSearch is a chainable query builder for ModelTaskResults
// rows. The embedded ModelTaskResults carries filter values (build reads ID).
type ModelTaskResultsSearch struct {
    ModelTaskResults
    Orm      *gorm.DB // handle every query is built from; replaceable via SetOrm
    PageNum  int      // 1-based page number used by Find
    PageSize int      // rows per page used by Find
    Order    string   // passed to Order by build when non-empty
    Keyword  string   // when non-empty, build matches it as a LIKE substring
}
func NewModelTaskResultsSearch() *ModelTaskResultsSearch {
    return &ModelTaskResultsSearch{
        Orm:      GetDB(),
        PageNum:  1,
        PageSize: 10,
    }
}
// SetOrm replaces the GORM handle that later queries are built from (the
// parameter name suggests a transaction handle) and returns the receiver for
// chaining.
func (slf *ModelTaskResultsSearch) SetOrm(tx *gorm.DB) *ModelTaskResultsSearch {
    slf.Orm = tx
    return slf
}
func (slf *ModelTaskResultsSearch) SetPage(page, size int) *ModelTaskResultsSearch {
    slf.PageNum, slf.PageSize = page, size
    return slf
}
// SetOrder stores the ordering expression that build passes to Order and
// returns the receiver for chaining.
func (slf *ModelTaskResultsSearch) SetOrder(order string) *ModelTaskResultsSearch {
    slf.Order = order
    return slf
}
// SetID stores the ID that build uses for its "id = ?" filter and returns the
// receiver for chaining.
func (slf *ModelTaskResultsSearch) SetID(id string) *ModelTaskResultsSearch {
    slf.ID = id
    return slf
}
// SetKeyword stores the substring that build wraps in "%" wildcards for its
// LIKE filter and returns the receiver for chaining.
func (slf *ModelTaskResultsSearch) SetKeyword(kw string) *ModelTaskResultsSearch {
    slf.Keyword = kw
    return slf
}
func (slf *ModelTaskResultsSearch) build() *gorm.DB {
    var db = slf.Orm.Table(slf.TableName())
    if slf.Order != "" {
        db = db.Order(slf.Order)
    }
    if slf.ID != "" {
        db = db.Where("id = ?", slf.ID)
    }
    if slf.Keyword != "" {
        kw := "%" + slf.Keyword + "%"
        db = db.Where("name like ?", kw)
    }
    return db
}
func (slf *ModelTaskResultsSearch) First() (*ModelTaskResults, error) {
    var (
        record = new(ModelTaskResults)
        db     = slf.build()
    )
    if err := db.First(record).Error; err != nil {
        return record, err
    }
    return record, nil
}
func (slf *ModelTaskResultsSearch) Find() ([]*ModelTaskResults, int64, error) {
    var (
        records = make([]*ModelTaskResults, 0)
        total   int64
        db      = slf.build()
    )
    if err := db.Count(&total).Error; err != nil {
        return records, total, fmt.Errorf("find count err: %v", err)
    }
    if slf.PageNum*slf.PageSize > 0 {
        db = db.Offset((slf.PageNum - 1) * slf.PageSize).Limit(slf.PageSize)
    }
    if err := db.Find(&records).Error; err != nil {
        return records, total, fmt.Errorf("find records err: %v", err)
    }
    return records, total, nil
}
func (slf *ModelTaskResultsSearch) FindAll() ([]*ModelTaskResults, error) {
    var (
        records = make([]*ModelTaskResults, 0)
        db      = slf.build()
    )
    if err := db.Find(&records).Error; err != nil {
        return records, fmt.Errorf("find records err: %v", err)
    }
    return records, nil
}
func (slf *ModelTaskResultsSearch) Count() int64 {
    var (
        count int64
        db    = slf.build()
    )
    if err := db.Count(&count).Error; err != nil {
        return count
    }
    return count
}
func (slf *ModelTaskResultsSearch) Create(record *ModelTaskResults) error {
    var db = slf.build()
    if err := db.Create(record).Error; err != nil {
        return fmt.Errorf("create err: %v, record: %+v", err, record)
    }
    return nil
}
func (slf *ModelTaskResultsSearch) BatchCreate(record []*ModelTaskResults) error {
    var db = slf.build()
    if err := db.Create(record).Error; err != nil {
        return fmt.Errorf("create err: %v, record: %+v", err, record)
    }
    return nil
}
func (slf *ModelTaskResultsSearch) Save(record *ModelTaskResults) error {
    var db = slf.build()
    if err := db.Omit("CreatedAt").Save(record).Error; err != nil {
        return fmt.Errorf("save err: %v, record: %+v", err, record)
    }
    return nil
}
func (slf *ModelTaskResultsSearch) Update(record *ModelTaskResults) error {
    var db = slf.build()
    if err := db.Updates(record).Error; err != nil {
        return fmt.Errorf("update err: %v, record: %+v", err, record)
    }
    return nil
}
func (slf *ModelTaskResultsSearch) Delete() error {
    var db = slf.build()
    return db.Delete(&ModelTaskResults{}).Error
}
models/gather_model.go
@@ -9,6 +9,7 @@
    "log"
    "model-engine/config"
    "model-engine/db"
    "model-engine/pkg/set"
    "model-engine/service"
    "strings"
    "time"
@@ -25,6 +26,7 @@
    AppearInterval int           `gorm:"type:int;" json:"appearInterval"`      //出现间隔,单位为秒
    DaysWindow     int           `gorm:"type:int;" json:"daysWindow" `         //近几天内
    Threshold      int           `gorm:"type:int;" json:"threshold" `          //达几次
    Task           *db.ModelTask
}
func (m *GatherModel) Init(task *db.ModelTask) error {
@@ -34,6 +36,7 @@
        return err
    }
    m.Task = task
    m.OrgIds = orgIds
    m.AreaIds = areaIds
    m.Building = task.Building
@@ -53,6 +56,7 @@
    PicDate        string `json:"picDate"`
    DocumentNumber string
    CommunityId    string `json:"communityId"`
    OrgId          string `json:"orgId"`
    Building       string `json:"building"`
    Floor          string `json:"floor"`
    GatherPersons  int    `gorm:"type:int;" json:"gatherPersons"`  //聚集人数
@@ -74,11 +78,31 @@
        log.Fatalf("Failed to analyze and aggregate data: %v", err)
    }
    // Print or process the aggregation results as needed
    for location, persons := range aggregation {
        fmt.Printf("Gathering detected at %s with %d unique persons\n", location, len(persons))
    if len(aggregation) == 0 {
        return nil
    }
    return nil
    tagTypes := strings.Split(m.Task.PersonType, ",")
    results := make([]*db.ModelTaskResults, 0, len(aggregation))
    _, typeNames, err := service.GetPersonTypeNameByTypes(tagTypes)
    if err != nil {
        return err
    }
    event := strings.Join(typeNames, ",")
    for location, persons := range aggregation {
        result := &db.ModelTaskResults{
            Title:       m.Task.Name,
            Event:       fmt.Sprintf("%s/%d人", event, len(persons)),
            ModelID:     m.Task.ModelID,
            ModelTaskID: m.Task.ID,
            CommunityId: location.CommunityId,
            OrgID:       location.OrgId,
            ObjectIds:   strings.Join(persons.Elements(), ","),
            Location:    location.Location,
        }
        results = append(results, result)
    }
    return service.SaveTaskResults(results)
}
func (m *GatherModel) Shutdown() error {
@@ -151,46 +175,54 @@
            },
        },
        "aggs": map[string]interface{}{
            "gather_events": map[string]interface{}{
                "date_histogram": map[string]interface{}{
                    "field":         "picDate",
                    "interval":      fmt.Sprintf("%ds", gatherModel.AppearInterval),
                    "min_doc_count": 1,
            "orgs": map[string]interface{}{ // 先聚合orgId
                "terms": map[string]interface{}{
                    "field": "orgId", // 聚合orgId
                    "size":  10000,
                },
                "aggs": map[string]interface{}{
                    "community": map[string]interface{}{
                    "community": map[string]interface{}{ // 在orgId聚合下聚合communityId
                        "terms": map[string]interface{}{
                            "field": "communityId", // 聚合小区id
                            "field": "communityId", // 聚合communityId
                            "size":  10000,
                        },
                        "aggs": map[string]interface{}{
                            "location": map[string]interface{}{
                            "location": map[string]interface{}{ // 在communityId下聚合building
                                "terms": map[string]interface{}{
                                    "field": "cameraLocation.building", // 聚合楼栋
                                    "size":  10000,
                                },
                                "aggs": map[string]interface{}{
                                    "floor": map[string]interface{}{
                                    "floor": map[string]interface{}{ // 在building下聚合floor
                                        "terms": map[string]interface{}{
                                            "field": "cameraLocation.floor", // 聚合楼层
                                            "size":  10000,
                                        },
                                        "aggs": map[string]interface{}{
                                            "people": map[string]interface{}{
                                                "terms": map[string]interface{}{
                                                    "field": "documentNumber", // 按人员唯一标识聚合
                                                    "size":  10000,
                                            "gather_events": map[string]interface{}{ // 在floor下聚合gather_events
                                                "date_histogram": map[string]interface{}{
                                                    "field":         "picDate",
                                                    "interval":      fmt.Sprintf("%ds", gatherModel.AppearInterval),
                                                    "min_doc_count": 1,
                                                },
                                            },
                                            "filter_gather": map[string]interface{}{
                                                "bucket_selector": map[string]interface{}{
                                                    "buckets_path": map[string]interface{}{
                                                        "personCount": "people._bucket_count", // 统计人数
                                                "aggs": map[string]interface{}{
                                                    "people": map[string]interface{}{
                                                        "terms": map[string]interface{}{
                                                            "field": "documentNumber", // 按人员唯一标识聚合
                                                            "size":  10000,
                                                        },
                                                    },
                                                    "script": map[string]interface{}{
                                                        "source": "params.personCount >= params.gatherPersons", // 聚集人数过滤
                                                        "params": map[string]interface{}{
                                                            "gatherPersons": gatherModel.GatherPersons,
                                                    "filter_gather": map[string]interface{}{
                                                        "bucket_selector": map[string]interface{}{
                                                            "buckets_path": map[string]interface{}{
                                                                "personCount": "people._bucket_count", // 统计人数
                                                            },
                                                            "script": map[string]interface{}{
                                                                "source": "params.personCount >= params.gatherPersons", // 聚集人数过滤
                                                                "params": map[string]interface{}{
                                                                    "gatherPersons": gatherModel.GatherPersons,
                                                                },
                                                            },
                                                        },
                                                    },
                                                },
@@ -237,44 +269,50 @@
    // 解析聚合结果
    var records []GatherRecord
    if aggs, ok := result["aggregations"].(map[string]interface{}); ok {
        if gatherEvents, ok := aggs["gather_events"].(map[string]interface{}); ok {
            if buckets, ok := gatherEvents["buckets"].([]interface{}); ok {
                for _, bucket := range buckets {
                    key := int64(bucket.(map[string]interface{})["key"].(float64)) / 1000 // 将毫秒转换为秒
                    timestamp := time.Unix(key, 0).Format("2006-01-02T15:04:05")
        if orgBuckets, ok := aggs["orgs"].(map[string]interface{})["buckets"].([]interface{}); ok {
            for _, orgBucket := range orgBuckets {
                orgId := orgBucket.(map[string]interface{})["key"].(string)
                    // 解析按小区、楼栋和楼层的聚合结果
                    if communityBuckets, ok := bucket.(map[string]interface{})["community"].(map[string]interface{})["buckets"].([]interface{}); ok {
                        for _, communityBucket := range communityBuckets {
                            communityId := communityBucket.(map[string]interface{})["key"].(string)
                // 解析按communityId的聚合结果
                if communityBuckets, ok := orgBucket.(map[string]interface{})["community"].(map[string]interface{})["buckets"].([]interface{}); ok {
                    for _, communityBucket := range communityBuckets {
                        communityId := communityBucket.(map[string]interface{})["key"].(string)
                            // 解析按楼栋和楼层的聚合结果
                            if locationBuckets, ok := communityBucket.(map[string]interface{})["location"].(map[string]interface{})["buckets"].([]interface{}); ok {
                                for _, locationBucket := range locationBuckets {
                                    building := locationBucket.(map[string]interface{})["key"].(string)
                        // 解析按building的聚合结果
                        if locationBuckets, ok := communityBucket.(map[string]interface{})["location"].(map[string]interface{})["buckets"].([]interface{}); ok {
                            for _, locationBucket := range locationBuckets {
                                building := locationBucket.(map[string]interface{})["key"].(string)
                                    // 解析楼层
                                    if floorBuckets, ok := locationBucket.(map[string]interface{})["floor"].(map[string]interface{})["buckets"].([]interface{}); ok {
                                        for _, floorBucket := range floorBuckets {
                                            floor := floorBucket.(map[string]interface{})["key"].(string)
                                // 解析按floor的聚合结果
                                if floorBuckets, ok := locationBucket.(map[string]interface{})["floor"].(map[string]interface{})["buckets"].([]interface{}); ok {
                                    for _, floorBucket := range floorBuckets {
                                        floor := floorBucket.(map[string]interface{})["key"].(string)
                                            // 解析人员
                                            if peopleBuckets, ok := floorBucket.(map[string]interface{})["people"].(map[string]interface{})["buckets"].([]interface{}); ok {
                                                for _, person := range peopleBuckets {
                                                    documentNumber := person.(map[string]interface{})["key"].(string)
                                        // 解析聚合的事件
                                        if gatherEvents, ok := floorBucket.(map[string]interface{})["gather_events"].(map[string]interface{})["buckets"].([]interface{}); ok {
                                            for _, eventBucket := range gatherEvents {
                                                key := int64(eventBucket.(map[string]interface{})["key"].(float64)) / 1000 // 将毫秒转换为秒
                                                timestamp := time.Unix(key, 0).Format("2006-01-02T15:04:05")
                                                    // 构建 GatherRecord 结构体
                                                    record := GatherRecord{
                                                        PicDate:        timestamp,
                                                        DocumentNumber: documentNumber,
                                                        CommunityId:    communityId,
                                                        Building:       building,
                                                        Floor:          floor,
                                                        AppearInterval: gatherModel.AppearInterval,
                                                        GatherPersons:  gatherModel.GatherPersons,
                                                // 解析人员
                                                if peopleBuckets, ok := eventBucket.(map[string]interface{})["people"].(map[string]interface{})["buckets"].([]interface{}); ok {
                                                    for _, person := range peopleBuckets {
                                                        documentNumber := person.(map[string]interface{})["key"].(string)
                                                        // 构建 GatherRecord 结构体
                                                        record := GatherRecord{
                                                            PicDate:        timestamp,
                                                            DocumentNumber: documentNumber,
                                                            CommunityId:    communityId,
                                                            Building:       building,
                                                            Floor:          floor,
                                                            OrgId:          orgId,
                                                            AppearInterval: gatherModel.AppearInterval,
                                                            GatherPersons:  gatherModel.GatherPersons,
                                                        }
                                                        records = append(records, record)
                                                    }
                                                    records = append(records, record)
                                                }
                                            }
                                        }
@@ -291,15 +329,42 @@
    return records, nil
}
func analyzeAndAggregate(records []GatherRecord) (map[string][]string, error) {
    // Implement logic to aggregate and analyze data based on GatherModel parameters
    // This is a placeholder for the actual implementation
    aggregation := make(map[string][]string)
// GatherLocation identifies the place a gathering is attributed to.
// analyzeAndAggregate uses it as the map key under which the document numbers
// of the persons seen there are collected.
type GatherLocation struct {
    CommunityId string // community ID taken from the record
    OrgId       string // org ID taken from the record
    Building    string // building taken from the record
    Floor       string // floor taken from the record
    Location    string // display text: community domain unit name + building + floor
}
    // Example logic:
func analyzeAndAggregate(records []GatherRecord) (map[GatherLocation]set.StringSet, error) {
    aggregation := make(map[GatherLocation]set.StringSet)
    domainIds := set.NewStringSet()
    for _, record := range records {
        key := fmt.Sprintf("%s%s%s", record.CommunityId, record.Building, record.Floor)
        aggregation[key] = append(aggregation[key], record.DocumentNumber)
        domainIds.Add(record.CommunityId)
    }
    domains, err := service.GetUnitsMapByIds(domainIds.Elements())
    if err != nil {
        return nil, err
    }
    for _, record := range records {
        if domains[record.CommunityId] == nil {
            continue
        }
        location := GatherLocation{
            CommunityId: record.CommunityId,
            OrgId:       record.OrgId,
            Building:    record.Building,
            Floor:       record.Floor,
            Location:    fmt.Sprintf("%s%s%s", domains[record.CommunityId].Name, record.Building, record.Floor),
        }
        if aggregation[location] == nil {
            aggregation[location] = set.NewStringSet()
        }
        aggregation[location].Add(record.DocumentNumber)
    }
    return aggregation, nil
service/domain.go
@@ -78,3 +78,16 @@
    return list, nil
}
func GetUnitsMapByIds(ids []string) (m map[string]*db.DomainUnit, err error) {
    list := make([]*db.DomainUnit, 0)
    err = db.GetDB().Model(&db.DomainUnit{}).Where("id in ?", ids).Find(&list).Error
    if err != nil {
        return nil, err
    }
    m = make(map[string]*db.DomainUnit, len(list))
    for _, v := range list {
        m[v.ID] = v
    }
    return
}
service/key_person.go
New file
@@ -0,0 +1,23 @@
package service
import "model-engine/db"
func GetPersonTypeNameByTypes(types []string) (m map[string]string, names []string, err error) {
    m = make(map[string]string)
    records, err := db.NewKeyPersonBaseSearch().SetTagTypes(types).FindAll()
    if err != nil {
        return
    }
    for _, record := range records {
        m[record.TagType] = record.TagName
    }
    for _, t := range types {
        names = append(names, m[t])
    }
    return
}
service/task_results.go
New file
@@ -0,0 +1,7 @@
package service
import "model-engine/db"
func SaveTaskResults(results []*db.ModelTaskResults) error {
    return db.NewModelTaskResultsSearch().BatchCreate(results)
}