package models
|
|
import (
|
"bytes"
|
"context"
|
"encoding/json"
|
"errors"
|
"fmt"
|
"github.com/elastic/go-elasticsearch/v6"
|
//"log"
|
"model-engine/config"
|
"model-engine/pkg/set"
|
"strconv"
|
"strings"
|
"time"
|
|
"model-engine/db"
|
"model-engine/pkg/logger"
|
"model-engine/service"
|
)
|
|
type LocationModel struct {
|
AreaIds []interface{} `json:"-"`
|
OrgIds []interface{} `json:"-"`
|
Building string // 楼栋
|
Floor string
|
AlarmType db.AlarmType // 预警方式
|
KeyPersonType string // 人员类型
|
PersonLabel string // 人员标签
|
PersonIdentity []string // 人员身份
|
Duration int // 时间范围
|
Appearances int // 出现次数,
|
StartTime int // 时间范围, 开始时间
|
EndTime int // 时间范围, 结束时间
|
Task *db.ModelTask
|
}
|
|
func (m *LocationModel) Init(task *db.ModelTask) error {
|
//m.AreaIds = make(map[string]struct{})
|
//for _, a := range task.DomainUnitIds {
|
// m.AreaIds[a] = struct{}{}
|
//}
|
if len(task.DomainUnitIds) == 0 {
|
return errors.New("empty domain set")
|
}
|
orgIds, areaIds, err := service.GetOrgIdsAndAreaIdsByDomainUnitIds(task.DomainUnitIds)
|
if err != nil {
|
return err
|
}
|
|
m.Task = task
|
m.OrgIds = orgIds
|
m.AreaIds = areaIds
|
m.Building = task.Building
|
m.AlarmType = task.AlarmType
|
m.PersonIdentity = []string{"stranger", "visitor", "resident"} //task.IdentityType
|
m.KeyPersonType = task.PersonType
|
m.PersonLabel = task.PersonLabel
|
if task.IdentityType != "" {
|
m.PersonIdentity = strings.Split(task.IdentityType, ",")
|
}
|
for _, v := range task.Rules {
|
if v.Alias == "appearances" {
|
if val, ok := v.Value.(float64); ok {
|
m.Appearances = int(val)
|
}
|
}
|
|
if v.Alias == "duration" {
|
if val, ok := v.Value.(float64); ok {
|
m.Duration = int(val)
|
}
|
}
|
if v.Alias == "timeRange" {
|
if val, ok := v.Value.(string); ok {
|
ages := strings.Split(val, ",")
|
m.StartTime, _ = strconv.Atoi(ages[0])
|
m.EndTime, _ = strconv.Atoi(ages[1])
|
}
|
}
|
}
|
|
// 默认计算30天的数据
|
if m.Duration == 0 {
|
m.Duration = 30
|
}
|
|
logger.Debugf("LocationModel init finish ...task id:%s, name:%s, rule:%+v", task.ID, task.Name, m)
|
|
return nil
|
}
|
|
type LocationRecord struct {
|
IDCard string `json:"idCard"`
|
PicDate string `json:"picDate"`
|
DocumentNumbers []string
|
CommunityId string `json:"communityId"`
|
OrgId string `json:"orgId"`
|
Building string `json:"building"`
|
Floor string `json:"floor"`
|
AppearCount int `gorm:"type:int;" json:"appearCount"` // 出现次数
|
//AppearInterval int `gorm:"type:int;" json:"appearInterval"` // 出现间隔,单位为秒
|
}
|
|
type LocationPersonInfo struct {
|
Id string `json:"id"`
|
DocumentNumber string `json:"document_number"`
|
PersonType string `json:"person_type"`
|
//CommunityId string `json:"community_id"`
|
//OrgId string `json:"org_id"`
|
//PersonName string `json:"person_name"`
|
//IdCard string `json:"id_card"`
|
LastAppearanceTime int64 `json:"last_appearance_time"`
|
//LastDirection string `json:"last_direction"`
|
//LastLocation string `json:"last_location"`
|
}
|
|
//var (
|
// processed sync.Map // 存储已处理记录
|
// cleanupThreshold = time.Now().Add(-100 * time.Hour) // 定义一个时间窗口,假设只保存最近100小时的记录
|
//)
|
|
func (m *LocationModel) Run() error {
|
// 根据配置的时间段天数, 每天的时间范围内, 重点人员类型或者特定标签人员出现的楼层次数超过阈值
|
|
results := make([]*db.ModelTaskResults, 0)
|
var baseFilter, labelFilter, keyFilter, lastFilter []LocationPersonInfo
|
var document_number_map map[string]LocationPersonInfo
|
var document_number_list []string
|
err := db.GetDB().Raw(`
|
SELECT
|
s.document_number,
|
-- s.community_id,
|
-- s.org_id,
|
-- p.person_name,
|
-- p.id_card,
|
s.last_appearance_time,
|
-- s.last_direction,
|
-- s.last_location
|
FROM
|
snapshot_count_summary AS s
|
JOIN person AS p ON p.id = s.document_number
|
WHERE
|
p.id_card != ""
|
AND (p.community_id IN ?
|
OR p.org_id IN ?)
|
AND p.status IN ?
|
`, m.AreaIds, m.OrgIds, m.PersonIdentity).Scan(&baseFilter).Error
|
if err != nil {
|
logger.Warnf(err.Error())
|
}
|
|
if len(baseFilter) == 0 {
|
return fmt.Errorf("no results found that match the age condition %s - %s ", m.AreaIds, m.OrgIds)
|
}
|
|
logger.Debugf("task %s match age result %d", m.Task.Name, len(baseFilter))
|
for _, i := range baseFilter {
|
if _, ok := document_number_map[i.DocumentNumber]; !ok {
|
document_number_list = append(document_number_list, i.DocumentNumber)
|
}
|
document_number_map[i.DocumentNumber] = i
|
}
|
if m.PersonLabel != "" {
|
labels := strings.Split(m.PersonLabel, ",")
|
err := db.GetDB().Raw(`
|
SELECT
|
p.id
|
FROM
|
person AS p
|
JOIN person_label AS l ON p.id = l.person_id
|
WHERE
|
p.id IN ?
|
AND l.label_id IN ?
|
`, document_number_list, labels).Scan(&labelFilter).Error
|
if err != nil {
|
logger.Warnf(err.Error())
|
}
|
|
if len(labelFilter) == 0 {
|
return fmt.Errorf("no results found that match the label condition %s", m.PersonLabel)
|
}
|
|
logger.Debugf("task %s match label result %d", m.Task.Name, len(labelFilter))
|
}
|
|
document_number_list = []string{}
|
for _, i := range labelFilter {
|
|
document_number_list = append(document_number_list, i.Id)
|
|
}
|
|
if m.KeyPersonType != "" {
|
keyTypes := strings.Split(m.KeyPersonType, ",")
|
err := db.GetDB().Raw(`
|
SELECT
|
p.id,
|
k.person_type
|
FROM
|
person AS p
|
JOIN key_person AS k ON k.id_card = p.id_card
|
WHERE
|
p.id IN ?
|
AND k.person_type IN ?
|
`, m.StartTime, keyTypes).Scan(&keyFilter).Error
|
if err != nil {
|
logger.Warnf(err.Error())
|
}
|
if len(keyFilter) == 0 {
|
return fmt.Errorf("no results found that match the key condition %s", m.KeyPersonType)
|
}
|
|
logger.Debugf("task %s match key person result %d", m.Task.Name, len(keyFilter))
|
}
|
|
logger.Debugf("task %s last result %d", m.Task.Name, len(lastFilter))
|
document_number_list = []string{}
|
for _, i := range keyFilter {
|
document_number_list = append(document_number_list, i.Id)
|
person := document_number_map[i.DocumentNumber]
|
person.PersonType = i.PersonType
|
document_number_map[i.DocumentNumber] = person
|
}
|
records, err := queryEsLocation(db.GetEsClient(), m, document_number_list)
|
if err != nil {
|
return err
|
}
|
domains, err := domainToLocation(records)
|
if err != nil {
|
return err
|
}
|
var tagTypes []string
|
var lastAppearanceTime int64
|
for _, record := range records {
|
tagTypes = []string{}
|
for _, personId := range record.DocumentNumbers {
|
tagTypes = append(tagTypes, document_number_map[personId].PersonType)
|
lastAppearanceTime = document_number_map[personId].LastAppearanceTime
|
}
|
_, typeNames, err := service.GetPersonTypeNameByTypes(tagTypes)
|
if err != nil {
|
return err
|
}
|
event := strings.Join(typeNames, ",")
|
result := &db.ModelTaskResults{
|
Title: m.Task.Name,
|
Event: m.eventFormat(event, record.AppearCount),
|
ModelID: m.Task.ModelID,
|
ModelTaskID: m.Task.ID,
|
CommunityId: record.CommunityId,
|
OrgID: record.OrgId,
|
ObjectIds: strings.Join(record.DocumentNumbers, ","),
|
Location: fmt.Sprintf("%s%s%s", domains[record.CommunityId].Name, record.Building, record.Floor),
|
PicDate: time.Unix(lastAppearanceTime, 0).Format("2006-01-02 15:04:05"),
|
FirstPersonID: record.DocumentNumbers[0],
|
}
|
results = append(results, result)
|
}
|
logger.Debugf("task %s last filter result %d", m.Task.Name, len(results))
|
return service.SaveTaskResults(results)
|
}
|
|
func (m *LocationModel) KeepAlive() error {
|
db.GetDB().Model(m.Task).Where("id = ?", m.Task.ID).Update("last_run_time", time.Now())
|
return nil
|
}
|
|
func (m *LocationModel) Shutdown() error {
|
// 清理资源
|
fmt.Println("Shutting down LocationModel Model")
|
return nil
|
}
|
|
func (m *LocationModel) eventFormat(event string, AppearCount int) string {
|
return fmt.Sprintf("%s人员进出%d次", event, AppearCount)
|
}
|
|
func queryEsLocation(esClient *elasticsearch.Client, locationModel *LocationModel, documentNumbers []string) ([]*LocationRecord, error) {
|
var buf bytes.Buffer
|
nowTime := time.Now()
|
startTime := nowTime.Add(-time.Duration(locationModel.Duration) * 24 * time.Hour)
|
|
// 构建过滤条件
|
var filters []map[string]interface{}
|
documentNumberFilter := map[string]interface{}{
|
"terms": map[string]interface{}{
|
"documentNumber": documentNumbers,
|
},
|
}
|
filters = append(filters, documentNumberFilter)
|
|
if len(locationModel.OrgIds) > 0 || len(locationModel.AreaIds) > 0 {
|
// 获取数据权限过滤条件
|
authFilters := GetDomainFilters(locationModel.OrgIds, locationModel.AreaIds)
|
filters = append(filters, authFilters...)
|
}
|
|
// 地址过滤
|
if locationModel.Building != "" || locationModel.Floor != "" {
|
var addrParams map[string]interface{}
|
if locationModel.Floor != "" {
|
addrParams = map[string]interface{}{"bool": map[string]interface{}{
|
"must": []interface{}{
|
map[string]interface{}{
|
"term": map[string]interface{}{
|
"cameraLocation.building": locationModel.Building,
|
}},
|
map[string]interface{}{
|
"term": map[string]interface{}{
|
"cameraLocation.floor": locationModel.Floor,
|
}},
|
},
|
}}
|
} else if locationModel.Building != "" {
|
addrParams = map[string]interface{}{
|
"term": map[string]interface{}{
|
"cameraLocation.building": locationModel.Building,
|
}}
|
}
|
filters = append(filters, addrParams)
|
}
|
|
//// 重点人员过滤
|
//if len(locationModel.KeyPersonType) > 0 {
|
// filters = append(filters, map[string]interface{}{
|
// "terms": map[string]interface{}{
|
// "keyPersonType": strings.Split(locationModel.KeyPersonType, ","),
|
// },
|
// })
|
//}
|
|
// 时间范围
|
//filters = append(filters, map[string]interface{}{
|
// "range": map[string]interface{}{
|
// "picDate": map[string]interface{}{
|
// "gte": start.Format(time.DateTime),
|
// "lt": now.Format(time.DateTime),
|
// },
|
// },
|
//})
|
for date := startTime; date.Before(nowTime); date = date.Add(24 * time.Hour) {
|
start := time.Date(date.Year(), date.Month(), date.Day(), locationModel.StartTime, 0, 0, 0, date.Location())
|
end := time.Date(date.Year(), date.Month(), date.Day(), locationModel.EndTime, 0, 0, 0, date.Location())
|
|
filters = append(filters, map[string]interface{}{
|
"range": map[string]interface{}{
|
"picDate": map[string]interface{}{
|
"gte": start.Format(time.RFC3339),
|
"lte": end.Format(time.RFC3339),
|
},
|
},
|
})
|
}
|
|
query := map[string]interface{}{
|
"query": map[string]interface{}{
|
"bool": map[string]interface{}{
|
"filter": filters,
|
},
|
},
|
"aggs": map[string]interface{}{
|
"orgs": map[string]interface{}{ // 先聚合orgId
|
"terms": map[string]interface{}{
|
"field": "orgId", // 聚合orgId
|
"size": 10000,
|
},
|
"aggs": map[string]interface{}{
|
"community": map[string]interface{}{ // 在orgId聚合下聚合communityId
|
"terms": map[string]interface{}{
|
"field": "communityId", // 聚合communityId
|
"size": 10000,
|
},
|
"aggs": map[string]interface{}{
|
"location": map[string]interface{}{ // 在communityId下聚合building
|
"terms": map[string]interface{}{
|
"field": "cameraLocation.building", // 聚合楼栋
|
"size": 10000,
|
},
|
"aggs": map[string]interface{}{
|
"floor": map[string]interface{}{ // 在building下聚合floor
|
"terms": map[string]interface{}{
|
"field": "cameraLocation.floor", // 聚合楼层
|
"size": 10000,
|
},
|
"aggs": map[string]interface{}{
|
"filter_floor": map[string]interface{}{
|
"bucket_selector": map[string]interface{}{
|
"buckets_path": map[string]interface{}{
|
"eventCount": "_count",
|
},
|
"script": map[string]interface{}{
|
"source": "params.eventCount >= params.threshold",
|
"params": map[string]interface{}{
|
"threshold": locationModel.Appearances,
|
},
|
},
|
},
|
},
|
"document_numbers": map[string]interface{}{ // 新增按 documentNumber 聚合
|
"terms": map[string]interface{}{
|
"field": "documentNumber",
|
"size": 10000,
|
},
|
},
|
},
|
},
|
},
|
},
|
},
|
},
|
},
|
},
|
},
|
"size": 0,
|
}
|
|
if err := json.NewEncoder(&buf).Encode(query); err != nil {
|
return nil, fmt.Errorf("error encoding query: %s", err)
|
}
|
|
res, err := esClient.Search(
|
esClient.Search.WithContext(context.Background()),
|
esClient.Search.WithIndex(config.EsInfo.EsIndex.AiOcean.IndexName),
|
esClient.Search.WithDocumentType(config.EsInfo.EsIndex.AiOcean.IndexType),
|
esClient.Search.WithBody(&buf),
|
esClient.Search.WithTrackTotalHits(true),
|
esClient.Search.WithPretty(),
|
)
|
if err != nil {
|
return nil, fmt.Errorf("error getting response: %s", err)
|
}
|
defer res.Body.Close()
|
|
// Check for a successful status code (2xx range)
|
if res.IsError() {
|
return nil, fmt.Errorf("error getting response: %s", res.String())
|
}
|
|
var result map[string]interface{}
|
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
|
return nil, fmt.Errorf("error parsing response body: %s", err)
|
}
|
|
// 解析聚合结果
|
var records []*LocationRecord
|
if aggs, ok := result["aggregations"].(map[string]interface{}); ok {
|
if orgBuckets, ok := aggs["orgs"].(map[string]interface{})["buckets"].([]interface{}); ok {
|
for _, orgBucket := range orgBuckets {
|
orgId := orgBucket.(map[string]interface{})["key"].(string)
|
|
// 解析按communityId的聚合结果
|
if communityBuckets, ok := orgBucket.(map[string]interface{})["community"].(map[string]interface{})["buckets"].([]interface{}); ok {
|
for _, communityBucket := range communityBuckets {
|
communityId := communityBucket.(map[string]interface{})["key"].(string)
|
|
// 解析按building的聚合结果
|
if locationBuckets, ok := communityBucket.(map[string]interface{})["location"].(map[string]interface{})["buckets"].([]interface{}); ok {
|
for _, locationBucket := range locationBuckets {
|
building := locationBucket.(map[string]interface{})["key"].(string)
|
|
// 解析按floor的聚合结果
|
if floorBuckets, ok := locationBucket.(map[string]interface{})["floor"].(map[string]interface{})["buckets"].([]interface{}); ok {
|
for _, floorBucket := range floorBuckets {
|
floor := floorBucket.(map[string]interface{})["key"].(string)
|
appearCount := floorBucket.(map[string]interface{})["filter_floor"].(int)
|
// 构建 LocationRecord 结构体
|
var persons []string
|
if docNumBuckets, ok := floorBucket.(map[string]interface{})["document_numbers"].(map[string]interface{})["buckets"].([]interface{}); ok {
|
for _, docNumBucket := range docNumBuckets {
|
persons = append(persons, docNumBucket.(map[string]interface{})["key"].(string))
|
}
|
}
|
record := &LocationRecord{
|
//PicDate: timestamp,
|
DocumentNumbers: persons,
|
CommunityId: communityId,
|
Building: building,
|
Floor: floor,
|
OrgId: orgId,
|
AppearCount: appearCount,
|
}
|
|
records = append(records, record)
|
}
|
}
|
}
|
}
|
}
|
}
|
}
|
}
|
}
|
|
return records, nil
|
}
|
|
func domainToLocation(records []*LocationRecord) (map[string]*db.DomainUnit, error) {
|
if len(records) == 0 {
|
return nil, nil
|
}
|
domainIds := set.NewStringSet()
|
for _, record := range records {
|
domainIds.Add(record.CommunityId)
|
}
|
domains, err := service.GetUnitsMapByIds(domainIds.Elements())
|
if err != nil {
|
return nil, err
|
}
|
return domains, nil
|
}
|