| | |
| | | "bytes" |
| | | "context" |
| | | "encoding/json" |
| | | "errors" |
| | | "fmt" |
| | | "github.com/elastic/go-elasticsearch/v6" |
| | | "log" |
| | | "strings" |
| | | "sync" |
| | | "time" |
| | | |
| | | "github.com/elastic/go-elasticsearch/v6" |
| | | |
| | | "model-engine/config" |
| | | "model-engine/db" |
| | | "model-engine/pkg/logger" |
| | | "model-engine/pkg/set" |
| | | "model-engine/service" |
| | | "strings" |
| | | "time" |
| | | ) |
| | | |
| | | type GatherModel struct { | // GatherModel carries the rule parameters that Init copies from a db.ModelTask. NOTE(review): Building through Threshold are declared twice below, which looks like both sides of a diff were kept - confirm which set is current.
| | | OrgIds []interface{} `json:"-"` |
| | | AreaIds []interface{} `json:"-"` |
| | | Building string `gorm:"type:varchar(255)" json:"building"` // building |
| | | Floor string `gorm:"type:varchar(255)" json:"floor"` // floor |
| | | AlarmType db.AlarmType `gorm:"type:varchar(255);" json:"alarmType"` // alert method |
| | | PersonType string `gorm:"type:varchar(255);" json:"personType"` // person type |
| | | GatherPersons int `gorm:"type:int;" json:"gatherPersons"` // number of people gathered |
| | | AppearInterval int `gorm:"type:int;" json:"appearInterval"` // appearance interval, in seconds |
| | | DaysWindow int `gorm:"type:int;" json:"daysWindow" ` // within the last N days |
| | | Threshold int `gorm:"type:int;" json:"threshold" ` // number of occurrences to reach |
| | | Building string `gorm:"type:varchar(255)" json:"building"` // building |
| | | Floor string `gorm:"type:varchar(255)" json:"floor"` // floor |
| | | AlarmType db.AlarmType `gorm:"type:varchar(255);" json:"alarmType"` // alert method |
| | | PersonType string `gorm:"type:varchar(255);" json:"personType"` // person type |
| | | GatherPersons int `gorm:"type:int;" json:"gatherPersons"` // number of people gathered |
| | | AppearInterval int `gorm:"type:int;" json:"appearInterval"` // appearance interval, in seconds |
| | | DaysWindow int `gorm:"type:int;" json:"daysWindow" ` // within the last N days |
| | | Threshold int `gorm:"type:int;" json:"threshold" ` // number of occurrences to reach |
| | | Task *db.ModelTask |
| | | } |
| | | |
| | | func (m *GatherModel) Init(task *db.ModelTask) error { |
| | | type ProcessedRecord struct { | // ProcessedRecord is the value Run stores in the processed map for each record it has already handled.
| | | UniqueKey string // unique identifier |
| | | Timestamp time.Time // timestamp of the record |
| | | } |
| | | |
| | | func (m *GatherModel) Init(task *db.ModelTask) error { |
| | | if len(task.DomainUnitIds) == 0 { |
| | | return errors.New("empty domain set") |
| | | } |
| | | orgIds, areaIds, err := service.GetOrgIdsAndAreaIdsByDomainUnitIds(task.DomainUnitIds) |
| | | if err != nil { |
| | | return err |
| | |
| | | m.Floor = task.Floor |
| | | m.AlarmType = task.AlarmType |
| | | m.PersonType = task.PersonType |
| | | m.GatherPersons = task.GatherPersons |
| | | m.AppearInterval = task.AppearInterval |
| | | m.DaysWindow = task.DaysWindow |
| | | m.Threshold = task.Threshold |
| | | fmt.Println("GatherModel init finish ...") |
| | | |
| | | for _, v := range task.Rules { |
| | | if v.Alias == "gatherPersons" { |
| | | if val, ok := v.Value.(float64); ok { |
| | | m.GatherPersons = int(val) |
| | | } |
| | | } |
| | | |
| | | if v.Alias == "appearInterval" { |
| | | if val, ok := v.Value.(float64); ok { |
| | | m.AppearInterval = int(val) |
| | | } |
| | | } |
| | | |
| | | if v.Alias == "daysWindow" { |
| | | if val, ok := v.Value.(float64); ok { |
| | | m.DaysWindow = int(val) |
| | | } |
| | | } |
| | | |
| | | if v.Alias == "threshold" { |
| | | if val, ok := v.Value.(float64); ok { |
| | | m.Threshold = int(val) |
| | | } |
| | | } |
| | | } |
| | | |
| | | logger.Debugf("GatherModel init finish ...task id:%s, name:%s, rule:%+v\n", task.ID, task.Name, m) |
| | | |
| | | if m.GatherPersons == 0 || m.AppearInterval == 0 || m.DaysWindow == 0 || m.Threshold == 0 { |
| | | logger.Warnf("invalid parameters. task id:%s, name:%s\n", task.ID, task.Name) |
| | | return errors.New("invalid parameters") |
| | | } |
| | | |
| | | return nil |
| | | } |
| | | |
| | |
| | | OrgId string `json:"orgId"` |
| | | Building string `json:"building"` |
| | | Floor string `json:"floor"` |
| | | GatherPersons int `gorm:"type:int;" json:"gatherPersons"` //聚集人数 |
| | | AppearInterval int `gorm:"type:int;" json:"appearInterval"` //出现间隔,单位为秒 |
| | | GatherPersons int `gorm:"type:int;" json:"gatherPersons"` // 聚集人数 |
| | | AppearInterval int `gorm:"type:int;" json:"appearInterval"` // 出现间隔,单位为秒 |
| | | } |
| | | |
| | | var ( |
| | | processed sync.Map // holds the records that were already processed, keyed by the unique key built in Run |
| | | cleanupThreshold = time.Now().Add(-100 * time.Hour) // cutoff for pruning processed; the intent is to keep only the last 100 hours of records. NOTE(review): this initializer runs once at package initialization, so unless it is reassigned elsewhere the cutoff stays at (start time - 100h) and entries stored later with time.Now() never fall before it - consider computing the cutoff inside Run |
| | | ) |
| | | |
| | | func (m *GatherModel) Run() error { |
| | | // 清理过期的记录 |
| | | processed.Range(func(key, value any) bool { |
| | | if value.(ProcessedRecord).Timestamp.Before(cleanupThreshold) { |
| | | processed.Delete(key) |
| | | } |
| | | return true |
| | | }) |
| | | |
| | | records, err := queryElasticsearch(db.GetEsClient(), m) |
| | | if err != nil { |
| | | log.Fatalf("Failed to query Elasticsearch: %v", err) |
| | |
| | | return nil |
| | | } |
| | | |
| | | aggregation, err := analyzeAndAggregate(records) |
| | | newRecords := make([]*GatherRecord, 0) |
| | | |
| | | // 聚合逻辑 |
| | | for _, record := range records { |
| | | // 生成唯一标识 |
| | | uniqueKey := fmt.Sprintf("%s-%s", record.DocumentNumber, record.PicDate) |
| | | |
| | | // 如果已经处理过,跳过 |
| | | if _, exists := processed.Load(uniqueKey); exists { |
| | | continue |
| | | } |
| | | |
| | | // 添加到已处理记录 |
| | | processed.Store(uniqueKey, ProcessedRecord{ |
| | | UniqueKey: uniqueKey, |
| | | Timestamp: time.Now(), |
| | | }) |
| | | newRecords = append(newRecords, record) |
| | | } |
| | | if len(newRecords) == 0 { |
| | | return nil |
| | | } |
| | | |
| | | aggregation, err := analyzeAndAggregate(newRecords) |
| | | if err != nil { |
| | | log.Fatalf("Failed to analyze and aggregate data: %v", err) |
| | | } |
| | |
| | | return err |
| | | } |
| | | event := strings.Join(typeNames, ",") |
| | | for location, persons := range aggregation { |
| | | for lt, persons := range aggregation { |
| | | if persons.Size() == 0 { |
| | | continue |
| | | } |
| | | personIds := persons.Elements() |
| | | result := &db.ModelTaskResults{ |
| | | Title: m.Task.Name, |
| | | Event: fmt.Sprintf("%s/%d人", event, len(persons)), |
| | | ModelID: m.Task.ModelID, |
| | | ModelTaskID: m.Task.ID, |
| | | CommunityId: location.CommunityId, |
| | | OrgID: location.OrgId, |
| | | ObjectIds: strings.Join(persons.Elements(), ","), |
| | | Location: location.Location, |
| | | Building: location.Building, |
| | | Floor: location.Floor, |
| | | Title: m.Task.Name, |
| | | Event: fmt.Sprintf("%s/%d人", event, len(persons)), |
| | | ModelID: m.Task.ModelID, |
| | | ModelTaskID: m.Task.ID, |
| | | CommunityId: lt.CommunityId, |
| | | OrgID: lt.OrgId, |
| | | ObjectIds: strings.Join(personIds, ","), |
| | | Location: lt.Location, |
| | | Building: lt.Building, |
| | | Floor: lt.Floor, |
| | | PicDate: lt.Time, |
| | | FirstPersonID: personIds[0], |
| | | } |
| | | results = append(results, result) |
| | | } |
| | | return service.SaveTaskResults(results) |
| | | } |
| | | |
| | | func (m *GatherModel) KeepAlive() error { | // KeepAlive issues an update of last_run_time to the current time for the row whose id equals m.Task.ID.
| | | db.GetDB().Model(m.Task).Where("id = ?", m.Task.ID).Update("last_run_time", time.Now()) | // NOTE(review): the value returned by Update is discarded, so a failed write is not reported - confirm this is intentional
| | | return nil | // always nil, regardless of the update outcome
| | | } |
| | | |
| | | func (m *GatherModel) Shutdown() error { |
| | |
| | | return nil |
| | | } |
| | | |
| | | func queryElasticsearch(esClient *elasticsearch.Client, gatherModel *GatherModel) ([]GatherRecord, error) { |
| | | func queryElasticsearch(esClient *elasticsearch.Client, gatherModel *GatherModel) ([]*GatherRecord, error) { |
| | | var buf bytes.Buffer |
| | | now := time.Now() |
| | | start := now.Add(-time.Duration(gatherModel.DaysWindow) * 24 * time.Hour) |
| | |
| | | } |
| | | |
| | | // 解析聚合结果 |
| | | var records []GatherRecord |
| | | var records []*GatherRecord |
| | | if aggs, ok := result["aggregations"].(map[string]interface{}); ok { |
| | | if orgBuckets, ok := aggs["orgs"].(map[string]interface{})["buckets"].([]interface{}); ok { |
| | | for _, orgBucket := range orgBuckets { |
| | |
| | | if gatherEvents, ok := floorBucket.(map[string]interface{})["gather_events"].(map[string]interface{})["buckets"].([]interface{}); ok { |
| | | for _, eventBucket := range gatherEvents { |
| | | key := int64(eventBucket.(map[string]interface{})["key"].(float64)) / 1000 // 将毫秒转换为秒 |
| | | timestamp := time.Unix(key, 0).Format("2006-01-02 15:04:05") |
| | | timestamp := time.Unix(key, 0).UTC().Format("2006-01-02 15:04:05") |
| | | |
| | | // 解析人员 |
| | | if peopleBuckets, ok := eventBucket.(map[string]interface{})["people"].(map[string]interface{})["buckets"].([]interface{}); ok { |
| | |
| | | documentNumber := person.(map[string]interface{})["key"].(string) |
| | | |
| | | // 构建 GatherRecord 结构体 |
| | | record := GatherRecord{ |
| | | record := &GatherRecord{ |
| | | PicDate: timestamp, |
| | | DocumentNumber: documentNumber, |
| | | CommunityId: communityId, |
| | |
| | | return records, nil |
| | | } |
| | | |
| | | type GatherLocation struct { |
| | | type GatherLocationTime struct { | // GatherLocationTime is the map key under which analyzeAndAggregate groups document numbers: one entry per community, org, building, floor, location label and time.
| | | CommunityId string |
| | | OrgId string |
| | | Building string |
| | | Floor string |
| | | Location string | // built in analyzeAndAggregate by concatenating the domain name, building and floor
| | | Time string | // copied from GatherRecord.PicDate
| | | } |
| | | |
| | | func analyzeAndAggregate(records []GatherRecord) (map[GatherLocation]set.StringSet, error) { |
| | | aggregation := make(map[GatherLocation]set.StringSet) |
| | | func analyzeAndAggregate(records []*GatherRecord) (map[GatherLocationTime]set.StringSet, error) { |
| | | if len(records) == 0 { |
| | | return nil, nil |
| | | } |
| | | aggregation := make(map[GatherLocationTime]set.StringSet) |
| | | domainIds := set.NewStringSet() |
| | | for _, record := range records { |
| | | domainIds.Add(record.CommunityId) |
| | |
| | | } |
| | | |
| | | for _, record := range records { |
| | | if record.DocumentNumber == "" { |
| | | continue |
| | | } |
| | | if domains[record.CommunityId] == nil { |
| | | continue |
| | | } |
| | | |
| | | location := GatherLocation{ |
| | | location := GatherLocationTime{ |
| | | CommunityId: record.CommunityId, |
| | | OrgId: record.OrgId, |
| | | Building: record.Building, |
| | | Floor: record.Floor, |
| | | Location: fmt.Sprintf("%s%s%s", domains[record.CommunityId].Name, record.Building, record.Floor), |
| | | Time: record.PicDate, |
| | | } |
| | | if aggregation[location] == nil { |
| | | aggregation[location] = set.NewStringSet() |