package models

import (
	"context"
	"fmt"
	"math"
	"sort"
	"strconv"
	"strings"
	"time"

	"github.com/milvus-io/milvus-sdk-go/v2/entity"
)

// DataOperator groups the data-access methods that run against the Milvus client.
type DataOperator struct {
	client *MilvusClient
}

// NewDataOperator returns a DataOperator bound to the given client.
func NewDataOperator(client *MilvusClient) *DataOperator {
	return &DataOperator{client: client}
}

// PicWidHei holds the width and height of a picture.
type PicWidHei struct {
	PicW int `json:"picW"`
	PicH int `json:"picH"`
}

// StaticRecord is one row of the statistics query result. Only a subset of
// its fields is populated by convertResultToMap; the remaining fields are
// filled in elsewhere.
type StaticRecord struct {
	Id                 string                     `json:"id"`
	TaskId             []int64                    `json:"task_id"`
	RuleId             []int64                    `json:"rule_id"`
	EventLevelId       []int64                    `json:"event_level_id"`
	VideoPointId       int64                      `json:"video_point_id"`
	DetectId           []int64                    `json:"detect_id"`
	VideoName          string                     `json:"video_name"`
	TaskNames          []TaskOption               `json:"task_names"`
	CheckNames         []CheckOption              `json:"check_names"`
	RuleNames          []RuleOption               `json:"rule_names"`
	EventLevels        []DictOption               `json:"event_levels"`
	KnowledgeDocuments []KnowledgeDocumentOption  `json:"knowledge_documents"`
	ImagePath          string                     `json:"image_path"`
	VideoPath          string                     `json:"video_path"`
	DetectTime         string                     `json:"detect_time"`
	IsWarning          int64                      `json:"is_warning"`
	ZhDescClass        string                     `json:"zh_desc_class"`
	TaskName           string                     `json:"task_name"`
	EventLevelName     string                     `json:"event_level_name"`
	DetectNum          int64                      `json:"detect_num"`
	Suggestion         string                     `json:"suggestion"`
	RiskDescription    string                     `json:"risk_description"`
	KnowledgeId        []int64                    `json:"knowledge_id"`
	IsDesc             int64                      `json:"is_desc"`
	CameraId           string                     `json:"cameraId"`
	CameraName         string                     `json:"cameraName"`
	CameraAddr         string                     `json:"cameraAddr"`
	PicDate            string                     `json:"picDate"`
	PicId              int64                      `json:"picId"`
	PicMaxUrl          []string                   `json:"picMaxUrl"`
	PicSrcUrl          []string                   `json:"picSrcUrl"`
	PicWH              PicWidHei                  `json:"picWH"`
	SdkName            string                     `json:"sdkName"`
	Content            string                     `json:"content"`
	AlarmRules         []AlarmRule                `json:"alarmRules"`
	LikeDate           string                     `json:"likeDate"`
	ShowLabels         string                     `json:"showLabels"`
	OtherLabels        string                     `json:"otherLabels"`
	VideoUrl           string                     `json:"videoUrl"`
	AnalyServerId      string                     `json:"analyServerId"`
	AnalyServerName    string                     `json:"analyServerName"`
	AnalyServerIp      string                     `json:"analyServerIp"`
	ClusterId          string                     `json:"clusterId"`
	IsAlarm            bool                       `json:"isAlarm"`
	IsAckAlarm         bool                       `json:"isAckAlarm"`
	IsCollect          bool                       `json:"isCollect"`
	IsDelete           bool                       `json:"isDelete"`
	BaseInfo           []*BaseCompareInfo         `json:"baseInfo"`
	TargetInfo         []TargetInfo               `json:"targetInfo"`
	FileId             string                     `json:"fileId"`     // data-stack file id
	DataSource         string                     `json:"dataSource"` // data source: camera or data stack
}

// Point is a 2-D coordinate.
type Point struct {
	X float64 `json:"x"`
	Y float64 `json:"y"`
}

// Points is a rectangle described by its top-left and bottom-right corners.
type Points struct {
	TopLeft     Point `json:"topLeft"`
	BottomRight Point `json:"bottomRight"`
}

// BaseCompareInfo carries the comparison details attached to a record.
type BaseCompareInfo struct {
	TableId      string  `json:"tableId"`
	TableName    string  `json:"tableName"`
	BwType       string  `json:"bwType"`
	CompareScore float64 `json:"compareScore"`
	TargetId     string  `json:"targetId"`
	TargetName   string  `json:"targetName"`
	TargetPicUrl string  `json:"targetPicUrl"`
	MonitorLevel string  `json:"monitorLevel"`
	Content      string  `json:"content"`
	DbLabel      string  `json:"labels"`
}

