package service

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"

	"github.com/elastic/go-elasticsearch/v6"
	"model-engine/config"
)

// CameraLocation holds the location fields stored under the "cameraLocation"
// key of a record.
type CameraLocation struct {
	Building  string `json:"building"`
	Unit      string `json:"unit"`
	Pos       string `json:"pos"`
	Floor     string `json:"floor"`
	Direction string `json:"direction"`
	Latitude  string `json:"latitude"`
	Longitude string `json:"longitude"`
}

// Join concatenates Building, Unit, Pos and Floor, in that order and without
// any separator. Direction, Latitude and Longitude are not included.
func (cl *CameraLocation) Join() string {
	return cl.Building + cl.Unit + cl.Pos + cl.Floor
}

// ESRecordInfo is a single record as stored in Elasticsearch.
type ESRecordInfo struct {
	Id             string         `json:"id"`
	ClusterId      string         `json:"clusterId"`
	CameraId       string         `json:"cameraId"`
	CameraAddr     string         `json:"cameraAddr"`     // camera address
	CameraName     string         `json:"cameraName"`     // camera name
	CommunityId    string         `json:"communityId"`    // community ID
	CommunityName  string         `json:"communityName"`  // community name
	PersonName     string         `json:"personName"`     // person name
	IdCard         string         `json:"idCard"`         // identity document number
	CameraLocation CameraLocation `json:"cameraLocation"` // capture location
	DocumentNumber string         `json:"documentNumber"` // archive number
	OrgId          string         `json:"orgId"`          // organization ID
	OrgName        string         `json:"orgName"`        // organization name
	PicDate        string         `json:"picDate"`
	PicId          string         `json:"picId"`
	PicMaxUrl      string         `json:"picMaxUrl"`
	TaskId         string         `json:"taskId"`
	TaskName       string         `json:"taskName"`
	SdkName        string         `json:"sdkName"`
	Content        string         `json:"content"`
	ShowLabels     string         `json:"showLabels"`
	OtherLabels    string         `json:"otherLabels"`
	VideoUrl       string         `json:"videoUrl"`
	IsAlarm        bool           `json:"isAlarm"`
	IsAckAlarm     bool           `json:"isAckAlarm"`
	IsDelete       bool           `json:"isDelete"`
	IsKeyPerson    bool           `json:"isKeyPerson"`
	KeyPersonType  []string       `json:"keyPersonType"`
	DataSource     string         `json:"dataSource"` // data source: camera, data stack
}

// QueryEsRecord searches the configured AiOcean index for records whose
// documentNumber is one of documentNumbers and whose picDate lies in
// [startTime, endTime), sorted by picDate ascending.
//
// When orgIds and/or areaIds are non-empty the result is additionally
// restricted by the data-permission filters built by getDomainFilters.
//
// Only a subset of ESRecordInfo is populated: Id, CameraId, CameraAddr,
// CameraName, CommunityId, CommunityName, DocumentNumber, OrgId, OrgName,
// PicDate, PicId, PicMaxUrl and the Building/Unit/Pos/Floor/Direction parts of
// CameraLocation. A field that is absent from a hit, or is not a string, is
// left at its zero value.
//
// It returns (nil, nil) when the response carries no "hits" object, and an
// error when the request fails, Elasticsearch reports a non-2xx status, or the
// response body does not have the expected shape.
func QueryEsRecord(esClient *elasticsearch.Client, startTime, endTime string, orgIds, areaIds []interface{}, documentNumbers []string) ([]*ESRecordInfo, error) {
	// NOTE(review): this is far above Elasticsearch's default
	// index.max_result_window (10000); presumably the target index raises
	// that limit - confirm, or move to scroll/search_after.
	const maxHits = 10000000

	// A nil slice would be encoded as JSON null rather than an empty array.
	if documentNumbers == nil {
		documentNumbers = []string{}
	}

	// Build the filter clauses: document numbers, data permissions, time range.
	filters := []map[string]interface{}{
		{"terms": map[string]interface{}{"documentNumber": documentNumbers}},
	}
	// getDomainFilters yields no clauses when both ID lists are empty.
	filters = append(filters, getDomainFilters(orgIds, areaIds)...)
	filters = append(filters, map[string]interface{}{
		"range": map[string]interface{}{
			"picDate": map[string]interface{}{"gte": startTime, "lt": endTime},
		},
	})

	boolQuery := map[string]interface{}{
		"bool": map[string]interface{}{"filter": filters},
	}
	sortSpec := []interface{}{
		map[string]interface{}{"picDate": map[string]interface{}{"order": "asc"}},
	}
	sourceFilter := map[string]interface{}{
		"includes": []interface{}{},
		"excludes": []interface{}{"*.feature", "targetInfo"},
	}
	query := map[string]interface{}{
		"query":   boolQuery,
		"size":    maxHits,
		"sort":    sortSpec,
		"_source": sourceFilter,
	}

	var buf bytes.Buffer
	if err := json.NewEncoder(&buf).Encode(query); err != nil {
		return nil, fmt.Errorf("error encoding query: %w", err)
	}

	// NOTE(review): context.Background() means callers cannot cancel or time
	// out this request; the signature offers no context to pass through.
	// The response is decoded by machine, so pretty-printing is not requested.
	res, err := esClient.Search(
		esClient.Search.WithContext(context.Background()),
		esClient.Search.WithIndex(config.EsInfo.EsIndex.AiOcean.IndexName),
		esClient.Search.WithDocumentType(config.EsInfo.EsIndex.AiOcean.IndexType),
		esClient.Search.WithBody(&buf),
		esClient.Search.WithTrackTotalHits(true),
	)
	if err != nil {
		return nil, fmt.Errorf("error getting response: %w", err)
	}
	defer res.Body.Close()

	// Anything outside the 2xx range is reported as an error.
	if res.IsError() {
		return nil, fmt.Errorf("error getting response: %s", res.String())
	}

	var result map[string]interface{}
	if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
		return nil, fmt.Errorf("error parsing response body: %w", err)
	}

	return parseEsRecordHits(result)
}

