sunty
2024-04-30 a1fdc969dd20a97087e986c69fdfd25ffe115368
新增迁入迁出,属性分析,身份分析等
9个文件已添加
1339 ■■■■■ 已修改文件
config/app.yaml 26 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config/config.go 73 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
data/prepare.go 122 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
db/repository.go 290 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.mod 46 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go 46 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
rule/engine.go 304 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
rule/service.go 147 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
util/util.go 285 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config/app.yaml
New file
@@ -0,0 +1,26 @@
app:
  name: Capture Analysis Rule Model Engine
  port: 7024
  logLevel: debug
database:
  driver: mysql
  host: 192.168.20.115
  port: 3306
  #  username: root
  #  password: c++java123
  name: faceanalysis
elastic:
  host: 192.168.20.115
  port: 9200
  index: ai_face_ocean
  type: analysisModel
  documentSize: 100000
  topHitsSize: 100000
  cameraSize: 100000
  timeInterval: 30
log:
  path: /opt/vasystem/valog/
  level: -1
  maxSize: 128
  maxBackups: 30
  maxAge: 15
config/config.go
New file
@@ -0,0 +1,73 @@
package config
import (
    "basic.com/valib/logger.git"
    "github.com/spf13/viper"
    "log"
)
type LogConfig struct {
    Path       string `mapstructure:"path"`       //日志存储路径
    Level      int    `mapstructure:"level"`      //日志等级
    MaxSize    int    `mapstructure:"maxSize"`    //日志文件大小上限
    MaxBackups int    `mapstructure:"maxBackups"` //日志压缩包个数
    MaxAge     int    `mapstructure:"maxAge"`     //保留压缩包天数
}
type database struct {
    Driver string `mapstructure: "driver"`
    Host   string `mapstructure: "host"`
    Port   string `mapstructure: "port"`
    Name   string `mapstructure: "name"`
}
type elastic struct {
    Host         string `mapstructure: "host"`
    Port         string `mapstructure: "port"`
    Index        string `mapstructure: "index"`
    Type         string `mapstructure: "type"`
    DocumentSize int    `mapstructure: "documentSize"`
    TopHitsSize  int    `mapstructure: "topHitsSize"`
    CameraSize   int    `mapstructure: "cameraSize"`
    TimeInterval int    `mapstructure: "timeInterval"`
}
type app struct {
    Name     string `mapstructure: "Annotation Service Application"`
    Port     string `mapstructure: "port"`
    LogLevel string `mapstructure: "logLevel"`
}
var LogConf = &LogConfig{}
var DataBase = &database{}
var Elastic = &elastic{}
var App = &app{}
var LogBasePath string
var LogLevel int
func Init(env string) {
    var err error
    viper.SetConfigType("yaml")
    viper.SetConfigName(env)
    viper.AddConfigPath("config")
    err = viper.ReadInConfig()
    if err != nil {
        log.Fatal("error on parsing configuration file", err)
    }
    viper.UnmarshalKey("elastic", Elastic)
    viper.UnmarshalKey("database", DataBase)
    viper.UnmarshalKey("app", App)
    viper.UnmarshalKey("log", LogConf)
    logger.SetLevel(LogConf.Level)
    if viper.GetString("LogBasePath") != "" {
        LogBasePath = viper.GetString("LogBasePath")
    } else {
        LogBasePath = "./logger/"
    }
    if viper.IsSet("LogLevel") && viper.GetInt("LogLevel") >= logger.PanicLevel && viper.GetInt("LogLevel") <= logger.DebugLevel {
        LogLevel = viper.GetInt("LogLevel")
    } else {
        LogLevel = logger.DebugLevel
    }
}
data/prepare.go
New file
@@ -0,0 +1,122 @@
package data
import (
    "fmt"
    "ruleModelEngine/db"
    "time"
)
// 计算抓拍天数
func CalculateCaptureDays(details []db.CaptureDetail) int {
    // 使用 map 来存储每天是否有抓拍记录
    captureMap := make(map[string]bool)
    for _, detail := range details {
        // 解析抓拍日期
        layout := "2006-01-02 15:04:05"
        captureTime, err := time.Parse(layout, detail.CaptureDate)
        if err != nil {
            fmt.Println("解析抓拍日期时出错:", err)
            continue
        }
        // 获取日期部分
        date := captureTime.Format("2006-01-02")
        // 在 map 中标记这一天有抓拍记录
        captureMap[date] = true
    }
    // 统计有抓拍记录的天数
    captureDays := 0
    for range captureMap {
        captureDays++
    }
    return captureDays
}
// 设置状态
func SetStatus(captureDays int, rules []db.PersonnelStatusRule) string {
    for _, rule := range rules {
        if captureDays >= rule.DetectionDaysStart && captureDays <= rule.DetectionDaysEnd {
            return rule.Name
        }
    }
    return rules[1].Name
}
// SetFrequentAddress 方法计算出现最频繁的出行地址并设置为常用地址
func SetFrequentAddress(c *db.CaptureInfo) {
    outAddressCounts := make(map[string]int)
    inAddressCounts := make(map[string]int)
    // 统计每个出行地址的出现次数
    for _, detail := range c.CaptureDetail {
        if detail.Direction == "out" {
            outAddressCounts[detail.CaptureAddress]++
        }
        if detail.Direction == "in" {
            inAddressCounts[detail.CaptureAddress]++
        }
    }
    // 找到出现次数最多的出行地址
    maxOutCount := 0
    var frequentAddress string
    for address, count := range outAddressCounts {
        if count > maxOutCount {
            maxOutCount = count
            frequentAddress = address
        }
    }
    if frequentAddress == "" {
        maxInCount := 0
        for address, count := range inAddressCounts {
            if count > maxInCount {
                maxInCount = count
                frequentAddress = address
            }
        }
    }
    // 将出现次数最多的出行地址设置为常用地址
    c.FrequentAddress = frequentAddress
}
// processData 函数处理数据,根据要求过滤掉数据并根据规则更新状态
func ProcessData(captureInfos []db.CaptureInfo, personStatus []db.PersonStatus, ruleInfos []db.PersonnelStatusRule, communityID string) []db.PersonStatus {
    filteredInfos := make([]db.PersonStatus, 0)
    // 构建快速查找索引,方便查找对应的人员状态和规则
    statusIndex := make(map[string]db.PersonStatus)
    ruleIndex := make(map[string]db.PersonnelStatusRule)
    for _, person := range personStatus {
        statusIndex[person.DocumentNumber] = person
    }
    for _, rule := range ruleInfos {
        ruleIndex[rule.Name] = rule
    }
    // 处理每个抓拍信息
    for _, info := range captureInfos {
        //fmt.Println("info", info.DocumentNumber, info.Status, info.FrequentAddress)
        //fmt.Println("person", statusIndex[info.DocumentNumber].DocumentNumber, statusIndex[info.DocumentNumber].Status, statusIndex[info.DocumentNumber].FrequentAddress)
        // 检查是否存在对应的人员状态
        person, ok := statusIndex[info.DocumentNumber]
        if !ok {
            // 不存在对应的人员状态为新数据
            filteredInfos = append(filteredInfos, db.PersonStatus{CommunityID: communityID, DocumentNumber: info.DocumentNumber, Status: info.Status, FrequentAddress: info.FrequentAddress})
            continue
        }
        // 判断状态和常用地址是否相等,如果相等则忽略
        if (info.Status == person.Status || info.CaptureDays <= ruleIndex[person.DocumentNumber].DetectionDaysEnd) &&
            info.FrequentAddress == person.FrequentAddress {
            continue
        }
        // 更新过滤后的信息列表
        filteredInfos = append(filteredInfos, db.PersonStatus{CommunityID: communityID, DocumentNumber: info.DocumentNumber, Status: info.Status, FrequentAddress: info.FrequentAddress})
    }
    return filteredInfos
}
db/repository.go
New file
@@ -0,0 +1,290 @@
package db
import (
    "basic.com/valib/logger.git"
    "errors"
    "gorm.io/gorm"
)
// 查询小区表
func GetCommunityIDs() ([]string, error) {
    db, err := ConnectDB()
    if err != nil {
        return nil, err
    }
    // 查询数据
    var communityIDs []string
    result := db.Table("domain_unit").Where("domainType = ?", 1).Pluck("id", &communityIDs)
    if result.Error != nil {
        return nil, result.Error
    }
    return communityIDs, nil
}
// 查询全部数据
func GetAllData() ([]PersonnelStatusRule, error) {
    db, err := ConnectDB()
    if err != nil {
        return nil, err
    }
    var rules []PersonnelStatusRule
    if err := db.Find(&rules).Error; err != nil {
        return nil, err
    }
    return rules, nil
}
// 查询住户时间数据
func GetResidentData(status, communityID string) ([]Resident, error) {
    db, err := ConnectDB()
    if err != nil {
        return nil, err
    }
    var residents []Resident
    // 执行查询
    rows, err := db.Table("person_status").
        Select("person_status.documentNumber", "person_status.communityID", "snapshot_count_summary.last_appearance_time", "snapshot_count_summary.created_at").
        Joins("INNER JOIN snapshot_count_summary ON person_status.documentNumber = snapshot_count_summary.document_number AND person_status.communityID = snapshot_count_summary.community_id").
        Where("person_status.status = ? AND person_status.communityID = ?", status, communityID).
        Rows()
    if err != nil {
        return nil, err
    }
    defer rows.Close()
    // 遍历查询结果
    for rows.Next() {
        var resident Resident
        err := rows.Scan(&resident.DocumentNumber, &resident.CommunityID, &resident.LastAppearanceTime, &resident.CreateAt)
        if err != nil {
            return nil, err
        }
        residents = append(residents, resident)
    }
    if err := rows.Err(); err != nil {
        return nil, err
    }
    return residents, nil
}
// 查询人物属性
func GetDBPersonStatusData(id string) ([]PersonStatus, error) {
    db, err := ConnectDB()
    if err != nil {
        return nil, err
    }
    // 查询数据
    var personStatusList []PersonStatus
    if err := db.Table("person_status").
        Select("documentNumber, status, frequentAddress").
        Where("communityID = ?", id).
        Find(&personStatusList).Error; err != nil {
        return nil, err
    }
    return personStatusList, nil
}
// 根据社区id和住户属性查询住户档案编号
func GetDocNumberFromPersonStatus(id, status string) ([]string, error) {
    db, err := ConnectDB()
    if err != nil {
        return nil, err
    }
    // 查询数据
    var personStatusList []PersonStatus
    if err := db.Table("person_status").
        Select("documentNumber, status, frequentAddress").
        Where("communityID = ? AND status = ?", id, status).
        Find(&personStatusList).Error; err != nil {
        return nil, err
    }
    docNum := make([]string, 0)
    for _, ps := range personStatusList {
        docNum = append(docNum, ps.DocumentNumber)
    }
    return docNum, nil
}
// 查询人物身份属性表
func GetLabelManageIdentity(IdentityType int) ([]LabelManage, error) {
    db, err := ConnectDB()
    if err != nil {
        return nil, err
    }
    // 查询数据
    var labelManageIdentity []LabelManage
    if err := db.Table("label_manage").
        Select("id, name, valid_days").
        Where("type = ?", IdentityType).
        Find(&labelManageIdentity).Error; err != nil {
        return nil, err
    }
    return labelManageIdentity, nil
}
// UpdatePersonInfo 更新或插入多个人员信息
func UpdateMoveInout(personsMoveInout []MoveInout) error {
    // 数据库连接信息
    db, err := ConnectDB()
    if err != nil {
        return err
    }
    // 遍历人员信息
    for _, personMoveInout := range personsMoveInout {
        // 检查记录是否存在
        var existingPerson MoveInout
        err := db.Where("document_number = ? AND community_id = ?", personMoveInout.DocumentNumber, personMoveInout.CommunityID).First(&existingPerson).Error
        if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
            logger.Error("Query person error:", err, personMoveInout.DocumentNumber, personMoveInout.CommunityID)
            //fmt.Println("asdasfasfasf")
            continue
            //return err
        }
        // 如果记录存在,则更新
        if existingPerson.DocumentNumber != "" {
            if existingPerson.Status != "Verified" {
                err := db.Model(&MoveInout{}).
                    Where("document_number = ? AND community_id = ?", personMoveInout.DocumentNumber, personMoveInout.CommunityID).
                    Updates(map[string]interface{}{
                        "status":        personMoveInout.Status,
                        "move_out_date": personMoveInout.MoveOutDate,
                    }).Error
                if err != nil {
                    return err
                }
            } else {
                err := db.Model(&MoveInout{}).
                    Where("document_number = ? AND community_id = ?", personMoveInout.DocumentNumber, personMoveInout.CommunityID).
                    Updates(map[string]interface{}{
                        "move_out_date": personMoveInout.MoveOutDate,
                    }).Error
                if err != nil {
                    return err
                }
            }
        } else {
            // 如果记录不存在,则插入新记录
            err := db.Create(&personsMoveInout).Error
            if err != nil {
                return err
            }
        }
    }
    return nil
}
// UpdatePersonInfo 更新或插入多个人员身份信息
func UpdateDBPersonLabel(personsIdentity []Identity) error {
    // 数据库连接信息
    db, err := ConnectDB()
    if err != nil {
        return err
    }
    // 遍历人员信息
    for _, personIdentity := range personsIdentity {
        // 检查记录是否存在
        var existingPerson Identity
        err := db.Where("dbtablepersons_id = ? AND community_id = ? AND label_id = ?",
            personIdentity.DocumentNumber,
            personIdentity.CommunityID,
            personIdentity.LabelId,
        ).First(&existingPerson).Error
        if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
            logger.Error("Query person error:", err,
                personIdentity.DocumentNumber,
                personIdentity.CommunityID,
                personIdentity.LabelId)
            //fmt.Println("asdasfasfasf")
            continue
            //return err
        }
        // 如果记录存在,则更新
        if existingPerson.DocumentNumber != "" {
            err := db.Model(&Identity{}).
                Where("dbtablepersons_id = ? AND community_id = ? AND label_id = ?",
                    personIdentity.DocumentNumber,
                    personIdentity.CommunityID,
                    personIdentity.LabelId,
                ).
                Updates(map[string]interface{}{
                    "expire_time": personIdentity.ExpireTime,
                }).Error
            if err != nil {
                return err
            }
        } else {
            // 如果记录不存在,则插入新记录
            err := db.Create(&personIdentity).Error
            if err != nil {
                return err
            }
        }
    }
    return nil
}
// UpdatePersonInfo 更新或插入多个人员信息
func UpdatePersonInfo(persons []PersonStatus, communityID string) error {
    // 数据库连接信息
    db, err := ConnectDB()
    if err != nil {
        return err
    }
    // 遍历人员信息
    for _, person := range persons {
        // 检查记录是否存在
        var existingPerson PersonStatus
        err := db.Where("documentNumber = ? AND communityID = ?", person.DocumentNumber, communityID).First(&existingPerson).Error
        if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
            logger.Error("Query person error:", err, person.DocumentNumber, communityID)
            //fmt.Println("asdasfasfasf")
            continue
            //return err
        }
        // 如果记录存在,则更新
        if existingPerson.DocumentNumber != "" {
            err := db.Model(&PersonStatus{}).
                Where("documentNumber = ? AND communityID = ?", person.DocumentNumber, communityID).
                Updates(map[string]interface{}{
                    "status":          person.Status,
                    "frequentAddress": person.FrequentAddress,
                }).Error
            if err != nil {
                return err
            }
        } else {
            // 如果记录不存在,则插入新记录
            err := db.Create(&person).Error
            if err != nil {
                return err
            }
        }
    }
    return nil
}
go.mod
New file
@@ -0,0 +1,46 @@
module ruleModelEngine
go 1.20
require (
    basic.com/pubsub/esutil.git v0.0.0-20240401091908-7a10b30099c6
    basic.com/valib/logger.git v0.0.0-20220225105132-5cf6309c132f
    github.com/spf13/viper v1.18.2
    gorm.io/driver/mysql v1.5.6
    gorm.io/gorm v1.25.9
)
require (
    basic.com/pubsub/protomsg.git v0.0.0-20230210092337-5f1e6cdae7c3 // indirect
    filippo.io/edwards25519 v1.1.0 // indirect
    github.com/fsnotify/fsnotify v1.7.0 // indirect
    github.com/go-sql-driver/mysql v1.8.1 // indirect
    github.com/gogo/protobuf v1.3.2 // indirect
    github.com/golang/protobuf v1.5.4 // indirect
    github.com/hashicorp/hcl v1.0.0 // indirect
    github.com/jinzhu/inflection v1.0.0 // indirect
    github.com/jinzhu/now v1.1.5 // indirect
    github.com/magiconair/properties v1.8.7 // indirect
    github.com/mitchellh/mapstructure v1.5.0 // indirect
    github.com/natefinch/lumberjack v2.0.0+incompatible // indirect
    github.com/pelletier/go-toml/v2 v2.2.1 // indirect
    github.com/sagikazarmark/locafero v0.4.0 // indirect
    github.com/sagikazarmark/slog-shim v0.1.0 // indirect
    github.com/sourcegraph/conc v0.3.0 // indirect
    github.com/spf13/afero v1.11.0 // indirect
    github.com/spf13/cast v1.6.0 // indirect
    github.com/spf13/pflag v1.0.5 // indirect
    github.com/subosito/gotenv v1.6.0 // indirect
    go.uber.org/atomic v1.11.0 // indirect
    go.uber.org/multierr v1.11.0 // indirect
    go.uber.org/zap v1.27.0 // indirect
    golang.org/x/exp v0.0.0-20240409090435-93d18d7e34b8 // indirect
    golang.org/x/net v0.24.0 // indirect
    golang.org/x/sys v0.19.0 // indirect
    golang.org/x/text v0.14.0 // indirect
    google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be // indirect
    google.golang.org/grpc v1.63.2 // indirect
    google.golang.org/protobuf v1.33.0 // indirect
    gopkg.in/ini.v1 v1.67.0 // indirect
    gopkg.in/yaml.v3 v3.0.1 // indirect
)
main.go
New file
@@ -0,0 +1,46 @@
package main
import (
    "ruleModelEngine/config"
    //"annotation_service/db"
    "basic.com/valib/logger.git"
    "flag"
    "ruleModelEngine/rule"
    "time"
)
var env = flag.String("app", "app", "read database info")
func init() {
    config.Init(*env)
    var logFile = config.LogConf.Path + "annotation_service.log"
    logger.InitLogger(logFile, config.LogConf.Level, config.LogConf.MaxSize, config.LogConf.MaxBackups, config.LogConf.MaxAge)
    logger.Info("loginit success !")
}
func main() {
    //db.UpdatePersonStatusByIds()
    immediate := flag.Bool("immediate", true, "whether to execute immediately")
    flag.Parse()
    if *immediate {
        logger.Info("Executing immediately...")
        rule.ExecuteTask()
    }
    now := time.Now()
    next := time.Date(now.Year(), now.Month(), now.Day()+1, 1, 0, 0, 0, now.Location())
    duration := next.Sub(now)
    timer := time.NewTimer(duration)
    logger.Info("The program has started and will execute at one o'clock in the early morning every night.")
    for {
        <-timer.C
        logger.Info("Executing at 1 AM...")
        rule.ExecuteTask()
        next = next.Add(24 * time.Hour)
        timer.Reset(next.Sub(time.Now()))
    }
}
rule/engine.go
New file
@@ -0,0 +1,304 @@
package rule
import (
    "basic.com/valib/logger.git"
    "errors"
    "fmt"
    "math"
    "ruleModelEngine/db"
    "strings"
    "time"
)
func processResidentStatus(residents []db.Resident) []db.MoveInout {
    // 获取当前日期的年月日
    now := time.Now()
    currentDate := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
    moveInout := make([]db.MoveInout, 0)
    // 遍历Resident结构体切片
    for _, resident := range residents {
        //fmt.Println("gogogogo!!!!!!!!!!!!!!!!!!!!!!!!!!!")
        // 将字符串类型的时间转换为time.Time类型,并只保留年月日
        lastAppearanceTime := time.Unix(resident.LastAppearanceTime, 0)
        lastAppearanceDate := time.Date(lastAppearanceTime.Year(), lastAppearanceTime.Month(), lastAppearanceTime.Day(), 0, 0, 0, 0, lastAppearanceTime.Location())
        //lastAppearanceTime, err := time.Parse("2006-01-02", resident.LastAppearanceTime)
        datePart := strings.Split(resident.CreateAt, "T")[0]
        createdAt, err := time.Parse("2006-01-02", datePart)
        if err != nil {
            fmt.Println(err)
            // 处理时间解析错误
            // 可以选择跳过该条数据或者记录日志
            continue
        }
        // 计算LastAppearanceTime和CreateAt距离当前日期的天数
        daysSinceLastAppearance := currentDate.Sub(lastAppearanceDate).Hours() / 24
        daysSinceCreateAt := currentDate.Sub(createdAt).Hours() / 24
        moveType := "MoveIn"
        status := ""
        //fmt.Println("daysSinceLastAppearance: ", daysSinceLastAppearance)
        //fmt.Println("daysSinceCreateAt: ", daysSinceCreateAt)
        // 判断是否为疑似搬离或确认搬离
        if daysSinceLastAppearance > 15 {
            moveType = "MoveOut"
            status = "Pending"
        } else if daysSinceLastAppearance > 30 {
            moveType = "MoveOut"
            status = "Confirmed"
        }
        // 判断是否为疑似入住或确认入住
        if moveType == "MoveIn" {
            if daysSinceCreateAt > 15 {
                status = "Pending"
            } else if daysSinceCreateAt > 30 {
                status = "Confirmed"
            }
        }
        //fmt.Println("status: ", status)
        moveInout = append(moveInout, db.MoveInout{
            DocumentNumber: resident.DocumentNumber,
            CommunityID:    resident.CommunityID,
            MoveInDate:     createdAt,           // 存储年月日
            MoveOutDate:    &lastAppearanceDate, // 存储年月日
            MoveType:       moveType,
            Status:         status,
        })
    }
    return moveInout
}
func ProcessRuleEngine(personInfos []db.CaptureInfo, ruleInfo []db.PersonnelStatusRule, cmmunityID string) (bool, error) {
    ruleInfoCheck := checkRuleValidity(ruleInfo)
    if ruleInfoCheck == false {
        logger.Error("规则库数据异常")
        return false, errors.New("规则库数据异常")
    }
    fmt.Println("清洗前: ", len(personInfos))
    logger.Info("规则算法执行完毕!!!")
    return true, nil
}
func CreateLinearModel(personInfos []db.CaptureInfo, communityID string, threshold float64, validDays int) ([]db.Identity, []db.CaptureInfo) {
    identity := make([]db.Identity, 0)
    captureInfo := make([]db.CaptureInfo, 0)
    for _, info := range personInfos {
        if info.Status == "resident" {
            addrData := make(map[string]int)
            //fmt.Println("DocumentNumber: ", info.DocumentNumber)
            for _, addr := range info.CaptureDetail {
                if !strings.Contains(addr.CaptureAddress, "F") || addr.Direction == "in" {
                    continue
                }
                addrData[addr.CaptureAddress]++
            }
            if len(addrData) < 5 {
                captureInfo = append(captureInfo, info)
                continue
            }
            addrDataArray := mapIntToFloatSlice(addrData)
            derivative := firstDerivative(addrDataArray)
            //fmt.Println(addrData)
            stdDev := stdDeviation(derivative)
            if len(addrDataArray) > 10 {
                stdDev = stdDev / 2
            }
            if stdDev < threshold {
                //fmt.Println(addrDataArray)
                //fmt.Println("threshold: ", threshold)
                //fmt.Println("stdDev: ", stdDev)
                info.Status = "fieldworker"
                captureInfo = append(captureInfo, info)
                identity = append(identity, db.Identity{
                    CommunityID:    communityID,
                    DocumentNumber: info.DocumentNumber,
                    LabelId:        3,
                    ExpireTime:     GetCurrentDateAddDaysTimestamp(validDays)})
                continue
            }
            captureInfo = append(captureInfo, info)
        }
    }
    //addressLinearData := []
    return identity, captureInfo
}
func totalDirection(personInfos []db.CaptureInfo) {
    // 统计每个地址的进出情况
    addrData := make(map[string]map[string]int) // 使用map[string]map[string]int来存储每个地址的进出情况
    for _, info := range personInfos {
        if info.Status == "resident" {
            for _, addr := range info.CaptureDetail {
                if !strings.Contains(addr.CaptureAddress, "F") {
                    continue
                }
                // 检查该地址是否已经在addrData中
                if _, ok := addrData[addr.CaptureAddress]; !ok {
                    addrData[addr.CaptureAddress] = make(map[string]int)
                }
                // 更新该地址的进出情况
                addrData[addr.CaptureAddress][addr.Direction]++
            }
        }
    }
    fmt.Println(addrData)
}
func CreateProcessModel(personInfos []db.CaptureInfo, days int) {
    timeTOIndex := getIndexForTime(24 * 60)
    dateTOIndex := getIndexForDate(days)
    //fmt.Println(dateTOIndex)
    inModelMatrix := make([][]int, days)
    for i := range inModelMatrix {
        inModelMatrix[i] = make([]int, 24*60)
    }
    outModelMatrix := make([][]int, days)
    for i := range outModelMatrix {
        outModelMatrix[i] = make([]int, 24*60)
    }
    for _, info := range personInfos {
        //fmt.Println("info: ", info)
        for _, captureInfo := range info.CaptureDetail {
            captrueDateTime := captureInfo.CaptureDate
            dateTimeObj, err := time.Parse("2006-01-02 15:04:05", captrueDateTime)
            if err != nil {
                logger.Error("Parse time error", err)
                return
            }
            if isWeekend(dateTimeObj) {
                continue
            }
            datePart := dateTimeObj.Format("2006-01-02")
            timePart := dateTimeObj.Format("15:04")
            switch captureInfo.Direction {
            case "out":
                outModelMatrix[dateTOIndex[datePart]][timeTOIndex[timePart]] = 1
            case "in":
                inModelMatrix[dateTOIndex[datePart]][timeTOIndex[timePart]] = 1
            default:
                continue
            }
        }
    }
    fmt.Println(outModelMatrix)
    fmt.Println(inModelMatrix)
}
// 计算一阶导数
func firstDerivative(data []float64) []float64 {
    n := len(data)
    derivative := make([]float64, n)
    for i := 0; i < n-1; i++ {
        derivative[i] = data[i+1] - data[i]
    }
    return derivative
}
// 计算导数的标准差
func stdDeviation(data []float64) float64 {
    n := len(data)
    mean := 0.0
    // 计算平均值
    for _, value := range data {
        mean += value
    }
    mean /= float64(n)
    // 计算方差
    sum := 0.0
    for _, value := range data {
        sum += math.Pow(value-mean, 2)
    }
    variance := sum / float64(n)
    return math.Sqrt(variance)
}
// 将 map[string]int 转换为 []float64
func mapIntToFloatSlice(addrData map[string]int) []float64 {
    var data []float64
    // 遍历 addrData,将其值转换为 float64 类型后添加到 data 中
    for _, v := range addrData {
        data = append(data, float64(v))
    }
    return data
}
// GetCurrentDateAddDaysTimestamp 返回当前日期加上指定天数后的时间戳(秒)
func GetCurrentDateAddDaysTimestamp(days int) int64 {
    // 获取当前时间
    now := time.Now()
    // 只保留年月日部分
    currentYear, currentMonth, currentDay := now.Date()
    // 构建当天零点时间
    zeroTime := time.Date(currentYear, currentMonth, currentDay, 0, 0, 0, 0, now.Location())
    // 将天数转换为Duration类型
    duration := time.Duration(days) * 24 * time.Hour
    // 计算指定天数后的时间
    futureTime := zeroTime.Add(duration)
    // 返回时间戳(秒)
    return futureTime.Unix()
}
// 减去指定天数后的日期
func subtractDays(days int) time.Time {
    // 获取当前时间
    now := time.Now()
    // 减去指定天数
    previousDays := now.AddDate(0, 0, -days)
    return previousDays
}
// 判断给定日期是否是周末
func isWeekend(date time.Time) bool {
    dayOfWeek := date.Weekday()
    return dayOfWeek == time.Saturday || dayOfWeek == time.Sunday
}
// 获取个日期内总天数对应的索引
func getIndexForDate(days int) map[string]int {
    startDate := subtractDays(days)
    endDate := subtractDays(1)
    arraySize := days
    layout := "2006-01-02"
    dateTOIndex := make(map[string]int)
    for i := 0; i <= int(endDate.Sub(startDate).Hours()/24); i++ {
        date := startDate.AddDate(0, 0, i)
        offsetDays := int(date.Sub(startDate).Hours() / 24)
        arrayIndex := offsetDays % arraySize
        dateTOIndex[date.Format(layout)] = arrayIndex
    }
    return dateTOIndex
}
// 获取一天中每分钟的索引位置映射
func getIndexForTime(arraySize int) map[string]int {
    timeTOIndex := make(map[string]int)
    for i := 0; i < arraySize; i++ {
        hour := i / 60
        minute := i % 60
        timeStr := fmt.Sprintf("%02d:%02d", hour, minute)
        timeTOIndex[timeStr] = i
    }
    return timeTOIndex
}
rule/service.go
New file
@@ -0,0 +1,147 @@
package rule
import (
    "basic.com/valib/logger.git"
    "fmt"
    "ruleModelEngine/data"
    "ruleModelEngine/db"
)
//func PrintFilteredPersonnelInfo(filteredPersonnelInfo []db.PersonnelInfo) {
//    for _, pi := range filteredPersonnelInfo {
//        fmt.Printf("DocumentNumber: %s\n", pi.DocumentNumber)
//        fmt.Printf("TotalCaptureCount: %d\n", pi.TotalCaptureCount)
//        fmt.Printf("TotalCaptureDays: %d\n", pi.TotalCaptureDays)
//        fmt.Printf("PersonnelStatus: %s\n", pi.PersonnelStatus)
//
//        fmt.Println("CaptureDetails:")
//        for _, cd := range pi.CaptureDetails {
//            fmt.Printf("  Date: %s\n", cd.Date)
//            fmt.Printf("  TotalCaptureCount: %d\n", cd.TotalCaptureCount)
//            fmt.Println("  Captures:")
//            for _, c := range cd.Captures {
//                fmt.Printf("    DataTime: %s\n", c.DataTime)
//                fmt.Printf("    Location: %s\n", c.Location)
//            }
//        }
//        fmt.Println()
//    }
//}
// 计算常用地址
//func assignFrequentAddress(personnelInfo []db.PersonnelInfo) []db.PersonnelInfo {
//    for i := range personnelInfo {
//        addressCounts := make(map[string]int)
//        var maxAddress string
//        maxCount := 0
//        for _, detail := range personnelInfo[i].CaptureDetails {
//            for _, capture := range detail.Captures {
//                addressCounts[capture.Location]++
//                if addressCounts[capture.Location] > maxCount {
//                    maxCount = addressCounts[capture.Location]
//                    maxAddress = capture.Location
//                }
//            }
//        }
//        personnelInfo[i].FrequentAddress = maxAddress
//    }
//    return personnelInfo
//}
// 检查规则表书否存在异常
func checkRuleValidity(rules []db.PersonnelStatusRule) bool {
    for i := 0; i < len(rules); i++ {
        for j := i + 1; j < len(rules); j++ {
            ruleI := rules[i]
            ruleJ := rules[j]
            //fmt.Println(ruleI.DetectionDaysStart,ruleI.DetectionDaysEnd)
            //fmt.Println(ruleJ.DetectionDaysStart,ruleJ.DetectionDaysEnd)
            if (ruleI.DetectionDaysStart <= ruleJ.DetectionDaysEnd && ruleI.DetectionDaysStart >= ruleJ.DetectionDaysStart) ||
                (ruleI.DetectionDaysEnd <= ruleJ.DetectionDaysEnd && ruleI.DetectionDaysEnd >= ruleJ.DetectionDaysStart) ||
                (ruleI.DetectionDaysStart <= ruleJ.DetectionDaysStart && ruleI.DetectionDaysEnd >= ruleJ.DetectionDaysEnd) {
                return false
            }
        }
    }
    return true
}
// 执行程序入口
func ExecuteTask() {
    ruleInfo, err := db.GetAllData()
    if err != nil {
        logger.Error("GetAllData Error", err)
    }
    fmt.Println("ruleInfo: ", ruleInfo)
    communityIDs, err := db.GetCommunityIDs()
    //fmt.Println("communityIDs:", communityIDs)
    if err != nil {
        logger.Error("GetCommunityIDs Error", err)
    }
    for _, communityID := range communityIDs {
        //查询社区内人员档案,方便数据更新
        personStatus, err := db.GetDBPersonStatusData(communityID)
        if err != nil {
            logger.Error("GetDBPersonStatusData Error", err)
        }
        labeManage, err := db.GetLabelManageIdentity(2)
        if err != nil {
            logger.Error("GetDBPersonStatusData Error", err)
        }
        //fmt.Println(labeManage)
        //fmt.Println("personStatus: ", personStatus)
        //fmt.Println("CcmmunityIDs: ", cmmunityID)
        //按社区id查询近一个月es数据
        captureInfos, err := db.Query1MDataByCommunityId(communityID)
        //fmt.Println("captureInfos: ", captureInfos)
        for i := range captureInfos {
            captureDays := data.CalculateCaptureDays(captureInfos[i].CaptureDetail)
            captureInfos[i].CaptureDays = captureDays
            //fmt.Println("该人员出现天数为", captureInfos[i].CaptureDays)
            captureInfos[i].Status = data.SetStatus(captureDays, ruleInfo)
            data.SetFrequentAddress(&captureInfos[i])
        }
        if err != nil {
            logger.Error("MatchAllTargets Error", err)
        }
        if len(captureInfos) == 0 {
            continue
        }
        //fmt.Println("captureInfosQ: ", captureInfos)
        for _, identity := range labeManage {
            switch identity.Name {
            case "服务人员":
                identity, attribute := CreateLinearModel(captureInfos, communityID, 2.68, identity.ValidDays)
                errIdentity := db.UpdateDBPersonLabel(identity)
                if errIdentity != nil {
                    logger.Error("UpdateDBPersonLabel Error", errIdentity)
                }
                captureInfos = attribute
            }
        }
        //CreateProcessModel(captureInfos, 30)
        //continue
        //fmt.Println("captureInfos: ", captureInfos)
        postCaptureInfos := data.ProcessData(captureInfos, personStatus, ruleInfo, communityID)
        //fmt.Println("postCaptureInfos: ", postCaptureInfos)
        //fmt.Println("共过滤条数:", len(captureInfos)-len(postCaptureInfos))
        UpdatePersonInfoErr := db.UpdatePersonInfo(postCaptureInfos, communityID)
        if UpdatePersonInfoErr != nil {
            logger.Error("MatchPermanentResidentTargets Error: ", UpdatePersonInfoErr)
        }
        resident, DocNumberErr := db.GetResidentData("resident", communityID)
        if DocNumberErr != nil {
            logger.Error("GetDocNumberFromPersonStatus Error: ", DocNumberErr)
        }
        //fmt.Println(resident)
        mio := processResidentStatus(resident)
        //fmt.Println("mip: ", mio)
        UpdateMoveInoutErr := db.UpdateMoveInout(mio)
        if UpdateMoveInoutErr != nil {
            logger.Error("UpdateMoveInoutErr Error: ", UpdateMoveInoutErr)
        }
    }
}
util/util.go
New file
@@ -0,0 +1,285 @@
package util
import (
    "basic.com/valib/logger.git"
    "bytes"
    "encoding/json"
    "errors"
    "fmt"
    "math"
    "os/exec"
    "strconv"
    "time"
)
// 脚本封装
func RunScript(str string) string {
    cmd := exec.Command("sh", "-c", str)
    var out bytes.Buffer
    cmd.Stdout = &out
    err := cmd.Run()
    if err != nil {
        return "运行失败"
    }
    return out.String()
}
func Sourcelist(buf []byte) (sources []map[string]interface{}, err error) {
    var info interface{}
    json.Unmarshal(buf, &info)
    out, ok := info.(map[string]interface{})
    if !ok {
        return nil, errors.New("http response interface can not change map[string]interface{}")
    }
    middle, ok := out["hits"].(map[string]interface{})
    if !ok {
        return nil, errors.New("first hits change error!")
    }
    for _, in := range middle["hits"].([]interface{}) {
        tmpbuf, ok := in.(map[string]interface{})
        if !ok {
            fmt.Println("change to source error!")
            continue
        }
        source, ok := tmpbuf["_source"].(map[string]interface{})
        if !ok {
            fmt.Println("change _source error!")
            continue
        }
        sources = append(sources, source)
    }
    return sources, nil
}
func SourceTotal(buf []byte) (total int, err error) {
    var info interface{}
    json.Unmarshal(buf, &info)
    out, ok := info.(map[string]interface{})
    if !ok {
        return -1, errors.New("http response interface can not change map[string]interface{}")
    }
    middle, ok := out["hits"].(map[string]interface{})
    if !ok {
        return -1, errors.New("first total change error!")
    }
    tmp, b := middle["total"].(map[string]interface{})
    if b != true {
        v := middle["total"].(float64)
        t := int(v)
        return t, nil
    }
    value := tmp["value"].(float64)
    total = int(value)
    return total, nil
}
func SourceCreated(buf []byte) (result bool, err error) {
    var info interface{}
    json.Unmarshal(buf, &info)
    out, ok := info.(map[string]interface{})
    if !ok {
        return false, errors.New("http response interface can not change map[string]interface{}")
    }
    middle, ok := out["result"].(string)
    if !ok {
        return false, errors.New("first total change error!")
    }
    if middle == "created" || middle == "updated" {
        result = true
    }
    return result, nil
}
func SourceDeleted(buf []byte) (total int, err error) {
    var info interface{}
    json.Unmarshal(buf, &info)
    out, ok := info.(map[string]interface{})
    if !ok {
        return -1, errors.New("http response interface can not change map[string]interface{}")
    }
    middle, ok := out["deleted"].(float64)
    if !ok {
        return -1, errors.New("first total change error!")
    }
    total = int(middle)
    return total, nil
}
func SourceUpdated(buf []byte) (total int, err error) {
    var info interface{}
    json.Unmarshal(buf, &info)
    out, ok := info.(map[string]interface{})
    if !ok {
        return -1, errors.New("http response interface can not change map[string]interface{}")
    }
    //fmt.Println(out)
    middle, ok := out["updated"].(float64)
    if !ok {
        return -1, errors.New("first total change error!")
    }
    total = int(middle)
    return total, nil
}
func SourceAggregationList(buf []byte) (sources []map[string]interface{}, err error) {
    var info interface{}
    json.Unmarshal(buf, &info)
    out, ok := info.(map[string]interface{})
    if !ok {
        return nil, errors.New("http response interface can not change map[string]interface{}")
    }
    middle, ok := out["aggregations"].(map[string]interface{})
    if !ok {
        return nil, errors.New("first hits change error!")
    }
    documentAggregations := middle["group_by_documentnumber"].(map[string]interface{})
    buckets := documentAggregations["buckets"].([]interface{})
    if len(buckets) == 0 {
        return nil, nil
    }
    for _, in := range buckets {
        tmpbuf, ok := in.(map[string]interface{})
        if !ok {
            return nil, errors.New("")
        }
        sources = append(sources, tmpbuf)
    }
    return sources, nil
}
func StartTimer(f func()) {
    for {
        f()
        now := time.Now()
        // 计算下一个零点
        next := now.Add(time.Hour * 24)
        next = time.Date(next.Year(), next.Month(), next.Day(), 0, 0, 0, 0, next.Location())
        t := time.NewTimer(next.Sub(now))
        <-t.C
        t.Stop()
    }
}
// 日期换算工具
func GetTimeArr(earliestDate string) int {
    now := time.Now().Format("2006-01-02")
    fmt.Println(earliestDate, now)
    a, _ := time.Parse("2006-01-02", earliestDate)
    b, _ := time.Parse("2006-01-02", now)
    d := a.Sub(b)
    date := d.Hours() / 24
    return int(math.Abs(date))
}
// 日期遍历工具
// GetBetweenDates 根据开始日期和结束日期计算出时间段内所有日期
// 参数为日期格式,如:2020-01-01
func GetBetweenDates(sdate, edate string) []string {
    d := []string{}
    timeFormatTpl := "2006-01-02"
    if len(timeFormatTpl) != len(sdate) {
        timeFormatTpl = timeFormatTpl[0:len(sdate)]
    }
    date, err := time.Parse(timeFormatTpl, sdate)
    if err != nil {
        // 时间解析,异常
        return d
    }
    date2, err := time.Parse(timeFormatTpl, edate)
    if err != nil {
        // 时间解析,异常
        return d
    }
    if date2.Before(date) {
        // 如果结束时间小于开始时间,异常
        return d
    }
    // 输出日期格式固定
    timeFormatTpl = "2006-01-02"
    date2Str := date2.Format(timeFormatTpl)
    d = append(d, date.Format(timeFormatTpl))
    for {
        date = date.AddDate(0, 0, 1)
        dateStr := date.Format(timeFormatTpl)
        d = append(d, dateStr)
        if dateStr == date2Str {
            break
        }
    }
    return d
}
func IpIntToString(ipInt int) string {
    ipSegs := make([]string, 4)
    var len int = len(ipSegs)
    buffer := bytes.NewBufferString("")
    for i := 0; i < len; i++ {
        tempInt := ipInt & 0xFF
        ipSegs[len-i-1] = strconv.Itoa(tempInt)
        ipInt = ipInt >> 8
    }
    for i := 0; i < len; i++ {
        buffer.WriteString(ipSegs[i])
        if i < len-1 {
            buffer.WriteString(".")
        }
    }
    return buffer.String()
}
// 计算时间阈值
func CheckTimeDifference(timestampStr1 string, timestampStr2 string, intervalInMinutes int) bool {
    layout := "2006-01-02 15:04:05"
    timestampStr1 = timestampStr1[:19]
    timestampStr2 = timestampStr2[:19]
    // 将字符串解析为时间
    time1, err := time.Parse(layout, timestampStr1)
    if err != nil {
        fmt.Println("时间解析失败:", err)
        return false
    }
    time2, err := time.Parse(layout, timestampStr2)
    if err != nil {
        fmt.Println("时间解析失败:", err)
        return false
    }
    // 计算时间差
    diff := time2.Sub(time1)
    // 检查时间差是否小于等于指定的间隔
    if diff.Minutes() <= float64(intervalInMinutes) {
        return true
    } else {
        return false
    }
}
func CalculateDays(startDate string, endDate string) int {
    layout := "2006-01-02" // 日期格式
    startTime, err := time.Parse(layout, startDate)
    if err != nil {
        // 错误处理
        logger.Error("Error parsing start date:", err)
        return 0
    }
    endTime, err := time.Parse(layout, endDate)
    if err != nil {
        // 错误处理
        logger.Error("Error parsing end date:", err)
        return 0
    }
    duration := endTime.Sub(startTime)
    days := int(duration.Hours() / 24) // 将时间差转换为天数
    return days
}