// TargetInfo describes one detected target attached to a record.
type TargetInfo struct {
	TargetId        string  `json:"targetId"`
	AreaId          string  `json:"areaId"`
	AreaName        string  `json:"areaName"`
	TargetScore     float64 `json:"targetScore"`
	TargetType      string  `json:"targetType"`
	Feature         string  `json:"feature"`
	PicSmUrl        string  `json:"picSmUrl"`
	TargetLocation  Points  `json:"targetLocation"`
	BelongsTargetID string  `json:"belongsTargetId"`
	Attribute       string  `json:"attribute"`
}

// AlarmRule is one alarm rule attached to a record.
type AlarmRule struct {
	AlarmLevel   string `db:"alarmLevel" json:"alarmLevel"`
	DefenceState bool   `db:"defenceState" json:"defenceState"`
	GroupId      string `db:"groupId" json:"groupId"`
	LinkInfo     string `db:"linkInfo" json:"linkInfo"`
	RuleText     string `db:"ruleText" json:"ruleText"`
}

// TaskOption is an id/name pair read from mal_smart_task.
type TaskOption struct {
	TaskId   int64  `db:"task_id" json:"taskId"`
	TaskName string `db:"task_name" json:"taskName"`
}

// CheckOption is an id/file-name pair read from mal_check_content.
type CheckOption struct {
	TaskId   int64  `db:"task_id" json:"taskId,omitempty"`
	CheckId  int64  `db:"check_id" json:"checkId"`
	FileName string `db:"file_name" json:"fileName"`
}

// RuleOption is an id/file-name pair read from mal_warning_rule.
type RuleOption struct {
	TaskId   int64  `db:"task_id" json:"taskId,omitempty"`
	RuleId   int64  `db:"rule_id" json:"ruleId"`
	FileName string `db:"file_name" json:"fileName"`
}

// DictOption is a dictionary entry read from mal_dict_type.
type DictOption struct {
	DictId    int64  `db:"dict_id" json:"dictId"`
	DictName  string `db:"dict_name" json:"dictName"`
	DictValue string `db:"dict_value" json:"dictValue"`
	DictType  string `db:"dict_type" json:"dictType"`
}

// KnowledgeDocumentOption is a knowledge-base document read from
// mal_knowledge_document.
type KnowledgeDocumentOption struct {
	Id     string `db:"id" json:"id"`
	Title  string `db:"title" json:"title"`
	KnowId int64  `db:"know_id" json:"know_id"`
	// NOTE(review): FileUrl carries the same db tag as Title ("title"); this
	// looks like a copy-paste slip but the intended column is not visible
	// here, so the tag is left untouched — confirm and correct.
	FileUrl string `db:"title" json:"file_url"`
}

// Pagination describes the paging state of a result.
type Pagination struct {
	Page      int `json:"page"`      // current page number
	PageSize  int `json:"pageSize"`  // page size
	Total     int `json:"total"`     // total number of records
	TotalPage int `json:"totalPage"` // total number of pages
}

// PaginatedResult is one page of StaticRecord items plus paging information.
type PaginatedResult struct {
	Items      []StaticRecord `json:"list"`
	Pagination Pagination     `json:"pagination"`
}

// PaginatedResult2 is a paged result whose payload is a free-form map.
type PaginatedResult2 struct {
	Items      map[string]interface{} `json:"list"`
	Pagination Pagination             `json:"pagination"`
}

// RecordReq is the request payload for querying records.
type RecordReq struct {
	Ids      []string `json:"ids"`
	VideoIds []string `json:"treeNodes"`
	TaskIds  []int64  `json:"taskIds"`
	Warning  int64    `json:"warning"`
	Page     int64    `json:"page"`
	PageSize int64    `json:"pageSize"`
}

// GetWithPage returns one page of records from collectionName that match
// filter. pageNum values below 1 become 1 and pageSize values below 1 become
// 10. It returns an error when the package-level Milvus client has not been
// initialised or when the underlying query fails.
func GetWithPage(collectionName string, pageNum int64, pageSize int64, filter string) (*PaginatedResult, error) {
	if pageNum < 1 {
		pageNum = 1
	}
	if pageSize < 1 {
		pageSize = 10
	}
	if dbClient == nil {
		// Returning (nil, nil) here would push a nil dereference onto callers.
		return nil, fmt.Errorf("milvus client is not initialized")
	}

	// Debug trace of the filter expression.
	fmt.Println(filter)

	return NewDataOperator(dbClient).queryWithPagination(collectionName, filter, pageNum, pageSize)
}

