sunty
2024-05-29 16b34c939d6790fe6ebe61f8f0e85fe80ee52224
修改准备数据流程,适应较大数据量的操作
10个文件已修改
271 ■■■■ 已修改文件
config/app.yaml 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config/config.go 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
data/prepare.go 37 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
db/elastic.go 57 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
db/models.go 26 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
db/repository.go 53 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
rule/engine.go 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
rule/service.go 90 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
task/engine.go 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
config/app.yaml
@@ -18,6 +18,7 @@
  topHitsSize: 100000
  cameraSize: 100000
  timeInterval: 30
  batchSize: 100
log:
  path: /opt/vasystem/valog/
  level: -1
config/config.go
@@ -32,6 +32,7 @@
    TopHitsSize  int    `mapstructure: "topHitsSize"`
    CameraSize   int    `mapstructure: "cameraSize"`
    TimeInterval int    `mapstructure: "timeInterval"`
    BatchSize    int    `mapstructure: "batchSize"`
}
type app struct {
data/prepare.go
@@ -65,11 +65,11 @@
}
// GetFrequentAddress 方法计算出现最频繁的出行地址并将其作为常用地址返回
func SetFrequentAddress(c *db.CaptureInfo) {
func GetFrequentAddress(captureDetail []db.CaptureDetail) string {
    outAddressCounts := make(map[string]int)
    inAddressCounts := make(map[string]int)
    // 统计每个出行地址的出现次数
    for _, detail := range c.CaptureDetail {
    for _, detail := range captureDetail {
        if detail.Direction == "out" {
            outAddressCounts[detail.CaptureAddress]++
        }
@@ -96,12 +96,11 @@
            }
        }
    }
    // 返回出现次数最多的出行地址作为常用地址
    c.FrequentAddress = frequentAddress
    return frequentAddress
}
// ProcessData 函数处理数据,根据要求过滤掉数据并根据规则更新状态
func ProcessData(captureInfos []db.CaptureInfo, personStatus []db.PersonStatus, ruleInfos []db.PersonnelStatusRule, communityID string) []db.PersonStatus {
func ProcessData(captureInfos []db.CaptureInfo, personStatus []*db.PersonStatus, ruleInfos []db.PersonnelStatusRule, communityID string) []db.PersonStatus {
    filteredInfos := make([]db.PersonStatus, 0)
    // 构建快速查找索引,方便查找对应的人员状态和规则
@@ -109,35 +108,31 @@
    ruleIndex := make(map[string]db.PersonnelStatusRule)
    for _, person := range personStatus {
        statusIndex[person.DocumentNumber] = person
        statusIndex[person.DocumentNumber] = *person
    }
    for _, rule := range ruleInfos {
        ruleIndex[rule.Name] = rule
    }
    //fmt.Println("statusIndex: ", statusIndex)
    //fmt.Println("ruleIndex: ", ruleIndex)
    // 处理每个抓拍信息
    for _, info := range captureInfos {
        //fmt.Println("info", info.DocumentNumber, info.Status, info.FrequentAddress)
        //fmt.Println("person", statusIndex[info.DocumentNumber].DocumentNumber, statusIndex[info.DocumentNumber].Status, statusIndex[info.DocumentNumber].FrequentAddress)
        // 检查是否存在对应的人员状态
        person, ok := statusIndex[info.DocumentNumber]
        if !ok {
            // 不存在对应的人员状态为新数据
            filteredInfos = append(filteredInfos, db.PersonStatus{OrgId: info.OrgId, CommunityID: communityID, DocumentNumber: info.DocumentNumber, Status: info.Status, FrequentAddress: info.FrequentAddress})
            continue
        }
        person := statusIndex[info.DocumentNumber]
        //fmt.Println("person: ", person.DocumentNumber, person.Status, person.FrequentAddress, person.LastAppearanceTime, person.LastAppearanceStatusTime)
        // 判断状态和常用地址是否相等,如果相等则忽略
        if (info.Status == person.Status || info.CaptureDays <= ruleIndex[person.DocumentNumber].DetectionDaysEnd) &&
            info.FrequentAddress == person.FrequentAddress {
            continue
        }
        //if (info.Status == person.Status || info.CaptureDays <= ruleIndex[person.DocumentNumber].DetectionDaysEnd) &&
        //    info.FrequentAddress == person.FrequentAddress {
        //    continue
        //}
        // 更新过滤后的信息列表
        filteredInfos = append(filteredInfos, db.PersonStatus{OrgId: info.OrgId, CommunityID: communityID, DocumentNumber: info.DocumentNumber, Status: info.Status, FrequentAddress: info.FrequentAddress})
        //fmt.Println("LastAppearanceTime: ", person.LastAppearanceTime)
        filteredInfos = append(filteredInfos, db.PersonStatus{CommunityID: communityID, DocumentNumber: info.DocumentNumber, Status: info.Status, FrequentAddress: info.FrequentAddress, LastAppearanceStatusTime: person.LastAppearanceTime})
    }
    //fmt.Println("filteredInfos: ", filteredInfos)
    return filteredInfos
}
db/elastic.go
@@ -4,9 +4,11 @@
    "basic.com/pubsub/esutil.git"
    "basic.com/valib/logger.git"
    "encoding/json"
    "fmt"
    "ruleModelEngine/config"
    "ruleModelEngine/util"
    "strconv"
    "strings"
)
func decodeDocumentInfos(docInfo []map[string]interface{}) ([]CaptureInfo, error) {
@@ -15,8 +17,6 @@
        var captureInfo = CaptureInfo{}
        captureInfo.DocumentNumber = info["key"].(string)
        buckets := info["top_hits"].(map[string]interface{})["hits"].(map[string]interface{})["hits"].([]interface{})
        orgId := buckets[0].(map[string]interface{})["_source"].(map[string]interface{})["orgId"].(string)
        captureInfo.OrgId = orgId
        for _, sourceInfo := range buckets {
            rSourceInfo := sourceInfo.(map[string]interface{})
            source := rSourceInfo["_source"].(map[string]interface{})
@@ -82,13 +82,6 @@
                        }
                    }
                }
            ],
            "must_not": [
                {
                    "term": {
                        "alarmRules.ruleId": 4
                    }
                }
            ]
        }
    },
