zhaoqingang
2025-02-08 3c033759200ad7c02dd59521b1aebbbdc35b98fa
models/gather_model.go
@@ -4,33 +4,45 @@
   "bytes"
   "context"
   "encoding/json"
   "errors"
   "fmt"
   "github.com/elastic/go-elasticsearch/v6"
   "log"
   "strings"
   "sync"
   "time"
   "github.com/elastic/go-elasticsearch/v6"
   "model-engine/config"
   "model-engine/db"
   "model-engine/pkg/logger"
   "model-engine/pkg/set"
   "model-engine/service"
   "strings"
   "time"
)
type GatherModel struct {
   OrgIds         []interface{} `json:"-"`
   AreaIds        []interface{} `json:"-"`
   Building       string        `gorm:"type:varchar(255)" json:"building"`    //楼栋
   Floor          string        `gorm:"type:varchar(255)" json:"floor"`       //楼层
   AlarmType      db.AlarmType  `gorm:"type:varchar(255);" json:"alarmType"`  //预警方式
   PersonType     string        `gorm:"type:varchar(255);" json:"personType"` //人员类型
   GatherPersons  int           `gorm:"type:int;" json:"gatherPersons"`       //聚集人数
   AppearInterval int           `gorm:"type:int;" json:"appearInterval"`      //出现间隔,单位为秒
   DaysWindow     int           `gorm:"type:int;" json:"daysWindow" `         //近几天内
   Threshold      int           `gorm:"type:int;" json:"threshold" `          //达几次
   Building       string        `gorm:"type:varchar(255)" json:"building"`    // 楼栋
   Floor          string        `gorm:"type:varchar(255)" json:"floor"`       // 楼层
   AlarmType      db.AlarmType  `gorm:"type:varchar(255);" json:"alarmType"`  // 预警方式
   PersonType     string        `gorm:"type:varchar(255);" json:"personType"` // 人员类型
   GatherPersons  int           `gorm:"type:int;" json:"gatherPersons"`       // 聚集人数
   AppearInterval int           `gorm:"type:int;" json:"appearInterval"`      // ,单位为秒
   DaysWindow     int           `gorm:"type:int;" json:"daysWindow" `         // 近几天内
   Threshold      int           `gorm:"type:int;" json:"threshold" `          // 达几次
   Task           *db.ModelTask
}
func (m *GatherModel) Init(task *db.ModelTask) error {
type ProcessedRecord struct {
   UniqueKey string    // 唯一标识
   Timestamp time.Time // 记录的时间戳
}
func (m *GatherModel) Init(task *db.ModelTask) error {
   if len(task.DomainUnitIds) == 0 {
      return errors.New("empty domain set")
   }
   orgIds, areaIds, err := service.GetOrgIdsAndAreaIdsByDomainUnitIds(task.DomainUnitIds)
   if err != nil {
      return err
@@ -43,11 +55,40 @@
   m.Floor = task.Floor
   m.AlarmType = task.AlarmType
   m.PersonType = task.PersonType
   m.GatherPersons = task.GatherPersons
   m.AppearInterval = task.AppearInterval
   m.DaysWindow = task.DaysWindow
   m.Threshold = task.Threshold
   fmt.Println("GatherModel init finish ...")
   for _, v := range task.Rules {
      if v.Alias == "gatherPersons" {
         if val, ok := v.Value.(float64); ok {
            m.GatherPersons = int(val)
         }
      }
      if v.Alias == "appearInterval" {
         if val, ok := v.Value.(float64); ok {
            m.AppearInterval = int(val)
         }
      }
      if v.Alias == "daysWindow" {
         if val, ok := v.Value.(float64); ok {
            m.DaysWindow = int(val)
         }
      }
      if v.Alias == "threshold" {
         if val, ok := v.Value.(float64); ok {
            m.Threshold = int(val)
         }
      }
   }
   logger.Debugf("GatherModel init finish ...task id:%s, name:%s, rule:%+v\n", task.ID, task.Name, m)
   if m.GatherPersons == 0 || m.AppearInterval == 0 || m.DaysWindow == 0 || m.Threshold == 0 {
      logger.Warnf("invalid parameters. task id:%s, name:%s\n", task.ID, task.Name)
      return errors.New("invalid parameters")
   }
   return nil
}
@@ -59,11 +100,24 @@
   OrgId          string `json:"orgId"`
   Building       string `json:"building"`
   Floor          string `json:"floor"`
   GatherPersons  int    `gorm:"type:int;" json:"gatherPersons"`  //聚集人数
   AppearInterval int    `gorm:"type:int;" json:"appearInterval"` //出现间隔,单位为秒
   GatherPersons  int    `gorm:"type:int;" json:"gatherPersons"`  // 聚集人数
   AppearInterval int    `gorm:"type:int;" json:"appearInterval"` // 出现间隔,单位为秒
}
var (
   processed        sync.Map                           // 存储已处理记录
   cleanupThreshold = time.Now().Add(-100 * time.Hour) // 定义一个时间窗口,假设只保存最近100小时的记录
)
func (m *GatherModel) Run() error {
   // 清理过期的记录
   processed.Range(func(key, value any) bool {
      if value.(ProcessedRecord).Timestamp.Before(cleanupThreshold) {
         processed.Delete(key)
      }
      return true
   })
   records, err := queryElasticsearch(db.GetEsClient(), m)
   if err != nil {
      log.Fatalf("Failed to query Elasticsearch: %v", err)
@@ -73,7 +127,30 @@
      return nil
   }
   aggregation, err := analyzeAndAggregate(records)
   newRecords := make([]*GatherRecord, 0)
   // 聚合逻辑
   for _, record := range records {
      // 生成唯一标识
      uniqueKey := fmt.Sprintf("%s-%s", record.DocumentNumber, record.PicDate)
      // 如果已经处理过,跳过
      if _, exists := processed.Load(uniqueKey); exists {
         continue
      }
      // 添加到已处理记录
      processed.Store(uniqueKey, ProcessedRecord{
         UniqueKey: uniqueKey,
         Timestamp: time.Now(),
      })
      newRecords = append(newRecords, record)
   }
   if len(newRecords) == 0 {
      return nil
   }
   aggregation, err := analyzeAndAggregate(newRecords)
   if err != nil {
      log.Fatalf("Failed to analyze and aggregate data: %v", err)
   }
@@ -89,22 +166,33 @@
      return err
   }
   event := strings.Join(typeNames, ",")
   for location, persons := range aggregation {
   for lt, persons := range aggregation {
      if persons.Size() == 0 {
         continue
      }
      personIds := persons.Elements()
      result := &db.ModelTaskResults{
         Title:       m.Task.Name,
         Event:       fmt.Sprintf("%s/%d人", event, len(persons)),
         ModelID:     m.Task.ModelID,
         ModelTaskID: m.Task.ID,
         CommunityId: location.CommunityId,
         OrgID:       location.OrgId,
         ObjectIds:   strings.Join(persons.Elements(), ","),
         Location:    location.Location,
         Building:    location.Building,
         Floor:       location.Floor,
         Title:         m.Task.Name,
         Event:         fmt.Sprintf("%s/%d人", event, len(persons)),
         ModelID:       m.Task.ModelID,
         ModelTaskID:   m.Task.ID,
         CommunityId:   lt.CommunityId,
         OrgID:         lt.OrgId,
         ObjectIds:     strings.Join(personIds, ","),
         Location:      lt.Location,
         Building:      lt.Building,
         Floor:         lt.Floor,
         PicDate:       lt.Time,
         FirstPersonID: personIds[0],
      }
      results = append(results, result)
   }
   return service.SaveTaskResults(results)
}
func (m *GatherModel) KeepAlive() error {
   db.GetDB().Model(m.Task).Where("id = ?", m.Task.ID).Update("last_run_time", time.Now())
   return nil
}
func (m *GatherModel) Shutdown() error {
@@ -113,7 +201,7 @@
   return nil
}
func queryElasticsearch(esClient *elasticsearch.Client, gatherModel *GatherModel) ([]GatherRecord, error) {
func queryElasticsearch(esClient *elasticsearch.Client, gatherModel *GatherModel) ([]*GatherRecord, error) {
   var buf bytes.Buffer
   now := time.Now()
   start := now.Add(-time.Duration(gatherModel.DaysWindow) * 24 * time.Hour)
@@ -282,7 +370,7 @@
   }
   // 解析聚合结果
   var records []GatherRecord
   var records []*GatherRecord
   if aggs, ok := result["aggregations"].(map[string]interface{}); ok {
      if orgBuckets, ok := aggs["orgs"].(map[string]interface{})["buckets"].([]interface{}); ok {
         for _, orgBucket := range orgBuckets {
@@ -307,7 +395,7 @@
                              if gatherEvents, ok := floorBucket.(map[string]interface{})["gather_events"].(map[string]interface{})["buckets"].([]interface{}); ok {
                                 for _, eventBucket := range gatherEvents {
                                    key := int64(eventBucket.(map[string]interface{})["key"].(float64)) / 1000 // 将毫秒转换为秒
                                    timestamp := time.Unix(key, 0).Format("2006-01-02 15:04:05")
                                    timestamp := time.Unix(key, 0).UTC().Format("2006-01-02 15:04:05")
                                    // 解析人员
                                    if peopleBuckets, ok := eventBucket.(map[string]interface{})["people"].(map[string]interface{})["buckets"].([]interface{}); ok {
@@ -315,7 +403,7 @@
                                          documentNumber := person.(map[string]interface{})["key"].(string)
                                          // 构建 GatherRecord 结构体
                                          record := GatherRecord{
                                          record := &GatherRecord{
                                             PicDate:        timestamp,
                                             DocumentNumber: documentNumber,
                                             CommunityId:    communityId,
@@ -344,16 +432,20 @@
   return records, nil
}
type GatherLocation struct {
type GatherLocationTime struct {
   CommunityId string
   OrgId       string
   Building    string
   Floor       string
   Location    string
   Time        string
}
func analyzeAndAggregate(records []GatherRecord) (map[GatherLocation]set.StringSet, error) {
   aggregation := make(map[GatherLocation]set.StringSet)
func analyzeAndAggregate(records []*GatherRecord) (map[GatherLocationTime]set.StringSet, error) {
   if len(records) == 0 {
      return nil, nil
   }
   aggregation := make(map[GatherLocationTime]set.StringSet)
   domainIds := set.NewStringSet()
   for _, record := range records {
      domainIds.Add(record.CommunityId)
@@ -365,16 +457,20 @@
   }
   for _, record := range records {
      if record.DocumentNumber == "" {
         continue
      }
      if domains[record.CommunityId] == nil {
         continue
      }
      location := GatherLocation{
      location := GatherLocationTime{
         CommunityId: record.CommunityId,
         OrgId:       record.OrgId,
         Building:    record.Building,
         Floor:       record.Floor,
         Location:    fmt.Sprintf("%s%s%s", domains[record.CommunityId].Name, record.Building, record.Floor),
         Time:        record.PicDate,
      }
      if aggregation[location] == nil {
         aggregation[location] = set.NewStringSet()