// convertResultToMap converts the column-oriented query result into a slice
// of StaticRecord, one per row. The row count is taken from the first column.
// It returns nil when there are no columns or no rows. Columns with an
// unrecognised name or an unexpected type are skipped, and a column shorter
// than the row count leaves the missing rows at their zero value.
func convertResultToMap(result []entity.Column) []StaticRecord {
	if len(result) == 0 {
		return nil
	}
	count := result[0].Len()
	if count <= 0 {
		return nil
	}

	records := make([]StaticRecord, count)
	for _, field := range result {
		switch col := field.(type) {
		case *entity.ColumnInt64:
			fillStaticRecordInt64(records, field.Name(), col.Data())
		case *entity.ColumnInt64Array:
			fillStaticRecordInt64Array(records, field.Name(), col.Data())
		case *entity.ColumnVarChar:
			fillStaticRecordVarChar(records, field.Name(), col.Data())
		}
	}
	return records
}

// fillStaticRecordInt64 copies an int64 column into the matching field of each record.
func fillStaticRecordInt64(records []StaticRecord, name string, data []int64) {
	var set func(r *StaticRecord, v int64)
	switch name {
	case "id":
		set = func(r *StaticRecord, v int64) { r.Id = strconv.FormatInt(v, 10) }
	case "video_point_id":
		set = func(r *StaticRecord, v int64) { r.VideoPointId = v }
	case "is_desc":
		set = func(r *StaticRecord, v int64) { r.IsDesc = v }
	case "detect_num":
		set = func(r *StaticRecord, v int64) { r.DetectNum = v }
	case "is_waning":
		// The spelling matches the output field requested in
		// queryWithPagination; presumably it is the name used in the
		// collection schema, so it must not be "corrected" here alone.
		set = func(r *StaticRecord, v int64) { r.IsWarning = v }
	default:
		return
	}
	for i := 0; i < len(records) && i < len(data); i++ {
		set(&records[i], data[i])
	}
}

// fillStaticRecordInt64Array copies an int64-array column into the matching field of each record.
func fillStaticRecordInt64Array(records []StaticRecord, name string, data [][]int64) {
	var set func(r *StaticRecord, v []int64)
	switch name {
	case "task_id":
		set = func(r *StaticRecord, v []int64) { r.TaskId = v }
	case "rule_id":
		set = func(r *StaticRecord, v []int64) { r.RuleId = v }
	case "event_level_id":
		set = func(r *StaticRecord, v []int64) { r.EventLevelId = v }
	case "detect_id":
		set = func(r *StaticRecord, v []int64) { r.DetectId = v }
	case "knowledge_id":
		set = func(r *StaticRecord, v []int64) { r.KnowledgeId = v }
	default:
		return
	}
	for i := 0; i < len(records) && i < len(data); i++ {
		set(&records[i], data[i])
	}
}

// fillStaticRecordVarChar copies a varchar column into the matching field of each record.
func fillStaticRecordVarChar(records []StaticRecord, name string, data []string) {
	var set func(r *StaticRecord, v string)
	switch name {
	case "image_path":
		set = func(r *StaticRecord, v string) { r.ImagePath = v }
	case "video_path":
		set = func(r *StaticRecord, v string) { r.VideoPath = v }
	case "zh_desc_class":
		set = func(r *StaticRecord, v string) { r.ZhDescClass = v }
	case "task_name":
		set = func(r *StaticRecord, v string) { r.TaskName = v }
	case "event_level_name":
		set = func(r *StaticRecord, v string) { r.EventLevelName = v }
	case "detect_time":
		set = func(r *StaticRecord, v string) { r.DetectTime = v }
	case "suggestion":
		set = func(r *StaticRecord, v string) { r.Suggestion = v }
	case "risk_description":
		set = func(r *StaticRecord, v string) { r.RiskDescription = v }
	default:
		return
	}
	for i := 0; i < len(records) && i < len(data); i++ {
		set(&records[i], data[i])
	}
}

// staticRecordsByDetectTimeDesc sorts records newest-first using timestamps
// that were parsed once up front; records and times are swapped together.
type staticRecordsByDetectTimeDesc struct {
	records []StaticRecord
	times   []time.Time
}

func (s staticRecordsByDetectTimeDesc) Len() int { return len(s.records) }