@@ -104,7 +97,8 @@
                    "top_hits": {
                        "_source": [
                            "documentNumber",
                            "id"
                            "id",
                            "alarmRules.ruleId"
                        ],
                        "size": 1,
                        "sort": [
@@ -120,8 +114,8 @@
        }
    }
}`
    //fmt.Println(queryDSL)
    //fmt.Println(esURL)
    //fmt.Println(queryDSL)
    docNumberMap := make(map[string]string)
    buf, err := esutil.EsReq("POST", esURL, []byte(queryDSL))
    if err != nil {
@@ -136,9 +130,23 @@
        buckets := info["top_hits"].(map[string]interface{})["hits"].(map[string]interface{})["hits"].([]interface{})
        for _, sourceInfo := range buckets {
            rSourceInfo := sourceInfo.(map[string]interface{})
            source := rSourceInfo["_source"].(map[string]interface{})
            documentNumber := source["documentNumber"].(string)
            id := source["id"].(string)
            rSource := rSourceInfo["_source"].(map[string]interface{})
            //如果该天最后一条已经预警过进出异常,将过滤掉不再预警
            alarmFlag := false
            alarmRules := rSource["alarmRules"].([]interface{})
            for _, alarmRule := range alarmRules {
                ruleId := alarmRule.(map[string]interface{})["ruleId"].(string)
                //fmt.Println("ruleId", ruleId,rSource["documentNumber"].(string),rSource["id"].(string))
                if ruleId == "4" {
                    alarmFlag = true
                    break
                }
            }
            if alarmFlag == true {
                continue
            }
            documentNumber := rSource["documentNumber"].(string)
            id := rSource["id"].(string)
            docNumberMap[documentNumber] = id
        }
    }
@@ -306,9 +314,10 @@
    return false, nil
}
func Query1MDataByCommunityId(communityId string) ([]CaptureInfo, error) {
func Query1MDataByCommunityId(communityId string, documentNumber []string, days int) ([]CaptureInfo, error) {
    //fmt.Println(config.Elastic.DocumentSize)
    //fmt.Println(config.Elastic.TopHitsSize)
    documentNumberStr := strings.Replace(strings.Trim(fmt.Sprint(documentNumber), "[]"), " ", "\",\"", -1)
    esURL := "http://" + config.Elastic.Host + ":" + config.Elastic.Port + "/" + config.Elastic.Index + "/_search"
    queryDSL := `
    {
@@ -318,7 +327,7 @@
                    {
                        "range": {
                            "picDate": {
                                "gte": "now-30d/d",
                                "gte": "now-` + strconv.Itoa(days) + `d/d",
                                "lt": "now/d"
                            }
                        }
@@ -327,16 +336,13 @@
                        "term":{
                            "communityId":"` + communityId + `"
                        }
                    },
                    {
                        "terms":{
                            "documentNumber":["` + documentNumberStr + `"]
                        }
                    }
                ],
                "must_not": [
                    {
                        "term": {
                            "documentNumber": ""
                        }
                    }
                ],
                "should": []
                ]
            }
        },
        "size": 0,
