zhangqian
2024-12-13 6c38b30be9ff127f200ffbfe75c0dc48612f37a6
models/gather_model.go
@@ -13,6 +13,7 @@
   "model-engine/pkg/set"
   "model-engine/service"
   "strings"
   "sync"
   "time"
)
@@ -72,20 +73,18 @@
}
var (
   processed        map[string]ProcessedRecord         // 存储已处理记录
   processed        sync.Map                           // 存储已处理记录
   cleanupThreshold = time.Now().Add(-100 * time.Hour) // 定义一个时间窗口,假设只保存最近100小时的记录
)
func init() {
   processed = make(map[string]ProcessedRecord)
}
func (m *GatherModel) Run() error {
   // 清理过期的记录
   for key, record := range processed {
      if record.Timestamp.Before(cleanupThreshold) {
         delete(processed, key)
   processed.Range(func(key, value any) bool {
      if value.(ProcessedRecord).Timestamp.Before(cleanupThreshold) {
         processed.Delete(key)
      }
   }
      return true
   })
   records, err := queryElasticsearch(db.GetEsClient(), m)
   if err != nil {
@@ -104,16 +103,19 @@
      uniqueKey := fmt.Sprintf("%s-%s", record.DocumentNumber, record.PicDate)
      // 如果已经处理过,跳过
      if _, exists := processed[uniqueKey]; exists {
      if _, exists := processed.Load(uniqueKey); exists {
         continue
      }
      // 添加到已处理记录
      processed[uniqueKey] = ProcessedRecord{
      processed.Store(uniqueKey, ProcessedRecord{
         UniqueKey: uniqueKey,
         Timestamp: time.Now(),
      }
      })
      newRecords = append(newRecords, record)
   }
   if len(newRecords) == 0 {
      return nil
   }
   aggregation, err := analyzeAndAggregate(newRecords)
@@ -133,18 +135,23 @@
   }
   event := strings.Join(typeNames, ",")
   for lt, persons := range aggregation {
      if persons.Size() == 0 {
         continue
      }
      personIds := persons.Elements()
      result := &db.ModelTaskResults{
         Title:       m.Task.Name,
         Event:       fmt.Sprintf("%s/%d人", event, len(persons)),
         ModelID:     m.Task.ModelID,
         ModelTaskID: m.Task.ID,
         CommunityId: lt.CommunityId,
         OrgID:       lt.OrgId,
         ObjectIds:   strings.Join(persons.Elements(), ","),
         Location:    lt.Location,
         Building:    lt.Building,
         Floor:       lt.Floor,
         PicDate:     lt.Time,
         Title:         m.Task.Name,
         Event:         fmt.Sprintf("%s/%d人", event, len(persons)),
         ModelID:       m.Task.ModelID,
         ModelTaskID:   m.Task.ID,
         CommunityId:   lt.CommunityId,
         OrgID:         lt.OrgId,
         ObjectIds:     strings.Join(personIds, ","),
         Location:      lt.Location,
         Building:      lt.Building,
         Floor:         lt.Floor,
         PicDate:       lt.Time,
         FirstPersonID: personIds[0],
      }
      results = append(results, result)
   }
@@ -398,6 +405,9 @@
}
func analyzeAndAggregate(records []*GatherRecord) (map[GatherLocationTime]set.StringSet, error) {
   if len(records) == 0 {
      return nil, nil
   }
   aggregation := make(map[GatherLocationTime]set.StringSet)
   domainIds := set.NewStringSet()
   for _, record := range records {