zhaoqingang
2025-02-08 cf511f44e700eff9bd5d17e387fb7a6da5b2c303
增加昼伏夜出模型
1个文件已添加
4个文件已修改
526 ■■■■■ 已修改文件
db/db.go 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
db/model.go 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
models/locationAnalysis.go 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
models/model.go 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
models/nightAnalysis.go 511 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
db/db.go
@@ -83,6 +83,15 @@
            Version:     "v1.0.0",
            Enabled:     false,
        },
        //{
        //    BaseModel: BaseModel{
        //        ID: ModelIdNightAnalysis,
        //    },
        //    Name:        "昼伏夜出分析",
        //    Description: "昼伏夜出分析",
        //    Version:     "v1.0.0",
        //    Enabled:     false,
        //},
    }
    for i := range models {
db/model.go
@@ -187,4 +187,5 @@
    ModelIdGather           = "gather"           // 聚集
    ModelIdDisappear        = "disappear"        // 失踪
    ModelIdLocationAnalysis = "locationAnalysis" // 场所分析
    //ModelIdNightAnalysis    = "nightAnalysis"    // 昼伏夜出分析
)
models/locationAnalysis.go
@@ -59,6 +59,10 @@
    if task.IdentityType != "" {
        m.PersonIdentity = strings.Split(task.IdentityType, ",")
    }
    if len(m.AreaIds) == 0 {
        m.AreaIds = m.OrgIds
    }
    for _, v := range task.Rules {
        if v.Alias == "appearances" {
            if val, ok := v.Value.(float64); ok {
models/model.go
@@ -17,6 +17,7 @@
    "gather":    func() Model { return &GatherModel{} },
    "disappear": func() Model { return &DisappearModel{} },
    "location":  func() Model { return &LocationModel{} },
    "night":     func() Model { return &nightModel{} },
    // 添加其他模型
}
models/nightAnalysis.go
New file
@@ -0,0 +1,511 @@
package models
import (
    "bytes"
    "context"
    "encoding/json"
    "errors"
    "fmt"
    "github.com/elastic/go-elasticsearch/v6"
    //"log"
    "model-engine/config"
    "model-engine/pkg/set"
    "strconv"
    "strings"
    "time"
    "model-engine/db"
    "model-engine/pkg/logger"
    "model-engine/service"
)
type nightModel struct {
    AreaIds        []interface{} `json:"-"`
    OrgIds         []interface{} `json:"-"`
    Building       string        // 楼栋
    Floor          string
    AlarmType      db.AlarmType // 预警方式
    KeyPersonType  string       // 人员类型
    PersonLabel    string       // 人员标签
    PersonIdentity []string     // 人员身份
    Duration       int          // 时间范围
    Appearances    int          // 出现次数,
    StartTime      int          // 时间范围, 开始时间
    EndTime        int          // 时间范围, 结束时间
    Task           *db.ModelTask
}
func (m *nightModel) Init(task *db.ModelTask) error {
    //m.AreaIds = make(map[string]struct{})
    //for _, a := range task.DomainUnitIds {
    //    m.AreaIds[a] = struct{}{}
    //}
    if len(task.DomainUnitIds) == 0 {
        return errors.New("empty domain set")
    }
    orgIds, areaIds, err := service.GetOrgIdsAndAreaIdsByDomainUnitIds(task.DomainUnitIds)
    if err != nil {
        return err
    }
    m.Task = task
    m.OrgIds = orgIds
    m.AreaIds = areaIds
    m.Building = task.Building
    m.AlarmType = task.AlarmType
    m.PersonIdentity = []string{"stranger", "visitor", "resident"} //task.IdentityType
    m.KeyPersonType = task.PersonType
    m.PersonLabel = task.PersonLabel
    if task.IdentityType != "" {
        m.PersonIdentity = strings.Split(task.IdentityType, ",")
    }
    for _, v := range task.Rules {
        if v.Alias == "appearances" {
            if val, ok := v.Value.(float64); ok {
                m.Appearances = int(val)
            }
        }
        if v.Alias == "duration" {
            if val, ok := v.Value.(float64); ok {
                m.Duration = int(val)
            }
        }
        if v.Alias == "timeRange" {
            if val, ok := v.Value.(string); ok {
                ages := strings.Split(val, ",")
                m.StartTime, _ = strconv.Atoi(ages[0])
                m.EndTime, _ = strconv.Atoi(ages[1])
            }
        }
    }
    // 默认计算30天的数据
    if m.Duration == 0 {
        m.Duration = 30
    }
    logger.Debugf("LocationModel init finish ...task id:%s, name:%s, rule:%+v", task.ID, task.Name, m)
    return nil
}
type nightRecord struct {
    IDCard          string `json:"idCard"`
    PicDate         string `json:"picDate"`
    DocumentNumbers []string
    CommunityId     string `json:"communityId"`
    OrgId           string `json:"orgId"`
    Building        string `json:"building"`
    Floor           string `json:"floor"`
    AppearCount     int    `gorm:"type:int;" json:"appearCount"` // 出现次数
    //AppearInterval int    `gorm:"type:int;" json:"appearInterval"` // 出现间隔,单位为秒
}
type nightPersonInfo struct {
    Id             string `json:"id"`
    DocumentNumber string `json:"document_number"`
    PersonType     string `json:"person_type"`
    //CommunityId        string `json:"community_id"`
    //OrgId              string `json:"org_id"`
    //PersonName         string `json:"person_name"`
    //IdCard             string `json:"id_card"`
    LastAppearanceTime int64 `json:"last_appearance_time"`
    //LastDirection      string `json:"last_direction"`
    //LastLocation       string `json:"last_location"`
}
//var (
//    processed        sync.Map                           // 存储已处理记录
//    cleanupThreshold = time.Now().Add(-100 * time.Hour) // 定义一个时间窗口,假设只保存最近100小时的记录
//)
func (m *nightModel) Run() error {
    // 根据配置的时间段天数, 每天的时间范围内, 重点人员类型或者特定标签人员出现的楼层次数超过阈值
    results := make([]*db.ModelTaskResults, 0)
    var baseFilter, labelFilter, keyFilter, lastFilter []LocationPersonInfo
    var document_number_map map[string]LocationPersonInfo
    var document_number_list []string
    err := db.GetDB().Raw(`
        SELECT
            s.document_number,
--             s.community_id,
--             s.org_id,
--             p.person_name,
--             p.id_card,
            s.last_appearance_time,
--             s.last_direction,
--             s.last_location
        FROM
            snapshot_count_summary AS s
            JOIN person AS p ON p.id = s.document_number
        WHERE
            p.id_card != ""
            AND (p.community_id IN ?
            OR p.org_id IN ?)
            AND p.status IN ?
        `, m.AreaIds, m.OrgIds, m.PersonIdentity).Scan(&baseFilter).Error
    if err != nil {
        logger.Warnf(err.Error())
    }
    if len(baseFilter) == 0 {
        return fmt.Errorf("no results found that match the age condition %s - %s ", m.AreaIds, m.OrgIds)
    }
    logger.Debugf("task %s match age result %d", m.Task.Name, len(baseFilter))
    for _, i := range baseFilter {
        if _, ok := document_number_map[i.DocumentNumber]; !ok {
            document_number_list = append(document_number_list, i.DocumentNumber)
        }
        document_number_map[i.DocumentNumber] = i
    }
    if m.PersonLabel != "" {
        labels := strings.Split(m.PersonLabel, ",")
        err := db.GetDB().Raw(`
        SELECT
            p.id
        FROM
            person AS p
            JOIN person_label AS l ON p.id = l.person_id
        WHERE
            p.id IN ?
            AND    l.label_id IN ?
        `, document_number_list, labels).Scan(&labelFilter).Error
        if err != nil {
            logger.Warnf(err.Error())
        }
        if len(labelFilter) == 0 {
            return fmt.Errorf("no results found that match the label condition %s", m.PersonLabel)
        }
        logger.Debugf("task %s match label result %d", m.Task.Name, len(labelFilter))
    }
    document_number_list = []string{}
    for _, i := range labelFilter {
        document_number_list = append(document_number_list, i.Id)
    }
    if m.KeyPersonType != "" {
        keyTypes := strings.Split(m.KeyPersonType, ",")
        err := db.GetDB().Raw(`
        SELECT
            p.id,
            k.person_type
        FROM
            person AS p
            JOIN key_person AS k ON k.id_card = p.id_card
        WHERE
            p.id IN ?
            AND k.person_type IN ?
        `, m.StartTime, keyTypes).Scan(&keyFilter).Error
        if err != nil {
            logger.Warnf(err.Error())
        }
        if len(keyFilter) == 0 {
            return fmt.Errorf("no results found that match the key condition %s", m.KeyPersonType)
        }
        logger.Debugf("task %s match key person result %d", m.Task.Name, len(keyFilter))
    }
    logger.Debugf("task %s last result %d", m.Task.Name, len(lastFilter))
    document_number_list = []string{}
    for _, i := range keyFilter {
        document_number_list = append(document_number_list, i.Id)
        person := document_number_map[i.DocumentNumber]
        person.PersonType = i.PersonType
        document_number_map[i.DocumentNumber] = person
    }
    records, err := queryEsLocation(db.GetEsClient(), m, document_number_list)
    if err != nil {
        return err
    }
    domains, err := domainToLocation(records)
    if err != nil {
        return err
    }
    var tagTypes []string
    var lastAppearanceTime int64
    for _, record := range records {
        tagTypes = []string{}
        for _, personId := range record.DocumentNumbers {
            tagTypes = append(tagTypes, document_number_map[personId].PersonType)
            lastAppearanceTime = document_number_map[personId].LastAppearanceTime
        }
        _, typeNames, err := service.GetPersonTypeNameByTypes(tagTypes)
        if err != nil {
            return err
        }
        event := strings.Join(typeNames, ",")
        result := &db.ModelTaskResults{
            Title:         m.Task.Name,
            Event:         m.eventFormat(event, record.AppearCount),
            ModelID:       m.Task.ModelID,
            ModelTaskID:   m.Task.ID,
            CommunityId:   record.CommunityId,
            OrgID:         record.OrgId,
            ObjectIds:     strings.Join(record.DocumentNumbers, ","),
            Location:      fmt.Sprintf("%s%s%s", domains[record.CommunityId].Name, record.Building, record.Floor),
            PicDate:       time.Unix(lastAppearanceTime, 0).Format("2006-01-02 15:04:05"),
            FirstPersonID: record.DocumentNumbers[0],
        }
        results = append(results, result)
    }
    logger.Debugf("task %s last filter result %d", m.Task.Name, len(results))
    return service.SaveTaskResults(results)
}
func (m *nightModel) KeepAlive() error {
    db.GetDB().Model(m.Task).Where("id = ?", m.Task.ID).Update("last_run_time", time.Now())
    return nil
}
func (m *nightModel) Shutdown() error {
    // 清理资源
    fmt.Println("Shutting down LocationModel Model")
    return nil
}
func (m *nightModel) eventFormat(event string, AppearCount int) string {
    return fmt.Sprintf("%s人员进出%d次", event, AppearCount)
}
func queryEsNight(esClient *elasticsearch.Client, locationModel *LocationModel, documentNumbers []string) ([]*LocationRecord, error) {
    var buf bytes.Buffer
    nowTime := time.Now()
    startTime := nowTime.Add(-time.Duration(locationModel.Duration) * 24 * time.Hour)
    // 构建过滤条件
    var filters []map[string]interface{}
    documentNumberFilter := map[string]interface{}{
        "terms": map[string]interface{}{
            "documentNumber": documentNumbers,
        },
    }
    filters = append(filters, documentNumberFilter)
    if len(locationModel.OrgIds) > 0 || len(locationModel.AreaIds) > 0 {
        // 获取数据权限过滤条件
        authFilters := GetDomainFilters(locationModel.OrgIds, locationModel.AreaIds)
        filters = append(filters, authFilters...)
    }
    // 地址过滤
    if locationModel.Building != "" || locationModel.Floor != "" {
        var addrParams map[string]interface{}
        if locationModel.Floor != "" {
            addrParams = map[string]interface{}{"bool": map[string]interface{}{
                "must": []interface{}{
                    map[string]interface{}{
                        "term": map[string]interface{}{
                            "cameraLocation.building": locationModel.Building,
                        }},
                    map[string]interface{}{
                        "term": map[string]interface{}{
                            "cameraLocation.floor": locationModel.Floor,
                        }},
                },
            }}
        } else if locationModel.Building != "" {
            addrParams = map[string]interface{}{
                "term": map[string]interface{}{
                    "cameraLocation.building": locationModel.Building,
                }}
        }
        filters = append(filters, addrParams)
    }
    //// 重点人员过滤
    //if len(locationModel.KeyPersonType) > 0 {
    //    filters = append(filters, map[string]interface{}{
    //        "terms": map[string]interface{}{
    //            "keyPersonType": strings.Split(locationModel.KeyPersonType, ","),
    //        },
    //    })
    //}
    // 时间范围
    //filters = append(filters, map[string]interface{}{
    //    "range": map[string]interface{}{
    //        "picDate": map[string]interface{}{
    //            "gte": start.Format(time.DateTime),
    //            "lt":  now.Format(time.DateTime),
    //        },
    //    },
    //})
    for date := startTime; date.Before(nowTime); date = date.Add(24 * time.Hour) {
        start := time.Date(date.Year(), date.Month(), date.Day(), locationModel.StartTime, 0, 0, 0, date.Location())
        end := time.Date(date.Year(), date.Month(), date.Day(), locationModel.EndTime, 0, 0, 0, date.Location())
        filters = append(filters, map[string]interface{}{
            "range": map[string]interface{}{
                "picDate": map[string]interface{}{
                    "gte": start.Format(time.RFC3339),
                    "lte": end.Format(time.RFC3339),
                },
            },
        })
    }
    query := map[string]interface{}{
        "query": map[string]interface{}{
            "bool": map[string]interface{}{
                "filter": filters,
            },
        },
        "aggs": map[string]interface{}{
            "orgs": map[string]interface{}{ // 先聚合orgId
                "terms": map[string]interface{}{
                    "field": "orgId", // 聚合orgId
                    "size":  10000,
                },
                "aggs": map[string]interface{}{
                    "community": map[string]interface{}{ // 在orgId聚合下聚合communityId
                        "terms": map[string]interface{}{
                            "field": "communityId", // 聚合communityId
                            "size":  10000,
                        },
                        "aggs": map[string]interface{}{
                            "location": map[string]interface{}{ // 在communityId下聚合building
                                "terms": map[string]interface{}{
                                    "field": "cameraLocation.building", // 聚合楼栋
                                    "size":  10000,
                                },
                                "aggs": map[string]interface{}{
                                    "floor": map[string]interface{}{ // 在building下聚合floor
                                        "terms": map[string]interface{}{
                                            "field": "cameraLocation.floor", // 聚合楼层
                                            "size":  10000,
                                        },
                                        "aggs": map[string]interface{}{
                                            "filter_floor": map[string]interface{}{
                                                "bucket_selector": map[string]interface{}{
                                                    "buckets_path": map[string]interface{}{
                                                        "eventCount": "_count",
                                                    },
                                                    "script": map[string]interface{}{
                                                        "source": "params.eventCount >= params.threshold",
                                                        "params": map[string]interface{}{
                                                            "threshold": locationModel.Appearances,
                                                        },
                                                    },
                                                },
                                            },
                                            "document_numbers": map[string]interface{}{ // 新增按 documentNumber 聚合
                                                "terms": map[string]interface{}{
                                                    "field": "documentNumber",
                                                    "size":  10000,
                                                },
                                            },
                                        },
                                    },
                                },
                            },
                        },
                    },
                },
            },
        },
        "size": 0,
    }
    if err := json.NewEncoder(&buf).Encode(query); err != nil {
        return nil, fmt.Errorf("error encoding query: %s", err)
    }
    res, err := esClient.Search(
        esClient.Search.WithContext(context.Background()),
        esClient.Search.WithIndex(config.EsInfo.EsIndex.AiOcean.IndexName),
        esClient.Search.WithDocumentType(config.EsInfo.EsIndex.AiOcean.IndexType),
        esClient.Search.WithBody(&buf),
        esClient.Search.WithTrackTotalHits(true),
        esClient.Search.WithPretty(),
    )
    if err != nil {
        return nil, fmt.Errorf("error getting response: %s", err)
    }
    defer res.Body.Close()
    // Check for a successful status code (2xx range)
    if res.IsError() {
        return nil, fmt.Errorf("error getting response: %s", res.String())
    }
    var result map[string]interface{}
    if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
        return nil, fmt.Errorf("error parsing response body: %s", err)
    }
    // 解析聚合结果
    var records []*LocationRecord
    if aggs, ok := result["aggregations"].(map[string]interface{}); ok {
        if orgBuckets, ok := aggs["orgs"].(map[string]interface{})["buckets"].([]interface{}); ok {
            for _, orgBucket := range orgBuckets {
                orgId := orgBucket.(map[string]interface{})["key"].(string)
                // 解析按communityId的聚合结果
                if communityBuckets, ok := orgBucket.(map[string]interface{})["community"].(map[string]interface{})["buckets"].([]interface{}); ok {
                    for _, communityBucket := range communityBuckets {
                        communityId := communityBucket.(map[string]interface{})["key"].(string)
                        // 解析按building的聚合结果
                        if locationBuckets, ok := communityBucket.(map[string]interface{})["location"].(map[string]interface{})["buckets"].([]interface{}); ok {
                            for _, locationBucket := range locationBuckets {
                                building := locationBucket.(map[string]interface{})["key"].(string)
                                // 解析按floor的聚合结果
                                if floorBuckets, ok := locationBucket.(map[string]interface{})["floor"].(map[string]interface{})["buckets"].([]interface{}); ok {
                                    for _, floorBucket := range floorBuckets {
                                        floor := floorBucket.(map[string]interface{})["key"].(string)
                                        appearCount := floorBucket.(map[string]interface{})["filter_floor"].(int)
                                        // 构建 LocationRecord 结构体
                                        var persons []string
                                        if docNumBuckets, ok := floorBucket.(map[string]interface{})["document_numbers"].(map[string]interface{})["buckets"].([]interface{}); ok {
                                            for _, docNumBucket := range docNumBuckets {
                                                persons = append(persons, docNumBucket.(map[string]interface{})["key"].(string))
                                            }
                                        }
                                        record := &LocationRecord{
                                            //PicDate:        timestamp,
                                            DocumentNumbers: persons,
                                            CommunityId:     communityId,
                                            Building:        building,
                                            Floor:           floor,
                                            OrgId:           orgId,
                                            AppearCount:     appearCount,
                                        }
                                        records = append(records, record)
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }
    return records, nil
}
//func domainToLocation(records []*LocationRecord) (map[string]*db.DomainUnit, error) {
//    if len(records) == 0 {
//        return nil, nil
//    }
//    domainIds := set.NewStringSet()
//    for _, record := range records {
//        domainIds.Add(record.CommunityId)
//    }
//    domains, err := service.GetUnitsMapByIds(domainIds.Elements())
//    if err != nil {
//        return nil, err
//    }
//    return domains, nil
//}