package models
|
|
import (
|
"bytes"
|
"context"
|
"encoding/json"
|
"fmt"
|
"github.com/elastic/go-elasticsearch/v6"
|
"log"
|
"model-engine/config"
|
"model-engine/db"
|
"model-engine/pkg/set"
|
"model-engine/service"
|
"strings"
|
"time"
|
)
|
|
type GatherModel struct {
|
OrgIds []interface{} `json:"-"`
|
AreaIds []interface{} `json:"-"`
|
Building string `gorm:"type:varchar(255)" json:"building"` //楼栋
|
Floor string `gorm:"type:varchar(255)" json:"floor"` //楼层
|
AlarmType db.AlarmType `gorm:"type:varchar(255);" json:"alarmType"` //预警方式
|
PersonType string `gorm:"type:varchar(255);" json:"personType"` //人员类型
|
GatherPersons int `gorm:"type:int;" json:"gatherPersons"` //聚集人数
|
AppearInterval int `gorm:"type:int;" json:"appearInterval"` //出现间隔,单位为秒
|
DaysWindow int `gorm:"type:int;" json:"daysWindow" ` //近几天内
|
Threshold int `gorm:"type:int;" json:"threshold" ` //达几次
|
Task *db.ModelTask
|
}
|
|
func (m *GatherModel) Init(task *db.ModelTask) error {
|
|
orgIds, areaIds, err := service.GetOrgIdsAndAreaIdsByDomainUnitIds(task.DomainUnitIds)
|
if err != nil {
|
return err
|
}
|
|
m.Task = task
|
m.OrgIds = orgIds
|
m.AreaIds = areaIds
|
m.Building = task.Building
|
m.Floor = task.Floor
|
m.AlarmType = task.AlarmType
|
m.PersonType = task.PersonType
|
m.GatherPersons = task.GatherPersons
|
m.AppearInterval = task.AppearInterval
|
m.DaysWindow = task.DaysWindow
|
m.Threshold = task.Threshold
|
fmt.Println("GatherModel init finish ...")
|
return nil
|
}
|
|
type GatherRecord struct {
|
IDCard string `json:"idCard"`
|
PicDate string `json:"picDate"`
|
DocumentNumber string
|
CommunityId string `json:"communityId"`
|
OrgId string `json:"orgId"`
|
Building string `json:"building"`
|
Floor string `json:"floor"`
|
GatherPersons int `gorm:"type:int;" json:"gatherPersons"` //聚集人数
|
AppearInterval int `gorm:"type:int;" json:"appearInterval"` //出现间隔,单位为秒
|
}
|
|
func (m *GatherModel) Run() error {
|
records, err := queryElasticsearch(db.GetEsClient(), m)
|
if err != nil {
|
log.Fatalf("Failed to query Elasticsearch: %v", err)
|
}
|
|
if len(records) == 0 {
|
return nil
|
}
|
|
aggregation, err := analyzeAndAggregate(records)
|
if err != nil {
|
log.Fatalf("Failed to analyze and aggregate data: %v", err)
|
}
|
|
if len(aggregation) == 0 {
|
return nil
|
}
|
|
tagTypes := strings.Split(m.Task.PersonType, ",")
|
results := make([]*db.ModelTaskResults, 0, len(aggregation))
|
_, typeNames, err := service.GetPersonTypeNameByTypes(tagTypes)
|
if err != nil {
|
return err
|
}
|
event := strings.Join(typeNames, ",")
|
for lt, persons := range aggregation {
|
result := &db.ModelTaskResults{
|
Title: m.Task.Name,
|
Event: fmt.Sprintf("%s/%d人", event, len(persons)),
|
ModelID: m.Task.ModelID,
|
ModelTaskID: m.Task.ID,
|
CommunityId: lt.CommunityId,
|
OrgID: lt.OrgId,
|
ObjectIds: strings.Join(persons.Elements(), ","),
|
Location: lt.Location,
|
Building: lt.Building,
|
Floor: lt.Floor,
|
PicDate: lt.Time,
|
}
|
results = append(results, result)
|
}
|
return service.SaveTaskResults(results)
|
}
|
|
func (m *GatherModel) Shutdown() error {
|
// 清理资源
|
fmt.Println("Shutting down GatherModel Model")
|
return nil
|
}
|
|
func queryElasticsearch(esClient *elasticsearch.Client, gatherModel *GatherModel) ([]GatherRecord, error) {
|
var buf bytes.Buffer
|
now := time.Now()
|
start := now.Add(-time.Duration(gatherModel.DaysWindow) * 24 * time.Hour)
|
|
// 构建过滤条件
|
var filters []map[string]interface{}
|
if len(gatherModel.OrgIds) > 0 || len(gatherModel.AreaIds) > 0 {
|
// 获取数据权限过滤条件
|
authFilters := GetDomainFilters(gatherModel.OrgIds, gatherModel.AreaIds)
|
filters = append(filters, authFilters...)
|
}
|
|
// 地址过滤
|
if gatherModel.Building != "" || gatherModel.Floor != "" {
|
var addrParams map[string]interface{}
|
if gatherModel.Floor != "" {
|
addrParams = map[string]interface{}{"bool": map[string]interface{}{
|
"must": []interface{}{
|
map[string]interface{}{
|
"term": map[string]interface{}{
|
"cameraLocation.building": gatherModel.Building,
|
}},
|
map[string]interface{}{
|
"term": map[string]interface{}{
|
"cameraLocation.floor": gatherModel.Floor,
|
}},
|
},
|
}}
|
} else if gatherModel.Building != "" {
|
addrParams = map[string]interface{}{
|
"term": map[string]interface{}{
|
"cameraLocation.building": gatherModel.Building,
|
}}
|
}
|
filters = append(filters, addrParams)
|
}
|
|
// 重点人员过滤
|
if len(gatherModel.PersonType) > 0 {
|
filters = append(filters, map[string]interface{}{
|
"terms": map[string]interface{}{
|
"keyPersonType": strings.Split(gatherModel.PersonType, ","),
|
},
|
})
|
}
|
|
// 时间范围
|
filters = append(filters, map[string]interface{}{
|
"range": map[string]interface{}{
|
"picDate": map[string]interface{}{
|
"gte": start.Format(time.DateTime),
|
"lt": now.Format(time.DateTime),
|
},
|
},
|
})
|
|
query := map[string]interface{}{
|
"query": map[string]interface{}{
|
"bool": map[string]interface{}{
|
"filter": filters,
|
},
|
},
|
"aggs": map[string]interface{}{
|
"orgs": map[string]interface{}{ // 先聚合orgId
|
"terms": map[string]interface{}{
|
"field": "orgId", // 聚合orgId
|
"size": 10000,
|
},
|
"aggs": map[string]interface{}{
|
"community": map[string]interface{}{ // 在orgId聚合下聚合communityId
|
"terms": map[string]interface{}{
|
"field": "communityId", // 聚合communityId
|
"size": 10000,
|
},
|
"aggs": map[string]interface{}{
|
"location": map[string]interface{}{ // 在communityId下聚合building
|
"terms": map[string]interface{}{
|
"field": "cameraLocation.building", // 聚合楼栋
|
"size": 10000,
|
},
|
"aggs": map[string]interface{}{
|
"floor": map[string]interface{}{ // 在building下聚合floor
|
"terms": map[string]interface{}{
|
"field": "cameraLocation.floor", // 聚合楼层
|
"size": 10000,
|
},
|
"aggs": map[string]interface{}{
|
"gather_events": map[string]interface{}{ // 在floor下聚合gather_events
|
"date_histogram": map[string]interface{}{
|
"field": "picDate",
|
"interval": fmt.Sprintf("%ds", gatherModel.AppearInterval),
|
"min_doc_count": 1,
|
},
|
"aggs": map[string]interface{}{
|
"people": map[string]interface{}{
|
"terms": map[string]interface{}{
|
"field": "documentNumber", // 按人员唯一标识聚合
|
"size": 10000,
|
},
|
},
|
"filter_gather": map[string]interface{}{
|
"bucket_selector": map[string]interface{}{
|
"buckets_path": map[string]interface{}{
|
"personCount": "people._bucket_count", // 统计人数
|
},
|
"script": map[string]interface{}{
|
"source": "params.personCount >= params.gatherPersons", // 聚集人数过滤
|
"params": map[string]interface{}{
|
"gatherPersons": gatherModel.GatherPersons,
|
},
|
},
|
},
|
},
|
"frequency_filter": map[string]interface{}{ // 添加频率过滤
|
"bucket_selector": map[string]interface{}{
|
"buckets_path": map[string]interface{}{
|
"eventCount": "_count", // 聚合事件次数
|
},
|
"script": map[string]interface{}{
|
"source": "params.eventCount >= params.threshold", // 筛选频率达到阈值的事件
|
"params": map[string]interface{}{
|
"threshold": gatherModel.Threshold,
|
},
|
},
|
},
|
},
|
},
|
},
|
},
|
},
|
},
|
},
|
},
|
},
|
},
|
},
|
},
|
"size": 0,
|
}
|
|
if err := json.NewEncoder(&buf).Encode(query); err != nil {
|
return nil, fmt.Errorf("error encoding query: %s", err)
|
}
|
|
res, err := esClient.Search(
|
esClient.Search.WithContext(context.Background()),
|
esClient.Search.WithIndex(config.EsInfo.EsIndex.AiOcean.IndexName),
|
esClient.Search.WithDocumentType(config.EsInfo.EsIndex.AiOcean.IndexType),
|
esClient.Search.WithBody(&buf),
|
esClient.Search.WithTrackTotalHits(true),
|
esClient.Search.WithPretty(),
|
)
|
if err != nil {
|
return nil, fmt.Errorf("error getting response: %s", err)
|
}
|
defer res.Body.Close()
|
|
// Check for a successful status code (2xx range)
|
if res.IsError() {
|
return nil, fmt.Errorf("error getting response: %s", res.String())
|
}
|
|
var result map[string]interface{}
|
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
|
return nil, fmt.Errorf("error parsing response body: %s", err)
|
}
|
|
// 解析聚合结果
|
var records []GatherRecord
|
if aggs, ok := result["aggregations"].(map[string]interface{}); ok {
|
if orgBuckets, ok := aggs["orgs"].(map[string]interface{})["buckets"].([]interface{}); ok {
|
for _, orgBucket := range orgBuckets {
|
orgId := orgBucket.(map[string]interface{})["key"].(string)
|
|
// 解析按communityId的聚合结果
|
if communityBuckets, ok := orgBucket.(map[string]interface{})["community"].(map[string]interface{})["buckets"].([]interface{}); ok {
|
for _, communityBucket := range communityBuckets {
|
communityId := communityBucket.(map[string]interface{})["key"].(string)
|
|
// 解析按building的聚合结果
|
if locationBuckets, ok := communityBucket.(map[string]interface{})["location"].(map[string]interface{})["buckets"].([]interface{}); ok {
|
for _, locationBucket := range locationBuckets {
|
building := locationBucket.(map[string]interface{})["key"].(string)
|
|
// 解析按floor的聚合结果
|
if floorBuckets, ok := locationBucket.(map[string]interface{})["floor"].(map[string]interface{})["buckets"].([]interface{}); ok {
|
for _, floorBucket := range floorBuckets {
|
floor := floorBucket.(map[string]interface{})["key"].(string)
|
|
// 解析聚合的事件
|
if gatherEvents, ok := floorBucket.(map[string]interface{})["gather_events"].(map[string]interface{})["buckets"].([]interface{}); ok {
|
for _, eventBucket := range gatherEvents {
|
key := int64(eventBucket.(map[string]interface{})["key"].(float64)) / 1000 // 将毫秒转换为秒
|
timestamp := time.Unix(key, 0).UTC().Format("2006-01-02 15:04:05")
|
|
// 解析人员
|
if peopleBuckets, ok := eventBucket.(map[string]interface{})["people"].(map[string]interface{})["buckets"].([]interface{}); ok {
|
for _, person := range peopleBuckets {
|
documentNumber := person.(map[string]interface{})["key"].(string)
|
|
// 构建 GatherRecord 结构体
|
record := GatherRecord{
|
PicDate: timestamp,
|
DocumentNumber: documentNumber,
|
CommunityId: communityId,
|
Building: building,
|
Floor: floor,
|
OrgId: orgId,
|
AppearInterval: gatherModel.AppearInterval,
|
GatherPersons: gatherModel.GatherPersons,
|
}
|
|
records = append(records, record)
|
}
|
}
|
}
|
}
|
}
|
}
|
}
|
}
|
}
|
}
|
}
|
}
|
}
|
|
return records, nil
|
}
|
|
type GatherLocationTime struct {
|
CommunityId string
|
OrgId string
|
Building string
|
Floor string
|
Location string
|
Time string
|
}
|
|
func analyzeAndAggregate(records []GatherRecord) (map[GatherLocationTime]set.StringSet, error) {
|
aggregation := make(map[GatherLocationTime]set.StringSet)
|
domainIds := set.NewStringSet()
|
for _, record := range records {
|
domainIds.Add(record.CommunityId)
|
}
|
|
domains, err := service.GetUnitsMapByIds(domainIds.Elements())
|
if err != nil {
|
return nil, err
|
}
|
|
for _, record := range records {
|
if domains[record.CommunityId] == nil {
|
continue
|
}
|
|
location := GatherLocationTime{
|
CommunityId: record.CommunityId,
|
OrgId: record.OrgId,
|
Building: record.Building,
|
Floor: record.Floor,
|
Location: fmt.Sprintf("%s%s%s", domains[record.CommunityId].Name, record.Building, record.Floor),
|
Time: record.PicDate,
|
}
|
if aggregation[location] == nil {
|
aggregation[location] = set.NewStringSet()
|
}
|
aggregation[location].Add(record.DocumentNumber)
|
}
|
|
return aggregation, nil
|
}
|