zhangqian
2024-12-12 378445b97b115be44b25957ef71e01caae598593
models/gather_model.go
@@ -9,6 +9,7 @@
   "log"
   "model-engine/config"
   "model-engine/db"
   "model-engine/pkg/set"
   "model-engine/service"
   "strings"
   "time"
@@ -25,6 +26,7 @@
   AppearInterval int           `gorm:"type:int;" json:"appearInterval"`      //出现间隔,单位为秒
   DaysWindow     int           `gorm:"type:int;" json:"daysWindow" `         //近几天内
   Threshold      int           `gorm:"type:int;" json:"threshold" `          //达几次
   Task           *db.ModelTask
}
func (m *GatherModel) Init(task *db.ModelTask) error {
@@ -34,6 +36,7 @@
      return err
   }
   m.Task = task
   m.OrgIds = orgIds
   m.AreaIds = areaIds
   m.Building = task.Building
@@ -53,6 +56,7 @@
   PicDate        string `json:"picDate"`
   DocumentNumber string
   CommunityId    string `json:"communityId"`
   OrgId          string `json:"orgId"`
   Building       string `json:"building"`
   Floor          string `json:"floor"`
   GatherPersons  int    `gorm:"type:int;" json:"gatherPersons"`  //聚集人数
@@ -74,11 +78,34 @@
      log.Fatalf("Failed to analyze and aggregate data: %v", err)
   }
   // Print or process the aggregation results as needed
   for location, persons := range aggregation {
      fmt.Printf("Gathering detected at %s with %d unique persons\n", location, len(persons))
   if len(aggregation) == 0 {
      return nil
   }
   return nil
   tagTypes := strings.Split(m.Task.PersonType, ",")
   results := make([]*db.ModelTaskResults, 0, len(aggregation))
   _, typeNames, err := service.GetPersonTypeNameByTypes(tagTypes)
   if err != nil {
      return err
   }
   event := strings.Join(typeNames, ",")
   for lt, persons := range aggregation {
      result := &db.ModelTaskResults{
         Title:       m.Task.Name,
         Event:       fmt.Sprintf("%s/%d人", event, len(persons)),
         ModelID:     m.Task.ModelID,
         ModelTaskID: m.Task.ID,
         CommunityId: lt.CommunityId,
         OrgID:       lt.OrgId,
         ObjectIds:   strings.Join(persons.Elements(), ","),
         Location:    lt.Location,
         Building:    lt.Building,
         Floor:       lt.Floor,
         PicDate:     lt.Time,
      }
      results = append(results, result)
   }
   return service.SaveTaskResults(results)
}
func (m *GatherModel) Shutdown() error {
@@ -151,46 +178,67 @@
         },
      },
      "aggs": map[string]interface{}{
         "gather_events": map[string]interface{}{
            "date_histogram": map[string]interface{}{
               "field":         "picDate",
               "interval":      fmt.Sprintf("%ds", gatherModel.AppearInterval),
               "min_doc_count": 1,
         "orgs": map[string]interface{}{ // 先聚合orgId
            "terms": map[string]interface{}{
               "field": "orgId", // 聚合orgId
               "size":  10000,
            },
            "aggs": map[string]interface{}{
               "community": map[string]interface{}{
               "community": map[string]interface{}{ // 在orgId聚合下聚合communityId
                  "terms": map[string]interface{}{
                     "field": "communityId", // 聚合小区id
                     "field": "communityId", // 聚合communityId
                     "size":  10000,
                  },
                  "aggs": map[string]interface{}{
                     "location": map[string]interface{}{
                     "location": map[string]interface{}{ // 在communityId下聚合building
                        "terms": map[string]interface{}{
                           "field": "cameraLocation.building", // 聚合楼栋
                           "size":  10000,
                        },
                        "aggs": map[string]interface{}{
                           "floor": map[string]interface{}{
                           "floor": map[string]interface{}{ // 在building下聚合floor
                              "terms": map[string]interface{}{
                                 "field": "cameraLocation.floor", // 聚合楼层
                                 "size":  10000,
                              },
                              "aggs": map[string]interface{}{
                                 "people": map[string]interface{}{
                                    "terms": map[string]interface{}{
                                       "field": "documentNumber", // 按人员唯一标识聚合
                                       "size":  10000,
                                 "gather_events": map[string]interface{}{ // 在floor下聚合gather_events
                                    "date_histogram": map[string]interface{}{
                                       "field":         "picDate",
                                       "interval":      fmt.Sprintf("%ds", gatherModel.AppearInterval),
                                       "min_doc_count": 1,
                                    },
                                 },
                                 "filter_gather": map[string]interface{}{
                                    "bucket_selector": map[string]interface{}{
                                       "buckets_path": map[string]interface{}{
                                          "personCount": "people._bucket_count", // 统计人数
                                    "aggs": map[string]interface{}{
                                       "people": map[string]interface{}{
                                          "terms": map[string]interface{}{
                                             "field": "documentNumber", // 按人员唯一标识聚合
                                             "size":  10000,
                                          },
                                       },
                                       "script": map[string]interface{}{
                                          "source": "params.personCount >= params.gatherPersons", // 聚集人数过滤
                                          "params": map[string]interface{}{
                                             "gatherPersons": gatherModel.GatherPersons,
                                       "filter_gather": map[string]interface{}{
                                          "bucket_selector": map[string]interface{}{
                                             "buckets_path": map[string]interface{}{
                                                "personCount": "people._bucket_count", // 统计人数
                                             },
                                             "script": map[string]interface{}{
                                                "source": "params.personCount >= params.gatherPersons", // 聚集人数过滤
                                                "params": map[string]interface{}{
                                                   "gatherPersons": gatherModel.GatherPersons,
                                                },
                                             },
                                          },
                                       },
                                       "frequency_filter": map[string]interface{}{ // 添加频率过滤
                                          "bucket_selector": map[string]interface{}{
                                             "buckets_path": map[string]interface{}{
                                                "eventCount": "_count", // 聚合事件次数
                                             },
                                             "script": map[string]interface{}{
                                                "source": "params.eventCount >= params.threshold", // 筛选频率达到阈值的事件
                                                "params": map[string]interface{}{
                                                   "threshold": gatherModel.Threshold,
                                                },
                                             },
                                          },
                                       },
                                    },
@@ -237,44 +285,50 @@
   // 解析聚合结果
   var records []GatherRecord
   if aggs, ok := result["aggregations"].(map[string]interface{}); ok {
      if gatherEvents, ok := aggs["gather_events"].(map[string]interface{}); ok {
         if buckets, ok := gatherEvents["buckets"].([]interface{}); ok {
            for _, bucket := range buckets {
               key := int64(bucket.(map[string]interface{})["key"].(float64)) / 1000 // 将毫秒转换为秒
               timestamp := time.Unix(key, 0).Format("2006-01-02T15:04:05")
      if orgBuckets, ok := aggs["orgs"].(map[string]interface{})["buckets"].([]interface{}); ok {
         for _, orgBucket := range orgBuckets {
            orgId := orgBucket.(map[string]interface{})["key"].(string)
               // 解析按小区、楼栋和楼层的聚合结果
               if communityBuckets, ok := bucket.(map[string]interface{})["community"].(map[string]interface{})["buckets"].([]interface{}); ok {
                  for _, communityBucket := range communityBuckets {
                     communityId := communityBucket.(map[string]interface{})["key"].(string)
            // 解析按communityId的聚合结果
            if communityBuckets, ok := orgBucket.(map[string]interface{})["community"].(map[string]interface{})["buckets"].([]interface{}); ok {
               for _, communityBucket := range communityBuckets {
                  communityId := communityBucket.(map[string]interface{})["key"].(string)
                     // 解析按楼栋和楼层的聚合结果
                     if locationBuckets, ok := communityBucket.(map[string]interface{})["location"].(map[string]interface{})["buckets"].([]interface{}); ok {
                        for _, locationBucket := range locationBuckets {
                           building := locationBucket.(map[string]interface{})["key"].(string)
                  // 解析按building的聚合结果
                  if locationBuckets, ok := communityBucket.(map[string]interface{})["location"].(map[string]interface{})["buckets"].([]interface{}); ok {
                     for _, locationBucket := range locationBuckets {
                        building := locationBucket.(map[string]interface{})["key"].(string)
                           // 解析楼层
                           if floorBuckets, ok := locationBucket.(map[string]interface{})["floor"].(map[string]interface{})["buckets"].([]interface{}); ok {
                              for _, floorBucket := range floorBuckets {
                                 floor := floorBucket.(map[string]interface{})["key"].(string)
                        // 解析按floor的聚合结果
                        if floorBuckets, ok := locationBucket.(map[string]interface{})["floor"].(map[string]interface{})["buckets"].([]interface{}); ok {
                           for _, floorBucket := range floorBuckets {
                              floor := floorBucket.(map[string]interface{})["key"].(string)
                                 // 解析人员
                                 if peopleBuckets, ok := floorBucket.(map[string]interface{})["people"].(map[string]interface{})["buckets"].([]interface{}); ok {
                                    for _, person := range peopleBuckets {
                                       documentNumber := person.(map[string]interface{})["key"].(string)
                              // 解析聚合的事件
                              if gatherEvents, ok := floorBucket.(map[string]interface{})["gather_events"].(map[string]interface{})["buckets"].([]interface{}); ok {
                                 for _, eventBucket := range gatherEvents {
                                    key := int64(eventBucket.(map[string]interface{})["key"].(float64)) / 1000 // 将毫秒转换为秒
                                    timestamp := time.Unix(key, 0).Format("2006-01-02 15:04:05")
                                       // 构建 GatherRecord 结构体
                                       record := GatherRecord{
                                          PicDate:        timestamp,
                                          DocumentNumber: documentNumber,
                                          CommunityId:    communityId,
                                          Building:       building,
                                          Floor:          floor,
                                          AppearInterval: gatherModel.AppearInterval,
                                          GatherPersons:  gatherModel.GatherPersons,
                                    // 解析人员
                                    if peopleBuckets, ok := eventBucket.(map[string]interface{})["people"].(map[string]interface{})["buckets"].([]interface{}); ok {
                                       for _, person := range peopleBuckets {
                                          documentNumber := person.(map[string]interface{})["key"].(string)
                                          // 构建 GatherRecord 结构体
                                          record := GatherRecord{
                                             PicDate:        timestamp,
                                             DocumentNumber: documentNumber,
                                             CommunityId:    communityId,
                                             Building:       building,
                                             Floor:          floor,
                                             OrgId:          orgId,
                                             AppearInterval: gatherModel.AppearInterval,
                                             GatherPersons:  gatherModel.GatherPersons,
                                          }
                                          records = append(records, record)
                                       }
                                       records = append(records, record)
                                    }
                                 }
                              }
@@ -291,15 +345,44 @@
   return records, nil
}
func analyzeAndAggregate(records []GatherRecord) (map[string][]string, error) {
   // Implement logic to aggregate and analyze data based on GatherModel parameters
   // This is a placeholder for the actual implementation
   aggregation := make(map[string][]string)
// GatherLocationTime identifies one gathering occurrence: a place (org,
// community, building, floor) combined with a time string. It contains only
// string fields, so it is comparable and is used as the key of the map built
// by analyzeAndAggregate, whose values are the sets of document numbers seen
// at that place and time.
type GatherLocationTime struct {
   // CommunityId is copied from GatherRecord.CommunityId; it is also the key
   // used to look up the unit whose Name prefixes Location.
   CommunityId string
   // OrgId is copied from GatherRecord.OrgId.
   OrgId       string
   // Building is copied from GatherRecord.Building.
   Building    string
   // Floor is copied from GatherRecord.Floor.
   Floor       string
   // Location is a display string: the unit name followed directly by
   // Building and Floor, concatenated with no separator.
   Location    string
   // Time is copied from GatherRecord.PicDate.
   Time        string
}
   // Example logic:
func analyzeAndAggregate(records []GatherRecord) (map[GatherLocationTime]set.StringSet, error) {
   aggregation := make(map[GatherLocationTime]set.StringSet)
   domainIds := set.NewStringSet()
   for _, record := range records {
      key := fmt.Sprintf("%s%s%s", record.CommunityId, record.Building, record.Floor)
      aggregation[key] = append(aggregation[key], record.DocumentNumber)
      domainIds.Add(record.CommunityId)
   }
   domains, err := service.GetUnitsMapByIds(domainIds.Elements())
   if err != nil {
      return nil, err
   }
   for _, record := range records {
      if domains[record.CommunityId] == nil {
         continue
      }
      location := GatherLocationTime{
         CommunityId: record.CommunityId,
         OrgId:       record.OrgId,
         Building:    record.Building,
         Floor:       record.Floor,
         Location:    fmt.Sprintf("%s%s%s", domains[record.CommunityId].Name, record.Building, record.Floor),
         Time:        record.PicDate,
      }
      if aggregation[location] == nil {
         aggregation[location] = set.NewStringSet()
      }
      aggregation[location].Add(record.DocumentNumber)
   }
   return aggregation, nil