// parseEsRecordHits converts a decoded search response into records, one per
// entry of hits.hits and in the same order. It returns (nil, nil) when the
// "hits" object or its inner "hits" list is absent or null.
func parseEsRecordHits(result map[string]interface{}) ([]*ESRecordInfo, error) {
	rawEnvelope, found := result["hits"]
	if !found || rawEnvelope == nil {
		return nil, nil
	}
	envelope, ok := rawEnvelope.(map[string]interface{})
	if !ok {
		return nil, fmt.Errorf("error parsing response body: unexpected type %T for hits", rawEnvelope)
	}

	rawHits := envelope["hits"]
	if rawHits == nil {
		return nil, nil
	}
	hits, ok := rawHits.([]interface{})
	if !ok {
		return nil, fmt.Errorf("error parsing response body: unexpected type %T for hits.hits", rawHits)
	}

	records := make([]*ESRecordInfo, len(hits))
	for i, hit := range hits {
		doc, ok := hit.(map[string]interface{})
		if !ok {
			return nil, fmt.Errorf("error parsing response body: hit %d has unexpected type %T", i, hit)
		}
		source, ok := doc["_source"].(map[string]interface{})
		if !ok {
			return nil, fmt.Errorf("error parsing response body: hit %d has no _source object", i)
		}
		records[i] = esRecordFromSource(source)
	}
	return records, nil
}

// esRecordFromSource builds a record from the _source object of one hit.
// Missing or non-string fields are left empty; a missing cameraLocation
// object leaves CameraLocation at its zero value.
func esRecordFromSource(source map[string]interface{}) *ESRecordInfo {
	record := &ESRecordInfo{
		Id:             esString(source, "id"),
		CameraId:       esString(source, "cameraId"),
		CameraAddr:     esString(source, "cameraAddr"),
		CameraName:     esString(source, "cameraName"),
		CommunityId:    esString(source, "communityId"),
		CommunityName:  esString(source, "communityName"),
		DocumentNumber: esString(source, "documentNumber"),
		OrgId:          esString(source, "orgId"),
		OrgName:        esString(source, "orgName"),
		PicDate:        esString(source, "picDate"),
		PicId:          esString(source, "picId"),
		PicMaxUrl:      esString(source, "picMaxUrl"),
	}

	if loc, ok := source["cameraLocation"].(map[string]interface{}); ok {
		record.CameraLocation = CameraLocation{
			Building:  esString(loc, "building"),
			Unit:      esString(loc, "unit"),
			Pos:       esString(loc, "pos"),
			Floor:     esString(loc, "floor"),
			Direction: esString(loc, "direction"),
		}
	}
	return record
}

// esString returns fields[key] when it holds a string, and "" otherwise
// (key absent, null, or a value of another type).
func esString(fields map[string]interface{}, key string) string {
	s, _ := fields[key].(string)
	return s
}

// getDomainFilters builds the data-permission filter clauses for a query.
//
//   - both lists non-empty: one bool/should clause matching orgId in orgIds
//     OR communityId in areaIds
//   - only orgIds non-empty: one terms clause on orgId
//   - only areaIds non-empty: one terms clause on communityId
//   - both empty: no clauses
//
// The returned slice is never nil.
func getDomainFilters(orgIds, areaIds []interface{}) (filters []map[string]interface{}) {
	filters = make([]map[string]interface{}, 0)

	orgTerms := map[string]interface{}{
		"terms": map[string]interface{}{"orgId": orgIds},
	}
	areaTerms := map[string]interface{}{
		"terms": map[string]interface{}{"communityId": areaIds},
	}

	switch {
	case len(orgIds) > 0 && len(areaIds) > 0:
		filters = append(filters, map[string]interface{}{
			"bool": map[string]interface{}{
				"should": []interface{}{orgTerms, areaTerms},
			},
		})
	case len(orgIds) > 0:
		filters = append(filters, orgTerms)
	case len(areaIds) > 0:
		filters = append(filters, areaTerms)
	}
	return filters
}