func (s staticRecordsByDetectTimeDesc) Less(i, j int) bool { return s.times[i].After(s.times[j]) }

func (s staticRecordsByDetectTimeDesc) Swap(i, j int) {
	s.records[i], s.records[j] = s.records[j], s.records[i]
	s.times[i], s.times[j] = s.times[j], s.times[i]
}

// sortByDetectTimeDesc orders records in place by DetectTime, newest first.
// Records with equal timestamps keep their relative order.
func sortByDetectTimeDesc(records []StaticRecord) {
	const layout = "2006-01-02 15:04:05.999999"

	times := make([]time.Time, len(records))
	for i := range records {
		// A value that does not match the layout yields the zero time, so
		// such records sort after every record with a valid timestamp.
		times[i], _ = time.Parse(layout, records[i].DetectTime)
	}
	sort.Stable(staticRecordsByDetectTimeDesc{records: records, times: times})
}

// queryWithPagination returns page pageNum (1-based) of the records in
// collectionName that match filterExpr, ordered by detect_time descending.
//
// No limit/offset is passed to Milvus: the complete matching set is fetched,
// sorted by detect_time in memory and then sliced to the requested page. The
// total is obtained with a separate count(*) query.
func (do *DataOperator) queryWithPagination(collectionName string, filterExpr string, pageNum int64, pageSize int64) (*PaginatedResult, error) {
	if do == nil || do.client == nil {
		return nil, fmt.Errorf("milvus client is not initialized")
	}
	// Same defaults as GetWithPage; also protects the division below.
	if pageNum < 1 {
		pageNum = 1
	}
	if pageSize < 1 {
		pageSize = 10
	}

	ctx := context.Background()

	total, err := do.getTotalCount(collectionName, filterExpr)
	if err != nil {
		return nil, err
	}
	totalPages := int(math.Ceil(float64(total) / float64(pageSize)))

	result, err := do.client.client.Query(
		ctx,
		collectionName,
		[]string{}, // all partitions
		filterExpr,
		[]string{"rule_id", "task_id", "is_waning", "zh_desc_class", "task_name", "event_level_name", "detect_num", "event_level_id", "video_point_id", "detect_id", "image_path", "video_path", "detect_time", "knowledge_id", "risk_description", "suggestion", "id", "is_desc"}, // output fields
	)
	if err != nil {
		return nil, fmt.Errorf("分页查询失败: %w", err)
	}

	lists := convertResultToMap(result)
	sortByDetectTimeDesc(lists)
	items := Paginate(lists, int(pageNum), int(pageSize))

	return &PaginatedResult{
		Items: items,
		Pagination: Pagination{
			Page:      int(pageNum),
			PageSize:  int(pageSize),
			Total:     int(total),
			TotalPage: totalPages,
		},
	}, nil
}

// Paginate returns the sub-slice of data that forms the given 1-based page.
// A page below 1 is treated as page 1. It returns nil when pageSize is not
// positive or when the page starts past the end of data. The returned slice
// shares its backing array with data.
func Paginate(data []StaticRecord, page, pageSize int) []StaticRecord {
	if pageSize <= 0 {
		return nil
	}
	if page < 1 {
		page = 1
	}
	start := (page - 1) * pageSize
	if start >= len(data) {
		return nil
	}
	end := start + pageSize
	if end > len(data) {
		end = len(data)
	}
	return data[start:end]
}

// getTotalCount returns the number of entities in collectionName matching
// filterExpr, using a count(*) query. It returns an error when the query
// fails or when the result does not hold a single int64 count.
func (do *DataOperator) getTotalCount(collectionName string, filterExpr string) (int64, error) {
	result, err := do.client.client.Query(
		context.Background(),
		collectionName,
		[]string{},
		filterExpr,
		[]string{"count(*)"},
	)
	if err != nil {
		return 0, err
	}
	if len(result) == 0 {
		return 0, fmt.Errorf("count query on %q returned no columns", collectionName)
	}
	col, ok := result[0].(*entity.ColumnInt64)
	if !ok {
		return 0, fmt.Errorf("count query on %q returned unexpected column type %T", collectionName, result[0])
	}
	data := col.Data()
	if len(data) == 0 {
		return 0, fmt.Errorf("count query on %q returned no rows", collectionName)
	}
	return data[0], nil
}

// sqlInPlaceholders returns n comma-separated "?" placeholders for an SQL IN list.
func sqlInPlaceholders(n int) string {
	placeholders := make([]string, n)
	for i := range placeholders {
		placeholders[i] = "?"
	}
	return strings.Join(placeholders, ",")
}