@@ -352,7 +358,6 @@
                            "_source": [
                                "documentNumber",
                                "picDate",
                                "orgId",
                                "cameraLocation.building",
                                "cameraLocation.unit",
                                "cameraLocation.floor",
db/models.go
@@ -78,18 +78,19 @@
}
type PersonStatus struct {
    Id                 uint   `gorm:"column:id;primary_key;auto_increment;not null;"`
    OrgId              string `gorm:"column:org_id;type:varchar(299);not null;default:''"`                                                                                            // 派出所id
    CommunityID        string `gorm:"uniqueIndex:idx_document_number_community_id;index:community_id_last_appearance_time;column:community_id;type:varchar(299);not null;default:''"` // 小区id
    DocumentNumber     string `gorm:"uniqueIndex:idx_document_number_community_id;column:document_number;type:varchar(299);not null;default:''"`                                      // 档案编号
    DaysAppeared       int    `gorm:"column:days_appeared;type:int(11);not null;default:0" json:"daysAppeared"`                                                                       // 出现天数
    Count              int    `gorm:"column:count;type:int;not null;default:0"`                                                                                                       // 抓拍次数
    Status             string `gorm:"column:status;type:varchar(255);not null;default:''"`                                                                                            //标签
    LastAppearanceTime int64  `gorm:"index:community_id_last_appearance_time;column:last_appearance_time;type:int;not null;default:0" json:"lastAppearanceTime"`                      //最后出现时间
    LastLocation       string `gorm:"column:last_location;type:varchar(255);not null;default:''" json:"lastLocation"`                                                                 //最后出现地点
    FrequentAddress    string `gorm:"column:frequent_address;type:varchar(255);not null;default:''" json:"frequentAddress"`                                                           //常出现地点
    CreatedAt          time.Time
    UpdatedAt          time.Time
    Id                       uint   `gorm:"column:id;primary_key;auto_increment;not null;"`
    OrgId                    string `gorm:"column:org_id;type:varchar(299);not null;default:''"`                                                                                            // 派出所id
    CommunityID              string `gorm:"uniqueIndex:idx_document_number_community_id;index:community_id_last_appearance_time;column:community_id;type:varchar(299);not null;default:''"` // 小区id
    DocumentNumber           string `gorm:"uniqueIndex:idx_document_number_community_id;column:document_number;type:varchar(299);not null;default:''"`                                      // 档案编号
    DaysAppeared             int    `gorm:"column:days_appeared;type:int(11);not null;default:0" json:"daysAppeared"`                                                                       // 出现天数
    Count                    int    `gorm:"column:count;type:int;not null;default:0"`                                                                                                       // 抓拍次数
    Status                   string `gorm:"column:status;type:varchar(255);not null;default:''"`                                                                                            //标签
    LastAppearanceTime       int64  `gorm:"index:community_id_last_appearance_time;column:last_appearance_time;type:int;not null;default:0" json:"lastAppearanceTime"`                      //最后出现时间
    LastAppearanceStatusTime int64  `gorm:"column:last_appearance_status_time"`
    LastLocation             string `gorm:"column:last_location;type:varchar(255);not null;default:''" json:"lastLocation"`       //最后出现地点
    FrequentAddress          string `gorm:"column:frequent_address;type:varchar(255);not null;default:''" json:"frequentAddress"` //常出现地点
    CreatedAt                time.Time
    UpdatedAt                time.Time
    ////OrgId           string `gorm:"column:org_id"`
    //CommunityID     string `gorm:"column:communityID"`
    //DocumentNumber  string `gorm:"column:documentNumber"`
@@ -139,7 +140,6 @@
}
type CaptureInfo struct {
    OrgId           string          `json:"orgId"` //派出所Id
    DocumentNumber  string          `json:"documentNumber"`
    CaptureDays     int             `json:"captureDays"`     //抓拍天数
    OvernightStays  int             `json:"overnightStays"`  //过夜天数
db/repository.go
@@ -103,6 +103,20 @@
    return personStatusList, nil
}
// 查询小区档案表 (原查询任务属性)
func QueryPersonStatusWithPagination(community_id string, timeThreshold int64) ([]*PersonStatus, error) {
    var db = DB
    var personStatusList []*PersonStatus
    result := db.Select("document_number, status, frequent_address, last_appearance_time, last_appearance_status_time").
        Where("community_id = ? AND last_appearance_time != last_appearance_status_time AND last_appearance_time > ?", community_id, timeThreshold).
        Find(&personStatusList)
    if result.Error != nil {
        logger.Error(result.Error)
        return nil, result.Error
    }
    return personStatusList, nil
}
// 查询人物年龄
func GetAgeById(id string) (int, error) {
    var db = DB
@@ -276,37 +290,16 @@
    var db = DB
    // 遍历人员信息
    for _, person := range persons {
        // 检查记录是否存在
        var existingPerson PersonStatus
        err := db.Where("document_number = ? AND community_id = ?", person.DocumentNumber, communityID).First(&existingPerson).Error
        if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
            logger.Error("Query person error:", err, person.DocumentNumber, communityID)
            //fmt.Println("asdasfasfasf")
            continue
            //return err
        err := db.Model(&PersonStatus{}).
            Where("document_number = ? AND community_id = ?", person.DocumentNumber, communityID).
            Updates(map[string]interface{}{
                "status":                   person.Status,
                "frequent_address":         person.FrequentAddress,
                "LastAppearanceStatusTime": person.LastAppearanceStatusTime,
            }).Error
        if err != nil {
            return err
        }
        // 如果记录存在,则更新
        if existingPerson.DocumentNumber != "" {
            err := db.Model(&PersonStatus{}).
                Where("document_number = ? AND community_id = ?", person.DocumentNumber, communityID).
                Updates(map[string]interface{}{
                    "status":           person.Status,
                    "frequent_address": person.FrequentAddress,
                }).Error
            if err != nil {
                return err
            }
        }
        //else {
        //    // 如果记录不存在,则插入新记录
        //    err := db.Create(&person).Error
        //    if err != nil {
        //        return err
        //    }
        //}
    }
    return nil
main.go
@@ -56,7 +56,7 @@
func main() {
    //db.UpdatePersonStatusByIds()
    immediate := flag.Bool("immediate", false, "whether to execute immediately")
    immediate := flag.Bool("immediate", true, "whether to execute immediately")
    flag.Parse()
    if *immediate {
rule/engine.go
@@ -122,6 +122,8 @@
                continue
            }
            captureInfo = append(captureInfo, info)
        } else {
            captureInfo = append(captureInfo, info)
        }
    }
