New file |
| | |
| | | package db |
| | | |
| | | import ( |
| | | "fmt" |
| | | "gorm.io/gorm" |
| | | ) |
| | | |
| | | // KeyPersonBase 重点人员底库 |
| | | type KeyPersonBase struct { |
| | | BaseModel |
| | | Name string `json:"name" gorm:"type:varchar(255);"` //底库名称 |
| | | Color string `json:"color" gorm:"type:varchar(255);"` //底库颜色 |
| | | Level string `json:"level" gorm:"type:varchar(255);"` //底库等级 |
| | | TagName string `json:"tagName" gorm:"type:varchar(255)"` //标签名称 |
| | | TagType string `json:"tagType" gorm:"type:varchar(255)"` //标签类型 |
| | | } |
| | | |
| | | func (m *KeyPersonBase) TableName() string { |
| | | return "key_person_base" |
| | | } |
| | | |
| | | type KeyPersonBaseSearch struct { |
| | | KeyPersonBase |
| | | Orm *gorm.DB |
| | | PageNum int |
| | | PageSize int |
| | | Order string |
| | | TagTypes []string |
| | | } |
| | | |
| | | func NewKeyPersonBaseSearch() *KeyPersonBaseSearch { |
| | | return &KeyPersonBaseSearch{ |
| | | Orm: GetDB(), |
| | | PageNum: 1, |
| | | PageSize: 10, |
| | | } |
| | | } |
| | | func (slf *KeyPersonBaseSearch) SetPage(page, size int) *KeyPersonBaseSearch { |
| | | slf.PageNum, slf.PageSize = page, size |
| | | return slf |
| | | } |
| | | |
| | | func (slf *KeyPersonBaseSearch) SetOrder(order string) *KeyPersonBaseSearch { |
| | | slf.Order = order |
| | | return slf |
| | | } |
| | | |
| | | func (slf *KeyPersonBaseSearch) SetID(id string) *KeyPersonBaseSearch { |
| | | slf.ID = id |
| | | return slf |
| | | } |
| | | |
| | | func (slf *KeyPersonBaseSearch) Set(id string) *KeyPersonBaseSearch { |
| | | slf.ID = id |
| | | return slf |
| | | } |
| | | |
| | | func (slf *KeyPersonBaseSearch) SetTagTypes(ids []string) *KeyPersonBaseSearch { |
| | | slf.TagTypes = ids |
| | | return slf |
| | | } |
| | | |
| | | func (slf *KeyPersonBaseSearch) build() *gorm.DB { |
| | | var db = slf.Orm.Table(slf.TableName()) |
| | | if slf.Order != "" { |
| | | db = db.Order(slf.Order) |
| | | } |
| | | |
| | | if slf.ID != "" { |
| | | db = db.Where("id = ?", slf.ID) |
| | | } |
| | | |
| | | if slf.Name != "" { |
| | | db = db.Where("name = ?", slf.Name) |
| | | } |
| | | |
| | | if len(slf.TagTypes) != 0 { |
| | | db = db.Where("tag_type in ?", slf.TagTypes) |
| | | } |
| | | |
| | | return db |
| | | } |
| | | |
| | | func (slf *KeyPersonBaseSearch) First() (*KeyPersonBase, error) { |
| | | var ( |
| | | record = new(KeyPersonBase) |
| | | db = slf.build() |
| | | ) |
| | | |
| | | if err := db.First(record).Error; err != nil { |
| | | return record, err |
| | | } |
| | | |
| | | return record, nil |
| | | } |
| | | |
| | | func (slf *KeyPersonBaseSearch) Find() ([]*KeyPersonBase, int64, error) { |
| | | var ( |
| | | records = make([]*KeyPersonBase, 0) |
| | | total int64 |
| | | db = slf.build() |
| | | ) |
| | | |
| | | if err := db.Count(&total).Error; err != nil { |
| | | return records, total, fmt.Errorf("find count err: %v", err) |
| | | } |
| | | if slf.PageNum*slf.PageSize > 0 { |
| | | db = db.Offset((slf.PageNum - 1) * slf.PageSize).Limit(slf.PageSize) |
| | | } |
| | | if err := db.Find(&records).Error; err != nil { |
| | | return records, total, fmt.Errorf("find records err: %v", err) |
| | | } |
| | | |
| | | return records, total, nil |
| | | } |
| | | |
| | | func (slf *KeyPersonBaseSearch) FindAll() ([]*KeyPersonBase, error) { |
| | | var ( |
| | | records = make([]*KeyPersonBase, 0) |
| | | db = slf.build() |
| | | ) |
| | | if err := db.Find(&records).Error; err != nil { |
| | | return records, fmt.Errorf("find records err: %v", err) |
| | | } |
| | | |
| | | return records, nil |
| | | } |
| | | |
| | | func (slf *KeyPersonBaseSearch) Count() int64 { |
| | | var ( |
| | | count int64 |
| | | db = slf.build() |
| | | ) |
| | | |
| | | if err := db.Count(&count).Error; err != nil { |
| | | return count |
| | | } |
| | | |
| | | return count |
| | | } |
| | | |
| | | func (slf *KeyPersonBaseSearch) Create(record *KeyPersonBase) error { |
| | | var db = slf.build() |
| | | |
| | | if err := db.Create(record).Error; err != nil { |
| | | return fmt.Errorf("create err: %v, record: %+v", err, record) |
| | | } |
| | | |
| | | return nil |
| | | } |
| | | |
| | | func (slf *KeyPersonBaseSearch) Update(record *KeyPersonBase) error { |
| | | var db = slf.build() |
| | | |
| | | if err := db.Updates(record).Error; err != nil { |
| | | return fmt.Errorf("update err: %v, record: %+v", err, record) |
| | | } |
| | | |
| | | return nil |
| | | } |
| | | |
| | | func (slf *KeyPersonBaseSearch) Delete() error { |
| | | var db = slf.build() |
| | | return db.Delete(&KeyPersonBase{}).Error |
| | | } |
New file |
| | |
| | | package db |
| | | |
| | | import ( |
| | | "fmt" |
| | | "gorm.io/gorm" |
| | | ) |
| | | |
| | | type ModelTaskResults struct { |
| | | BaseModel |
| | | Title string `json:"name" gorm:"type:varchar(255)"` //预警名称,暂时用任务名称 |
| | | Event string `json:"event" gorm:"type:varchar(255)"` //预警事件 |
| | | ModelID string `json:"modelID" gorm:"type:varchar(255)"` //模型ID |
| | | ModelTaskID string `json:"modelTaskID" gorm:"type:varchar(255)"` //模型任务ID |
| | | CommunityId string `json:"communityID" gorm:"index;column:community_id;type:varchar(299);not null;"` //小区ID |
| | | OrgID string `json:"orgID" gorm:"index;column:org_id;type:varchar(299);not null;"` //派出所 domain unit ID |
| | | ObjectIds string `json:"objectIds" gorm:"type:text"` //事件对象,可以是人,多个用逗号分隔 |
| | | Location string `json:"location" gorm:"type:varchar(255)"` //发生地点 |
| | | } |
| | | |
| | | func (m *ModelTaskResults) TableName() string { |
| | | return "model_task_results" |
| | | } |
| | | |
| | | type ModelTaskResultsSearch struct { |
| | | ModelTaskResults |
| | | Orm *gorm.DB |
| | | PageNum int |
| | | PageSize int |
| | | Order string |
| | | Keyword string |
| | | } |
| | | |
| | | func NewModelTaskResultsSearch() *ModelTaskResultsSearch { |
| | | return &ModelTaskResultsSearch{ |
| | | Orm: GetDB(), |
| | | PageNum: 1, |
| | | PageSize: 10, |
| | | } |
| | | } |
| | | |
| | | func (slf *ModelTaskResultsSearch) SetOrm(tx *gorm.DB) *ModelTaskResultsSearch { |
| | | slf.Orm = tx |
| | | return slf |
| | | } |
| | | |
| | | func (slf *ModelTaskResultsSearch) SetPage(page, size int) *ModelTaskResultsSearch { |
| | | slf.PageNum, slf.PageSize = page, size |
| | | return slf |
| | | } |
| | | |
| | | func (slf *ModelTaskResultsSearch) SetOrder(order string) *ModelTaskResultsSearch { |
| | | slf.Order = order |
| | | return slf |
| | | } |
| | | |
| | | func (slf *ModelTaskResultsSearch) SetID(id string) *ModelTaskResultsSearch { |
| | | slf.ID = id |
| | | return slf |
| | | } |
| | | |
| | | func (slf *ModelTaskResultsSearch) SetKeyword(kw string) *ModelTaskResultsSearch { |
| | | slf.Keyword = kw |
| | | return slf |
| | | } |
| | | |
| | | func (slf *ModelTaskResultsSearch) build() *gorm.DB { |
| | | var db = slf.Orm.Table(slf.TableName()) |
| | | if slf.Order != "" { |
| | | db = db.Order(slf.Order) |
| | | } |
| | | |
| | | if slf.ID != "" { |
| | | db = db.Where("id = ?", slf.ID) |
| | | } |
| | | |
| | | if slf.Keyword != "" { |
| | | kw := "%" + slf.Keyword + "%" |
| | | db = db.Where("name like ?", kw) |
| | | } |
| | | |
| | | return db |
| | | } |
| | | |
| | | func (slf *ModelTaskResultsSearch) First() (*ModelTaskResults, error) { |
| | | var ( |
| | | record = new(ModelTaskResults) |
| | | db = slf.build() |
| | | ) |
| | | |
| | | if err := db.First(record).Error; err != nil { |
| | | return record, err |
| | | } |
| | | |
| | | return record, nil |
| | | } |
| | | |
| | | func (slf *ModelTaskResultsSearch) Find() ([]*ModelTaskResults, int64, error) { |
| | | var ( |
| | | records = make([]*ModelTaskResults, 0) |
| | | total int64 |
| | | db = slf.build() |
| | | ) |
| | | |
| | | if err := db.Count(&total).Error; err != nil { |
| | | return records, total, fmt.Errorf("find count err: %v", err) |
| | | } |
| | | if slf.PageNum*slf.PageSize > 0 { |
| | | db = db.Offset((slf.PageNum - 1) * slf.PageSize).Limit(slf.PageSize) |
| | | } |
| | | if err := db.Find(&records).Error; err != nil { |
| | | return records, total, fmt.Errorf("find records err: %v", err) |
| | | } |
| | | |
| | | return records, total, nil |
| | | } |
| | | |
| | | func (slf *ModelTaskResultsSearch) FindAll() ([]*ModelTaskResults, error) { |
| | | var ( |
| | | records = make([]*ModelTaskResults, 0) |
| | | db = slf.build() |
| | | ) |
| | | if err := db.Find(&records).Error; err != nil { |
| | | return records, fmt.Errorf("find records err: %v", err) |
| | | } |
| | | |
| | | return records, nil |
| | | } |
| | | |
| | | func (slf *ModelTaskResultsSearch) Count() int64 { |
| | | var ( |
| | | count int64 |
| | | db = slf.build() |
| | | ) |
| | | |
| | | if err := db.Count(&count).Error; err != nil { |
| | | return count |
| | | } |
| | | |
| | | return count |
| | | } |
| | | |
| | | func (slf *ModelTaskResultsSearch) Create(record *ModelTaskResults) error { |
| | | var db = slf.build() |
| | | |
| | | if err := db.Create(record).Error; err != nil { |
| | | return fmt.Errorf("create err: %v, record: %+v", err, record) |
| | | } |
| | | |
| | | return nil |
| | | } |
| | | |
| | | func (slf *ModelTaskResultsSearch) BatchCreate(record []*ModelTaskResults) error { |
| | | var db = slf.build() |
| | | |
| | | if err := db.Create(record).Error; err != nil { |
| | | return fmt.Errorf("create err: %v, record: %+v", err, record) |
| | | } |
| | | |
| | | return nil |
| | | } |
| | | |
| | | func (slf *ModelTaskResultsSearch) Save(record *ModelTaskResults) error { |
| | | var db = slf.build() |
| | | |
| | | if err := db.Omit("CreatedAt").Save(record).Error; err != nil { |
| | | return fmt.Errorf("save err: %v, record: %+v", err, record) |
| | | } |
| | | |
| | | return nil |
| | | } |
| | | |
| | | func (slf *ModelTaskResultsSearch) Update(record *ModelTaskResults) error { |
| | | var db = slf.build() |
| | | |
| | | if err := db.Updates(record).Error; err != nil { |
| | | return fmt.Errorf("update err: %v, record: %+v", err, record) |
| | | } |
| | | |
| | | return nil |
| | | } |
| | | |
| | | func (slf *ModelTaskResultsSearch) Delete() error { |
| | | var db = slf.build() |
| | | return db.Delete(&ModelTaskResults{}).Error |
| | | } |
| | |
| | | "log" |
| | | "model-engine/config" |
| | | "model-engine/db" |
| | | "model-engine/pkg/set" |
| | | "model-engine/service" |
| | | "strings" |
| | | "time" |
| | |
| | | AppearInterval int `gorm:"type:int;" json:"appearInterval"` //出现间隔,单位为秒 |
| | | DaysWindow int `gorm:"type:int;" json:"daysWindow" ` //近几天内 |
| | | Threshold int `gorm:"type:int;" json:"threshold" ` //达几次 |
| | | Task *db.ModelTask |
| | | } |
| | | |
| | | func (m *GatherModel) Init(task *db.ModelTask) error { |
| | |
| | | return err |
| | | } |
| | | |
| | | m.Task = task |
| | | m.OrgIds = orgIds |
| | | m.AreaIds = areaIds |
| | | m.Building = task.Building |
| | |
| | | PicDate string `json:"picDate"` |
| | | DocumentNumber string |
| | | CommunityId string `json:"communityId"` |
| | | OrgId string `json:"orgId"` |
| | | Building string `json:"building"` |
| | | Floor string `json:"floor"` |
| | | GatherPersons int `gorm:"type:int;" json:"gatherPersons"` //聚集人数 |
| | |
| | | log.Fatalf("Failed to analyze and aggregate data: %v", err) |
| | | } |
| | | |
| | | // Print or process the aggregation results as needed |
| | | for location, persons := range aggregation { |
| | | fmt.Printf("Gathering detected at %s with %d unique persons\n", location, len(persons)) |
| | | } |
| | | if len(aggregation) == 0 { |
| | | return nil |
| | | } |
| | | |
| | | tagTypes := strings.Split(m.Task.PersonType, ",") |
| | | results := make([]*db.ModelTaskResults, 0, len(aggregation)) |
| | | _, typeNames, err := service.GetPersonTypeNameByTypes(tagTypes) |
| | | if err != nil { |
| | | return err |
| | | } |
| | | event := strings.Join(typeNames, ",") |
| | | for location, persons := range aggregation { |
| | | result := &db.ModelTaskResults{ |
| | | Title: m.Task.Name, |
| | | Event: fmt.Sprintf("%s/%d人", event, len(persons)), |
| | | ModelID: m.Task.ModelID, |
| | | ModelTaskID: m.Task.ID, |
| | | CommunityId: location.CommunityId, |
| | | OrgID: location.OrgId, |
| | | ObjectIds: strings.Join(persons.Elements(), ","), |
| | | Location: location.Location, |
| | | } |
| | | results = append(results, result) |
| | | } |
| | | return service.SaveTaskResults(results) |
| | | } |
| | | |
| | | func (m *GatherModel) Shutdown() error { |
| | |
| | | }, |
| | | }, |
| | | "aggs": map[string]interface{}{ |
| | | "gather_events": map[string]interface{}{ |
| | | "date_histogram": map[string]interface{}{ |
| | | "field": "picDate", |
| | | "interval": fmt.Sprintf("%ds", gatherModel.AppearInterval), |
| | | "min_doc_count": 1, |
| | | }, |
| | | "aggs": map[string]interface{}{ |
| | | "community": map[string]interface{}{ |
| | | "orgs": map[string]interface{}{ // 先聚合orgId |
| | | "terms": map[string]interface{}{ |
| | | "field": "communityId", // 聚合小区id |
| | | "field": "orgId", // 聚合orgId |
| | | "size": 10000, |
| | | }, |
| | | "aggs": map[string]interface{}{ |
| | | "location": map[string]interface{}{ |
| | | "community": map[string]interface{}{ // 在orgId聚合下聚合communityId |
| | | "terms": map[string]interface{}{ |
| | | "field": "communityId", // 聚合communityId |
| | | "size": 10000, |
| | | }, |
| | | "aggs": map[string]interface{}{ |
| | | "location": map[string]interface{}{ // 在communityId下聚合building |
| | | "terms": map[string]interface{}{ |
| | | "field": "cameraLocation.building", // 聚合楼栋 |
| | | "size": 10000, |
| | | }, |
| | | "aggs": map[string]interface{}{ |
| | | "floor": map[string]interface{}{ |
| | | "floor": map[string]interface{}{ // 在building下聚合floor |
| | | "terms": map[string]interface{}{ |
| | | "field": "cameraLocation.floor", // 聚合楼层 |
| | | "size": 10000, |
| | | }, |
| | | "aggs": map[string]interface{}{ |
| | | "gather_events": map[string]interface{}{ // 在floor下聚合gather_events |
| | | "date_histogram": map[string]interface{}{ |
| | | "field": "picDate", |
| | | "interval": fmt.Sprintf("%ds", gatherModel.AppearInterval), |
| | | "min_doc_count": 1, |
| | | }, |
| | | "aggs": map[string]interface{}{ |
| | | "people": map[string]interface{}{ |
| | |
| | | "source": "params.personCount >= params.gatherPersons", // 聚集人数过滤 |
| | | "params": map[string]interface{}{ |
| | | "gatherPersons": gatherModel.GatherPersons, |
| | | }, |
| | | }, |
| | | }, |
| | | }, |
| | | }, |
| | |
| | | // 解析聚合结果 |
| | | var records []GatherRecord |
| | | if aggs, ok := result["aggregations"].(map[string]interface{}); ok { |
| | | if gatherEvents, ok := aggs["gather_events"].(map[string]interface{}); ok { |
| | | if buckets, ok := gatherEvents["buckets"].([]interface{}); ok { |
| | | for _, bucket := range buckets { |
| | | key := int64(bucket.(map[string]interface{})["key"].(float64)) / 1000 // 将毫秒转换为秒 |
| | | timestamp := time.Unix(key, 0).Format("2006-01-02T15:04:05") |
| | | if orgBuckets, ok := aggs["orgs"].(map[string]interface{})["buckets"].([]interface{}); ok { |
| | | for _, orgBucket := range orgBuckets { |
| | | orgId := orgBucket.(map[string]interface{})["key"].(string) |
| | | |
| | | // 解析按小区、楼栋和楼层的聚合结果 |
| | | if communityBuckets, ok := bucket.(map[string]interface{})["community"].(map[string]interface{})["buckets"].([]interface{}); ok { |
| | | // 解析按communityId的聚合结果 |
| | | if communityBuckets, ok := orgBucket.(map[string]interface{})["community"].(map[string]interface{})["buckets"].([]interface{}); ok { |
| | | for _, communityBucket := range communityBuckets { |
| | | communityId := communityBucket.(map[string]interface{})["key"].(string) |
| | | |
| | | // 解析按楼栋和楼层的聚合结果 |
| | | // 解析按building的聚合结果 |
| | | if locationBuckets, ok := communityBucket.(map[string]interface{})["location"].(map[string]interface{})["buckets"].([]interface{}); ok { |
| | | for _, locationBucket := range locationBuckets { |
| | | building := locationBucket.(map[string]interface{})["key"].(string) |
| | | |
| | | // 解析楼层 |
| | | // 解析按floor的聚合结果 |
| | | if floorBuckets, ok := locationBucket.(map[string]interface{})["floor"].(map[string]interface{})["buckets"].([]interface{}); ok { |
| | | for _, floorBucket := range floorBuckets { |
| | | floor := floorBucket.(map[string]interface{})["key"].(string) |
| | | |
| | | // 解析聚合的事件 |
| | | if gatherEvents, ok := floorBucket.(map[string]interface{})["gather_events"].(map[string]interface{})["buckets"].([]interface{}); ok { |
| | | for _, eventBucket := range gatherEvents { |
| | | key := int64(eventBucket.(map[string]interface{})["key"].(float64)) / 1000 // 将毫秒转换为秒 |
| | | timestamp := time.Unix(key, 0).Format("2006-01-02T15:04:05") |
| | | |
| | | // 解析人员 |
| | | if peopleBuckets, ok := floorBucket.(map[string]interface{})["people"].(map[string]interface{})["buckets"].([]interface{}); ok { |
| | | if peopleBuckets, ok := eventBucket.(map[string]interface{})["people"].(map[string]interface{})["buckets"].([]interface{}); ok { |
| | | for _, person := range peopleBuckets { |
| | | documentNumber := person.(map[string]interface{})["key"].(string) |
| | | |
| | |
| | | CommunityId: communityId, |
| | | Building: building, |
| | | Floor: floor, |
| | | OrgId: orgId, |
| | | AppearInterval: gatherModel.AppearInterval, |
| | | GatherPersons: gatherModel.GatherPersons, |
| | | } |
| | |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | return records, nil |
| | | } |
| | | |
| | | func analyzeAndAggregate(records []GatherRecord) (map[string][]string, error) { |
| | | // Implement logic to aggregate and analyze data based on GatherModel parameters |
| | | // This is a placeholder for the actual implementation |
| | | aggregation := make(map[string][]string) |
| | | type GatherLocation struct { |
| | | CommunityId string |
| | | OrgId string |
| | | Building string |
| | | Floor string |
| | | Location string |
| | | } |
| | | |
| | | // Example logic: |
| | | func analyzeAndAggregate(records []GatherRecord) (map[GatherLocation]set.StringSet, error) { |
| | | aggregation := make(map[GatherLocation]set.StringSet) |
| | | domainIds := set.NewStringSet() |
| | | for _, record := range records { |
| | | key := fmt.Sprintf("%s%s%s", record.CommunityId, record.Building, record.Floor) |
| | | aggregation[key] = append(aggregation[key], record.DocumentNumber) |
| | | domainIds.Add(record.CommunityId) |
| | | } |
| | | |
| | | domains, err := service.GetUnitsMapByIds(domainIds.Elements()) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | |
| | | for _, record := range records { |
| | | if domains[record.CommunityId] == nil { |
| | | continue |
| | | } |
| | | |
| | | location := GatherLocation{ |
| | | CommunityId: record.CommunityId, |
| | | OrgId: record.OrgId, |
| | | Building: record.Building, |
| | | Floor: record.Floor, |
| | | Location: fmt.Sprintf("%s%s%s", domains[record.CommunityId].Name, record.Building, record.Floor), |
| | | } |
| | | if aggregation[location] == nil { |
| | | aggregation[location] = set.NewStringSet() |
| | | } |
| | | aggregation[location].Add(record.DocumentNumber) |
| | | } |
| | | |
| | | return aggregation, nil |
| | |
| | | return list, nil |
| | | |
| | | } |
| | | |
| | | func GetUnitsMapByIds(ids []string) (m map[string]*db.DomainUnit, err error) { |
| | | list := make([]*db.DomainUnit, 0) |
| | | err = db.GetDB().Model(&db.DomainUnit{}).Where("id in ?", ids).Find(&list).Error |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | m = make(map[string]*db.DomainUnit, len(list)) |
| | | for _, v := range list { |
| | | m[v.ID] = v |
| | | } |
| | | return |
| | | } |
New file |
| | |
| | | package service |
| | | |
| | | import "model-engine/db" |
| | | |
| | | func GetPersonTypeNameByTypes(types []string) (m map[string]string, names []string, err error) { |
| | | m = make(map[string]string) |
| | | |
| | | records, err := db.NewKeyPersonBaseSearch().SetTagTypes(types).FindAll() |
| | | if err != nil { |
| | | return |
| | | } |
| | | |
| | | for _, record := range records { |
| | | m[record.TagType] = record.TagName |
| | | } |
| | | |
| | | for _, t := range types { |
| | | names = append(names, m[t]) |
| | | } |
| | | |
| | | return |
| | | |
| | | } |
New file |
| | |
| | | package service |
| | | |
| | | import "model-engine/db" |
| | | |
| | | func SaveTaskResults(results []*db.ModelTaskResults) error { |
| | | return db.NewModelTaskResultsSearch().BatchCreate(results) |
| | | } |