zhaoqingang
2025-02-08 3c033759200ad7c02dd59521b1aebbbdc35b98fa
models/gather_model.go
@@ -6,27 +6,31 @@
   "encoding/json"
   "errors"
   "fmt"
   "github.com/elastic/go-elasticsearch/v6"
   "log"
   "strings"
   "sync"
   "time"
   "github.com/elastic/go-elasticsearch/v6"
   "model-engine/config"
   "model-engine/db"
   "model-engine/pkg/logger"
   "model-engine/pkg/set"
   "model-engine/service"
   "strings"
   "time"
)
type GatherModel struct {
   OrgIds         []interface{} `json:"-"`
   AreaIds        []interface{} `json:"-"`
   Building       string        `gorm:"type:varchar(255)" json:"building"`    //楼栋
   Floor          string        `gorm:"type:varchar(255)" json:"floor"`       //楼层
   AlarmType      db.AlarmType  `gorm:"type:varchar(255);" json:"alarmType"`  //预警方式
   PersonType     string        `gorm:"type:varchar(255);" json:"personType"` //人员类型
   GatherPersons  int           `gorm:"type:int;" json:"gatherPersons"`       //聚集人数
   AppearInterval int           `gorm:"type:int;" json:"appearInterval"`      //出现间隔,单位为秒
   DaysWindow     int           `gorm:"type:int;" json:"daysWindow" `         //近几天内
   Threshold      int           `gorm:"type:int;" json:"threshold" `          //达几次
   Building       string        `gorm:"type:varchar(255)" json:"building"`    // 楼栋
   Floor          string        `gorm:"type:varchar(255)" json:"floor"`       // 楼层
   AlarmType      db.AlarmType  `gorm:"type:varchar(255);" json:"alarmType"`  // 预警方式
   PersonType     string        `gorm:"type:varchar(255);" json:"personType"` // 人员类型
   GatherPersons  int           `gorm:"type:int;" json:"gatherPersons"`       // 聚集人数
   AppearInterval int           `gorm:"type:int;" json:"appearInterval"`      // ,单位为秒
   DaysWindow     int           `gorm:"type:int;" json:"daysWindow" `         // 近几天内
   Threshold      int           `gorm:"type:int;" json:"threshold" `          // 达几次
   Task           *db.ModelTask
}
@@ -51,11 +55,40 @@
   m.Floor = task.Floor
   m.AlarmType = task.AlarmType
   m.PersonType = task.PersonType
   m.GatherPersons = task.GatherPersons
   m.AppearInterval = task.AppearInterval
   m.DaysWindow = task.DaysWindow
   m.Threshold = task.Threshold
   fmt.Println("GatherModel init finish ...")
   for _, v := range task.Rules {
      if v.Alias == "gatherPersons" {
         if val, ok := v.Value.(float64); ok {
            m.GatherPersons = int(val)
         }
      }
      if v.Alias == "appearInterval" {
         if val, ok := v.Value.(float64); ok {
            m.AppearInterval = int(val)
         }
      }
      if v.Alias == "daysWindow" {
         if val, ok := v.Value.(float64); ok {
            m.DaysWindow = int(val)
         }
      }
      if v.Alias == "threshold" {
         if val, ok := v.Value.(float64); ok {
            m.Threshold = int(val)
         }
      }
   }
   logger.Debugf("GatherModel init finish ...task id:%s, name:%s, rule:%+v\n", task.ID, task.Name, m)
   if m.GatherPersons == 0 || m.AppearInterval == 0 || m.DaysWindow == 0 || m.Threshold == 0 {
      logger.Warnf("invalid parameters. task id:%s, name:%s\n", task.ID, task.Name)
      return errors.New("invalid parameters")
   }
   return nil
}
@@ -67,25 +100,23 @@
   OrgId          string `json:"orgId"`
   Building       string `json:"building"`
   Floor          string `json:"floor"`
   GatherPersons  int    `gorm:"type:int;" json:"gatherPersons"`  //聚集人数
   AppearInterval int    `gorm:"type:int;" json:"appearInterval"` //出现间隔,单位为秒
   GatherPersons  int    `gorm:"type:int;" json:"gatherPersons"`  // 聚集人数
   AppearInterval int    `gorm:"type:int;" json:"appearInterval"` // 出现间隔,单位为秒
}
var (
   processed        map[string]ProcessedRecord         // 存储已处理记录
   processed        sync.Map                           // 存储已处理记录
   cleanupThreshold = time.Now().Add(-100 * time.Hour) // 定义一个时间窗口,假设只保存最近100小时的记录
)
func init() {
   processed = make(map[string]ProcessedRecord)
}
func (m *GatherModel) Run() error {
   // 清理过期的记录
   for key, record := range processed {
      if record.Timestamp.Before(cleanupThreshold) {
         delete(processed, key)
   processed.Range(func(key, value any) bool {
      if value.(ProcessedRecord).Timestamp.Before(cleanupThreshold) {
         processed.Delete(key)
      }
   }
      return true
   })
   records, err := queryElasticsearch(db.GetEsClient(), m)
   if err != nil {
@@ -104,16 +135,19 @@
      uniqueKey := fmt.Sprintf("%s-%s", record.DocumentNumber, record.PicDate)
      // 如果已经处理过,跳过
      if _, exists := processed[uniqueKey]; exists {
      if _, exists := processed.Load(uniqueKey); exists {
         continue
      }
      // 添加到已处理记录
      processed[uniqueKey] = ProcessedRecord{
      processed.Store(uniqueKey, ProcessedRecord{
         UniqueKey: uniqueKey,
         Timestamp: time.Now(),
      }
      })
      newRecords = append(newRecords, record)
   }
   if len(newRecords) == 0 {
      return nil
   }
   aggregation, err := analyzeAndAggregate(newRecords)
@@ -154,6 +188,11 @@
      results = append(results, result)
   }
   return service.SaveTaskResults(results)
}
func (m *GatherModel) KeepAlive() error {
   db.GetDB().Model(m.Task).Where("id = ?", m.Task.ID).Update("last_run_time", time.Now())
   return nil
}
func (m *GatherModel) Shutdown() error {
@@ -403,6 +442,9 @@
}
func analyzeAndAggregate(records []*GatherRecord) (map[GatherLocationTime]set.StringSet, error) {
   if len(records) == 0 {
      return nil, nil
   }
   aggregation := make(map[GatherLocationTime]set.StringSet)
   domainIds := set.NewStringSet()
   for _, record := range records {