zhaoqingang
2025-02-08 3c033759200ad7c02dd59521b1aebbbdc35b98fa
models/locationAnalysis.go
@@ -1,7 +1,17 @@
package models
import (
   "bytes"
   "context"
   "encoding/json"
   "errors"
   "fmt"
   "github.com/elastic/go-elasticsearch/v6"
   //"log"
   "model-engine/config"
   "model-engine/pkg/set"
   "strconv"
   "strings"
   "time"
   "model-engine/db"
@@ -10,36 +20,46 @@
)
type LocationModel struct {
   AreaIds       map[string]struct{}
   Building      string       // 楼栋
   AlarmType     db.AlarmType // 预警方式
   KeyPersonType string       // 人员类型
   PersonLabel   string
   PersonCount   int // 人数, 出现的同类型人员数量
   Appearances   int // 出现次数,
   Duration      int // 时间范围, 单位天
   Task          *db.ModelTask
   AreaIds        []interface{} `json:"-"`
   OrgIds         []interface{} `json:"-"`
   Building       string        // 楼栋
   Floor          string
   AlarmType      db.AlarmType // 预警方式
   KeyPersonType  string       // 人员类型
   PersonLabel    string       // 人员标签
   PersonIdentity []string     // 人员身份
   Duration       int          // 时间范围
   Appearances    int          // 出现次数,
   StartTime      int          // 时间范围, 开始时间
   EndTime        int          // 时间范围, 结束时间
   Task           *db.ModelTask
}
func (m *LocationModel) Init(task *db.ModelTask) error {
   m.AreaIds = make(map[string]struct{})
   for _, a := range task.DomainUnitIds {
      m.AreaIds[a] = struct{}{}
   //m.AreaIds = make(map[string]struct{})
   //for _, a := range task.DomainUnitIds {
   //   m.AreaIds[a] = struct{}{}
   //}
   if len(task.DomainUnitIds) == 0 {
      return errors.New("empty domain set")
   }
   orgIds, areaIds, err := service.GetOrgIdsAndAreaIdsByDomainUnitIds(task.DomainUnitIds)
   if err != nil {
      return err
   }
   m.Task = task
   m.OrgIds = orgIds
   m.AreaIds = areaIds
   m.Building = task.Building
   m.AlarmType = task.AlarmType
   m.PersonIdentity = []string{"stranger", "visitor", "resident"} //task.IdentityType
   m.KeyPersonType = task.PersonType
   m.PersonLabel = task.PersonLabel
   if task.IdentityType != "" {
      m.PersonIdentity = strings.Split(task.IdentityType, ",")
   }
   for _, v := range task.Rules {
      if v.Alias == "personCount" {
         if val, ok := v.Value.(float64); ok {
            m.PersonCount = int(val)
         }
      }
      if v.Alias == "appearances" {
         if val, ok := v.Value.(float64); ok {
            m.Appearances = int(val)
@@ -49,6 +69,13 @@
      if v.Alias == "duration" {
         if val, ok := v.Value.(float64); ok {
            m.Duration = int(val)
         }
      }
      if v.Alias == "timeRange" {
         if val, ok := v.Value.(string); ok {
            ages := strings.Split(val, ",")
            m.StartTime, _ = strconv.Atoi(ages[0])
            m.EndTime, _ = strconv.Atoi(ages[1])
         }
      }
   }
@@ -63,11 +90,173 @@
   return nil
}
type LocationRecord struct {
   IDCard          string `json:"idCard"`
   PicDate         string `json:"picDate"`
   DocumentNumbers []string
   CommunityId     string `json:"communityId"`
   OrgId           string `json:"orgId"`
   Building        string `json:"building"`
   Floor           string `json:"floor"`
   AppearCount     int    `gorm:"type:int;" json:"appearCount"` // 出现次数
   //AppearInterval int    `gorm:"type:int;" json:"appearInterval"` // 出现间隔,单位为秒
}
type LocationPersonInfo struct {
   Id             string `json:"id"`
   DocumentNumber string `json:"document_number"`
   PersonType     string `json:"person_type"`
   //CommunityId        string `json:"community_id"`
   //OrgId              string `json:"org_id"`
   //PersonName         string `json:"person_name"`
   //IdCard             string `json:"id_card"`
   LastAppearanceTime int64 `json:"last_appearance_time"`
   //LastDirection      string `json:"last_direction"`
   //LastLocation       string `json:"last_location"`
}
//var (
//   processed        sync.Map                           // 存储已处理记录
//   cleanupThreshold = time.Now().Add(-100 * time.Hour) // 定义一个时间窗口,假设只保存最近100小时的记录
//)
func (m *LocationModel) Run() error {
   // 根据配置的时间段天数, 过滤规则配置的相同重点人员类型和相同标签人员频繁出现的楼层地址
   // 根据配置的时间段天数, 每天的时间范围内, 重点人员类型或者特定标签人员出现的楼层次数超过阈值
   results := make([]*db.ModelTaskResults, 0)
   var baseFilter, labelFilter, keyFilter, lastFilter []LocationPersonInfo
   var document_number_map map[string]LocationPersonInfo
   var document_number_list []string
   err := db.GetDB().Raw(`
      SELECT
         s.document_number,
--          s.community_id,
--          s.org_id,
--          p.person_name,
--          p.id_card,
         s.last_appearance_time,
--          s.last_direction,
--          s.last_location
      FROM
         snapshot_count_summary AS s
         JOIN person AS p ON p.id = s.document_number
      WHERE
         p.id_card != ""
         AND (p.community_id IN ?
         OR p.org_id IN ?)
         AND p.status IN ?
      `, m.AreaIds, m.OrgIds, m.PersonIdentity).Scan(&baseFilter).Error
   if err != nil {
      logger.Warnf(err.Error())
   }
   if len(baseFilter) == 0 {
      return fmt.Errorf("no results found that match the age condition %s - %s ", m.AreaIds, m.OrgIds)
   }
   logger.Debugf("task %s match age result %d", m.Task.Name, len(baseFilter))
   for _, i := range baseFilter {
      if _, ok := document_number_map[i.DocumentNumber]; !ok {
         document_number_list = append(document_number_list, i.DocumentNumber)
      }
      document_number_map[i.DocumentNumber] = i
   }
   if m.PersonLabel != "" {
      labels := strings.Split(m.PersonLabel, ",")
      err := db.GetDB().Raw(`
      SELECT
         p.id
      FROM
         person AS p
         JOIN person_label AS l ON p.id = l.person_id
      WHERE
         p.id IN ?
         AND   l.label_id IN ?
      `, document_number_list, labels).Scan(&labelFilter).Error
      if err != nil {
         logger.Warnf(err.Error())
      }
      if len(labelFilter) == 0 {
         return fmt.Errorf("no results found that match the label condition %s", m.PersonLabel)
      }
      logger.Debugf("task %s match label result %d", m.Task.Name, len(labelFilter))
   }
   document_number_list = []string{}
   for _, i := range labelFilter {
      document_number_list = append(document_number_list, i.Id)
   }
   if m.KeyPersonType != "" {
      keyTypes := strings.Split(m.KeyPersonType, ",")
      err := db.GetDB().Raw(`
      SELECT
         p.id,
         k.person_type
      FROM
         person AS p
         JOIN key_person AS k ON k.id_card = p.id_card
      WHERE
         p.id IN ?
         AND k.person_type IN ?
      `, m.StartTime, keyTypes).Scan(&keyFilter).Error
      if err != nil {
         logger.Warnf(err.Error())
      }
      if len(keyFilter) == 0 {
         return fmt.Errorf("no results found that match the key condition %s", m.KeyPersonType)
      }
      logger.Debugf("task %s match key person result %d", m.Task.Name, len(keyFilter))
   }
   logger.Debugf("task %s last result %d", m.Task.Name, len(lastFilter))
   document_number_list = []string{}
   for _, i := range keyFilter {
      document_number_list = append(document_number_list, i.Id)
      person := document_number_map[i.DocumentNumber]
      person.PersonType = i.PersonType
      document_number_map[i.DocumentNumber] = person
   }
   records, err := queryEsLocation(db.GetEsClient(), m, document_number_list)
   if err != nil {
      return err
   }
   domains, err := domainToLocation(records)
   if err != nil {
      return err
   }
   var tagTypes []string
   var lastAppearanceTime int64
   for _, record := range records {
      tagTypes = []string{}
      for _, personId := range record.DocumentNumbers {
         tagTypes = append(tagTypes, document_number_map[personId].PersonType)
         lastAppearanceTime = document_number_map[personId].LastAppearanceTime
      }
      _, typeNames, err := service.GetPersonTypeNameByTypes(tagTypes)
      if err != nil {
         return err
      }
      event := strings.Join(typeNames, ",")
      result := &db.ModelTaskResults{
         Title:         m.Task.Name,
         Event:         m.eventFormat(event, record.AppearCount),
         ModelID:       m.Task.ModelID,
         ModelTaskID:   m.Task.ID,
         CommunityId:   record.CommunityId,
         OrgID:         record.OrgId,
         ObjectIds:     strings.Join(record.DocumentNumbers, ","),
         Location:      fmt.Sprintf("%s%s%s", domains[record.CommunityId].Name, record.Building, record.Floor),
         PicDate:       time.Unix(lastAppearanceTime, 0).Format("2006-01-02 15:04:05"),
         FirstPersonID: record.DocumentNumbers[0],
      }
      results = append(results, result)
   }
   logger.Debugf("task %s last filter result %d", m.Task.Name, len(results))
   return service.SaveTaskResults(results)
}
@@ -83,6 +272,240 @@
   return nil
}
func (m *LocationModel) eventFormat() string {
   return ""
func (m *LocationModel) eventFormat(event string, AppearCount int) string {
   return fmt.Sprintf("%s人员进出%d次", event, AppearCount)
}
func queryEsLocation(esClient *elasticsearch.Client, locationModel *LocationModel, documentNumbers []string) ([]*LocationRecord, error) {
   var buf bytes.Buffer
   nowTime := time.Now()
   startTime := nowTime.Add(-time.Duration(locationModel.Duration) * 24 * time.Hour)
   // 构建过滤条件
   var filters []map[string]interface{}
   documentNumberFilter := map[string]interface{}{
      "terms": map[string]interface{}{
         "documentNumber": documentNumbers,
      },
   }
   filters = append(filters, documentNumberFilter)
   if len(locationModel.OrgIds) > 0 || len(locationModel.AreaIds) > 0 {
      // 获取数据权限过滤条件
      authFilters := GetDomainFilters(locationModel.OrgIds, locationModel.AreaIds)
      filters = append(filters, authFilters...)
   }
   // 地址过滤
   if locationModel.Building != "" || locationModel.Floor != "" {
      var addrParams map[string]interface{}
      if locationModel.Floor != "" {
         addrParams = map[string]interface{}{"bool": map[string]interface{}{
            "must": []interface{}{
               map[string]interface{}{
                  "term": map[string]interface{}{
                     "cameraLocation.building": locationModel.Building,
                  }},
               map[string]interface{}{
                  "term": map[string]interface{}{
                     "cameraLocation.floor": locationModel.Floor,
                  }},
            },
         }}
      } else if locationModel.Building != "" {
         addrParams = map[string]interface{}{
            "term": map[string]interface{}{
               "cameraLocation.building": locationModel.Building,
            }}
      }
      filters = append(filters, addrParams)
   }
   //// 重点人员过滤
   //if len(locationModel.KeyPersonType) > 0 {
   //   filters = append(filters, map[string]interface{}{
   //      "terms": map[string]interface{}{
   //         "keyPersonType": strings.Split(locationModel.KeyPersonType, ","),
   //      },
   //   })
   //}
   // 时间范围
   //filters = append(filters, map[string]interface{}{
   //   "range": map[string]interface{}{
   //      "picDate": map[string]interface{}{
   //         "gte": start.Format(time.DateTime),
   //         "lt":  now.Format(time.DateTime),
   //      },
   //   },
   //})
   for date := startTime; date.Before(nowTime); date = date.Add(24 * time.Hour) {
      start := time.Date(date.Year(), date.Month(), date.Day(), locationModel.StartTime, 0, 0, 0, date.Location())
      end := time.Date(date.Year(), date.Month(), date.Day(), locationModel.EndTime, 0, 0, 0, date.Location())
      filters = append(filters, map[string]interface{}{
         "range": map[string]interface{}{
            "picDate": map[string]interface{}{
               "gte": start.Format(time.RFC3339),
               "lte": end.Format(time.RFC3339),
            },
         },
      })
   }
   query := map[string]interface{}{
      "query": map[string]interface{}{
         "bool": map[string]interface{}{
            "filter": filters,
         },
      },
      "aggs": map[string]interface{}{
         "orgs": map[string]interface{}{ // 先聚合orgId
            "terms": map[string]interface{}{
               "field": "orgId", // 聚合orgId
               "size":  10000,
            },
            "aggs": map[string]interface{}{
               "community": map[string]interface{}{ // 在orgId聚合下聚合communityId
                  "terms": map[string]interface{}{
                     "field": "communityId", // 聚合communityId
                     "size":  10000,
                  },
                  "aggs": map[string]interface{}{
                     "location": map[string]interface{}{ // 在communityId下聚合building
                        "terms": map[string]interface{}{
                           "field": "cameraLocation.building", // 聚合楼栋
                           "size":  10000,
                        },
                        "aggs": map[string]interface{}{
                           "floor": map[string]interface{}{ // 在building下聚合floor
                              "terms": map[string]interface{}{
                                 "field": "cameraLocation.floor", // 聚合楼层
                                 "size":  10000,
                              },
                              "aggs": map[string]interface{}{
                                 "filter_floor": map[string]interface{}{
                                    "bucket_selector": map[string]interface{}{
                                       "buckets_path": map[string]interface{}{
                                          "eventCount": "_count",
                                       },
                                       "script": map[string]interface{}{
                                          "source": "params.eventCount >= params.threshold",
                                          "params": map[string]interface{}{
                                             "threshold": locationModel.Appearances,
                                          },
                                       },
                                    },
                                 },
                                 "document_numbers": map[string]interface{}{ // 新增按 documentNumber 聚合
                                    "terms": map[string]interface{}{
                                       "field": "documentNumber",
                                       "size":  10000,
                                    },
                                 },
                              },
                           },
                        },
                     },
                  },
               },
            },
         },
      },
      "size": 0,
   }
   if err := json.NewEncoder(&buf).Encode(query); err != nil {
      return nil, fmt.Errorf("error encoding query: %s", err)
   }
   res, err := esClient.Search(
      esClient.Search.WithContext(context.Background()),
      esClient.Search.WithIndex(config.EsInfo.EsIndex.AiOcean.IndexName),
      esClient.Search.WithDocumentType(config.EsInfo.EsIndex.AiOcean.IndexType),
      esClient.Search.WithBody(&buf),
      esClient.Search.WithTrackTotalHits(true),
      esClient.Search.WithPretty(),
   )
   if err != nil {
      return nil, fmt.Errorf("error getting response: %s", err)
   }
   defer res.Body.Close()
   // Check for a successful status code (2xx range)
   if res.IsError() {
      return nil, fmt.Errorf("error getting response: %s", res.String())
   }
   var result map[string]interface{}
   if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
      return nil, fmt.Errorf("error parsing response body: %s", err)
   }
   // 解析聚合结果
   var records []*LocationRecord
   if aggs, ok := result["aggregations"].(map[string]interface{}); ok {
      if orgBuckets, ok := aggs["orgs"].(map[string]interface{})["buckets"].([]interface{}); ok {
         for _, orgBucket := range orgBuckets {
            orgId := orgBucket.(map[string]interface{})["key"].(string)
            // 解析按communityId的聚合结果
            if communityBuckets, ok := orgBucket.(map[string]interface{})["community"].(map[string]interface{})["buckets"].([]interface{}); ok {
               for _, communityBucket := range communityBuckets {
                  communityId := communityBucket.(map[string]interface{})["key"].(string)
                  // 解析按building的聚合结果
                  if locationBuckets, ok := communityBucket.(map[string]interface{})["location"].(map[string]interface{})["buckets"].([]interface{}); ok {
                     for _, locationBucket := range locationBuckets {
                        building := locationBucket.(map[string]interface{})["key"].(string)
                        // 解析按floor的聚合结果
                        if floorBuckets, ok := locationBucket.(map[string]interface{})["floor"].(map[string]interface{})["buckets"].([]interface{}); ok {
                           for _, floorBucket := range floorBuckets {
                              floor := floorBucket.(map[string]interface{})["key"].(string)
                              appearCount := floorBucket.(map[string]interface{})["filter_floor"].(int)
                              // 构建 LocationRecord 结构体
                              var persons []string
                              if docNumBuckets, ok := floorBucket.(map[string]interface{})["document_numbers"].(map[string]interface{})["buckets"].([]interface{}); ok {
                                 for _, docNumBucket := range docNumBuckets {
                                    persons = append(persons, docNumBucket.(map[string]interface{})["key"].(string))
                                 }
                              }
                              record := &LocationRecord{
                                 //PicDate:        timestamp,
                                 DocumentNumbers: persons,
                                 CommunityId:     communityId,
                                 Building:        building,
                                 Floor:           floor,
                                 OrgId:           orgId,
                                 AppearCount:     appearCount,
                              }
                              records = append(records, record)
                           }
                        }
                     }
                  }
               }
            }
         }
      }
   }
   return records, nil
}
func domainToLocation(records []*LocationRecord) (map[string]*db.DomainUnit, error) {
   if len(records) == 0 {
      return nil, nil
   }
   domainIds := set.NewStringSet()
   for _, record := range records {
      domainIds.Add(record.CommunityId)
   }
   domains, err := service.GetUnitsMapByIds(domainIds.Elements())
   if err != nil {
      return nil, err
   }
   return domains, nil
}