// int64sToSQLArgs converts ids into the argument list bound to the placeholders.
func int64sToSQLArgs(ids []int64) []interface{} {
	args := make([]interface{}, len(ids))
	for i, id := range ids {
		args[i] = id
	}
	return args
}

// GetTaskByIds returns the tasks from mal_smart_task whose task_id is in ids.
// An empty ids yields an empty result without touching the database.
func GetTaskByIds(ids []int64) (items []TaskOption, err error) {
	if len(ids) == 0 {
		return nil, nil
	}
	sqlStr := `select task_id,task_name from mal_smart_task where task_id in (` + sqlInPlaceholders(len(ids)) + `)`
	// The ids must be bound to the placeholders; without them the statement
	// is executed with unbound "?" markers.
	if err = db.Raw(sqlStr, int64sToSQLArgs(ids)...).Scan(&items).Error; err != nil {
		return nil, err
	}
	return items, nil
}

// GetVideoById returns the camera row whose video_id equals vid.
func GetVideoById(vid int64) (video Camera, err error) {
	sqlStr := "select id,name,alias,type,addr,rtsp,is_running,run_type,run_enable FROM cameras where video_id=?"
	if err = db.Raw(sqlStr, vid).Scan(&video).Error; err != nil {
		return Camera{}, err
	}
	return video, nil
}

// GetCheckByIds returns the rows from mal_check_content whose check_id is in
// ids. An empty ids yields an empty result without touching the database.
func GetCheckByIds(ids []int64) (items []CheckOption, err error) {
	if len(ids) == 0 {
		return nil, nil
	}
	sqlStr := `select check_id,file_name from mal_check_content where check_id in (` + sqlInPlaceholders(len(ids)) + `)`
	if err = db.Raw(sqlStr, int64sToSQLArgs(ids)...).Scan(&items).Error; err != nil {
		return nil, err
	}
	return items, nil
}

// GetRuleByIds returns the rows from mal_warning_rule whose rule_id is in
// ids. An empty ids yields an empty result without touching the database.
func GetRuleByIds(ids []int64) (items []RuleOption, err error) {
	if len(ids) == 0 {
		return nil, nil
	}
	sqlStr := `select rule_id,file_name from mal_warning_rule where rule_id in (` + sqlInPlaceholders(len(ids)) + `)`
	if err = db.Raw(sqlStr, int64sToSQLArgs(ids)...).Scan(&items).Error; err != nil {
		return nil, err
	}
	return items, nil
}

// GetEventByIds returns the rows from mal_dict_type whose dict_id is in ids.
// An empty ids yields an empty result without touching the database.
func GetEventByIds(ids []int64) (items []DictOption, err error) {
	if len(ids) == 0 {
		return nil, nil
	}
	sqlStr := `select dict_id,dict_name,dict_value from mal_dict_type where dict_id in (` + sqlInPlaceholders(len(ids)) + `)`
	if err = db.Raw(sqlStr, int64sToSQLArgs(ids)...).Scan(&items).Error; err != nil {
		return nil, err
	}
	return items, nil
}

// GetKnowledgeDocumentByIds returns the rows from mal_knowledge_document
// whose id is in ids; file_name is selected under the alias title. An empty
// ids yields an empty result without touching the database.
func GetKnowledgeDocumentByIds(ids []int64) (items []KnowledgeDocumentOption, err error) {
	if len(ids) == 0 {
		return nil, nil
	}
	sqlStr := `SELECT id, know_id, file_name as title FROM mal_knowledge_document where id in (` + sqlInPlaceholders(len(ids)) + `)`
	if err = db.Raw(sqlStr, int64sToSQLArgs(ids)...).Scan(&items).Error; err != nil {
		return nil, err
	}
	return items, nil
}

// GetCameraIds returns the rows from cameras whose id is in ids. An empty ids
// yields an empty result without touching the database.
func GetCameraIds(ids []string) (items []CameraDto, err error) {
	if len(ids) == 0 {
		return nil, nil
	}
	args := make([]interface{}, len(ids))
	for i, id := range ids {
		args[i] = id
	}
	sqlStr := `select id,name,alias,type,addr,rtsp,is_running,run_type,run_enable,video_id FROM cameras where id in (` + sqlInPlaceholders(len(ids)) + `)`
	if err = db.Raw(sqlStr, args...).Scan(&items).Error; err != nil {
		return nil, err
	}
	return items, nil
}