rule/service.go
@@ -32,7 +32,7 @@
// 执行程序入口
func ExecuteTask() {
    //进出异常布控任务,暂时归类到标签计算部分
    fmt.Println("进出异常开始入口!!!!!!")
    //fmt.Println("进出异常开始入口!!!!!!")
    tasks, err := db.GetAllTaskData()
    if err != nil {
        logger.Error("GetAllTaskData Error", err)
@@ -44,20 +44,21 @@
            tkInfo.Name = taskInfo.Name
        }
    }
    fmt.Println("tkInfo: ", tkInfo)
    //fmt.Println("tkInfo: ", tkInfo)
    days := config.Api.AInterval
    docNumIdMap, err := db.QueryLastIdByDayRange(days, days-1)
    if err != nil {
        logger.Error("QueryByDayRange err: ", err)
    }
    fmt.Println("docNumIdMap: ", len(docNumIdMap))
    //fmt.Println("docNumIdMap: ", len(docNumIdMap))
    for docNumber, id := range docNumIdMap {
        //fmt.Println(docNumber, id)
        alarmRules := make([]db.AlarmRule, 0)
        flag := task.EnteringButNotLeaving(docNumber, id, days)
        flag := task.EnteringButNotLeaving(docNumber, days)
        if flag == true {
            alarmRules = append(alarmRules, db.AlarmRule{RuleId: strconv.Itoa(tkInfo.Id), RuleText: tkInfo.Name, AlarmLevel: "0"})
        }
        //fmt.Println("alarmRules: ", id, alarmRules)
        addFlag, err := db.AddAlarmRules(alarmRules, id)
        if err != nil {
            logger.Error("AddAlarmRules err: ", err)
@@ -88,53 +89,68 @@
        //if communityID != "50010101010000001001" {
        //    continue
        //}
        fmt.Println("communityID: ", communityID)
        //查询社区内人员档案,方便数据更新
        personStatus, err := db.GetDBPersonStatusData(communityID)
        personStatusList, err := db.QueryPersonStatusWithPagination(communityID, 30)
        if err != nil {
            logger.Error("GetDBPersonStatusData Error", err)
            logger.Error("QueryPersonStatusWithPagination err: ", err)
        }
        //fmt.Println(labeManage)
        //fmt.Println("personStatus: ", personStatus)
        //fmt.Println("CcmmunityIDs: ", cmmunityID)
        //按社区id查询近一个月es数据
        captureInfos, err := db.Query1MDataByCommunityId(communityID)
        //fmt.Println("captureInfos: ", captureInfos)
        //residentCount := 0
        //
        documentNumberIDS := make([]string, 0)
        for _, personStatus := range personStatusList {
            //fmt.Println("personStatus.LastAppearanceTime: ", personStatus.LastAppearanceTime)
            documentNumberIDS = append(documentNumberIDS, personStatus.DocumentNumber)
            //业务逻辑
        }
        //fmt.Println("len(documentNumberIDS)", len(documentNumberIDS))
        captureInfos := make([]db.CaptureInfo, 0)
        batchSize := config.Elastic.BatchSize
        //fmt.Println(batchSize)
        for i := 0; i < len(documentNumberIDS); i += batchSize {
            end := i + batchSize
            if end > len(documentNumberIDS) {
                end = len(documentNumberIDS)
            }
            batch := documentNumberIDS[i:end]
            //fmt.Println("batch: ", batch)
            batchCaptureInfos, err := db.Query1MDataByCommunityId(communityID, batch, 30)
            if err != nil {
                logger.Error("Query1MDataByCommunityId Error", err)
            }
            if len(batchCaptureInfos) == 0 {
                continue
            }
            //fmt.Println("batchCaptureInfos: ", batchCaptureInfos)
            captureInfos = append(captureInfos, batchCaptureInfos...)
        }
        if len(captureInfos) == 0 {
            continue
        }
        fmt.Println("共计有档案数据条数为:", len(captureInfos))
        //补全分析所需数据
        for i := range captureInfos {
            //fmt.Println(captureInfos[i].DocumentNumber)
            captureDays, overnightCount := data.CalculateCaptureDays(captureInfos[i].CaptureDetail)
            captureInfos[i].CaptureDays = captureDays
            captureInfos[i].OvernightStays = overnightCount
            //if captureInfos[i].CaptureDays <= 10 && captureInfos[i].OvernightStays >= 1 {
            //    fmt.Println(captureInfos[i].DocumentNumber)
            //    fmt.Println("该人员出现天数为", captureInfos[i].CaptureDays)
            //    fmt.Println("该人员过夜天数为", captureInfos[i].OvernightStays)
            //}
            captureInfos[i].Status = data.SetStatus(captureDays, ruleInfo)
            if captureInfos[i].OvernightStays >= 5 && (captureInfos[i].CaptureDays <= 14 && captureInfos[i].CaptureDays >= 5) {
                captureInfos[i].Status = "resident"
            }
            //if captureInfos[i].Status == "resident" {
            //    residentCount++
            //}
            //fmt.Println("SetStatus: ", captureInfos[i].Status)
            age, err := db.QueryAgeById(captureInfos[i].DocumentNumber)
            if err != nil {
                logger.Error("QueryAgeById ERROR", err)
            }
            captureInfos[i].Age = age
            data.SetFrequentAddress(&captureInfos[i])
            //fmt.Println("captureInfos[i].Age: ", captureInfos[i].Age)
            captureInfos[i].FrequentAddress = data.GetFrequentAddress(captureInfos[i].CaptureDetail)
            //fmt.Println("CaptureDetail: ", captureInfos[i].DocumentNumber, captureInfos[i].CaptureDays, captureInfos[i].CaptureDetail)
        }
        //fmt.Println("residentCount: ", residentCount)
        if err != nil {
            logger.Error("MatchAllTargets Error", err)
        }
        if len(captureInfos) == 0 {
            continue
        }
        //fmt.Println("captureInfosQ: ", captureInfos)
        for _, identity := range labeManage {
            switch identity.Name {
            case "服务人员":
@@ -153,10 +169,14 @@
            logger.Error("UpdateDBPersonLabel Error", errIdentity)
        }
        //continue
        //fmt.Println("captureInfos: ", captureInfos)
        postCaptureInfos := data.ProcessData(captureInfos, personStatus, ruleInfo, communityID)
        //fmt.Println("postCaptureInfos: ", postCaptureInfos)
        //fmt.Println("共过滤条数:", len(captureInfos)-len(postCaptureInfos))
        postCaptureInfos := data.ProcessData(captureInfos, personStatusList, ruleInfo, communityID)
        /*for _, inf := range postCaptureInfos {
            fmt.Println("inf: ", inf.DocumentNumber, inf.Status, inf.FrequentAddress, inf.LastAppearanceStatusTime)
        }*/
        //fmt.Println("共更新档案数:", len(postCaptureInfos))
        //fmt.Println("----->captureInfos: ", len(captureInfos))
        //continue
        UpdatePersonInfoErr := db.UpdatePersonInfo(postCaptureInfos, communityID)
        if UpdatePersonInfoErr != nil {
            logger.Error("MatchPermanentResidentTargets Error: ", UpdatePersonInfoErr)
task/engine.go
@@ -49,7 +49,7 @@
    return false
}
func EnteringButNotLeaving(docNumber string, id string, days int) bool {
func EnteringButNotLeaving(docNumber string, days int) bool {
    total, err := db.QueryTimesByDocNumberDays(days-1, docNumber)
    if err != nil {
        logger.Error("QueryTimesByDocNumberDays err: ", err)