sunty
2024-03-27 0dbe814f18cf27890694b09796cbd3822a7a0426
update comid string to comid[] string DayNightActivityQuery add AnalyzeCoordinatedMovements
1个文件已修改
349 ■■■■■ 已修改文件
EsApi.go 349 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
EsApi.go
@@ -48,11 +48,11 @@
        endHourInt, _ := strconv.Atoi(endParts[0])
        // 输出开始时间的小时
        fmt.Println("开始时间的小时:", startHourInt)
        //fmt.Println("开始时间的小时:", startHourInt)
        // 输出结束时间的小时 + 1
        endHourPlusOne := (endHourInt + 1) % 24 // 取余确保不超过24小时
        fmt.Println("结束时间的小时 + 1:", endHourPlusOne)
        //fmt.Println("结束时间的小时 + 1:", endHourPlusOne)
        activeHourFormat := activeHourFormat{startTime: startHour, endTime: endHour, startHour: startHourInt, endHour: endHourPlusOne}
        return activeHourFormat, nil
    }
@@ -282,7 +282,7 @@
    return docInfo, nil
}
func DayNightActivityQuery(communityId string, documentNumber string,startTime string, endTime string, activeHour string, frequency int,
func DayNightActivityQuery(comIds []string, docNumber string, startTime string, endTime string, activeHour string, frequency int,
    intervalInMinutes int, indexName string, serverIp string, serverPort string) (map[string]interface{}, error) {
    esURL := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
@@ -291,8 +291,13 @@
        return nil, err
    }
    filterDocIdAttr := ""
    if documentNumber != ""{
        filterDocIdAttr = "{\"term\": {\""+documentNumber+"\": \"\"}},"
    if docNumber != "" {
        filterDocIdAttr = "{\"term\": {\"documentNumber\": \"" + docNumber + "\"}},"
    }
    comIdsStr := ""
    if comIds == nil || len(comIds) > 0 {
        esComIds := strings.Replace(strings.Trim(fmt.Sprint(comIds), "[]"), " ", "\",\"", -1)
        comIdsStr = "{\"terms\":{\"communityId\":[\"" + esComIds + "\"]}},"
    }
    queryDSL := `
    {
@@ -308,12 +313,8 @@
                            }
                        }
                    },
                    `+filterDocIdAttr+`
                    {
                        "term": {
                            "communityId": "` + communityId + `"
                        }
                    },
                    ` + filterDocIdAttr + `
                    ` + comIdsStr + `
                    {
                        "script": {
                            "script": {
@@ -377,8 +378,8 @@
    }
    //result, _ := decodeDocumentInfos(source)
    //return result, nil
    if len(docResult) ==0 {
        return result,nil
    if len(docResult) == 0 {
        return result, nil
    }
    DataInfos, err := GetInfosByIds(docResult[0]["id"].([]string), indexName, serverIp, serverPort)
    result["documentNumbers"] = docResult
@@ -386,6 +387,328 @@
    return result, nil
}
type acmInfo struct {
    documentNumber string
    camerasInfos   []camerasInfo
}
type camerasInfo struct {
    cameraId     string
    captureInfos []captureInfo
}
type captureInfo struct {
    id      string
    picDate string
}
func addSecondsToTimestamp(timestamp string, seconds int) (string, error) {
    parsedTime, err := time.Parse("2006-01-02 15:04:05", timestamp)
    if err != nil {
        return "", err
    }
    newTime := parsedTime.Add(time.Second * time.Duration(seconds))
    newTimestamp := newTime.Format("2006-01-02 15:04:05")
    return newTimestamp, nil
}
func decodeInfo(intervalInMinutes int, source []map[string]interface{}) ([]acmInfo, error) {
    acmInfos := make([]acmInfo, 0)
    for _, info := range source {
        var aInfo acmInfo
        documentNumber := info["key"].(string)
        aInfo.documentNumber = documentNumber
        groupByCameraId := info["group_by_cameraId"].(map[string]interface{})
        cameraBuckets := groupByCameraId["buckets"].([]interface{})
        for _, cameraInfo := range cameraBuckets {
            var camsInfo camerasInfo
            cInfo := cameraInfo.(map[string]interface{})
            cameraId := cInfo["key"].(string)
            camsInfo.cameraId = cameraId
            dataBuckets := cInfo["top_hits"].(map[string]interface{})["hits"].(map[string]interface{})["hits"].([]interface{})
            markTime := ""
            for _, dataInfo := range dataBuckets {
                var capInfo captureInfo
                dInfo := dataInfo.(map[string]interface{})
                dSource := dInfo["_source"].(map[string]interface{})
                id := dSource["id"].(string)
                picDate := dSource["picDate"].(string)
                //addFlag := false
                if markTime == "" {
                    //addFlag = true
                    markTime = picDate
                } else {
                    if checkTimeDifference(markTime, picDate, intervalInMinutes) {
                        //fmt.Println(markTime, picDate)
                        markTime = picDate
                        continue
                    }
                    markTime = picDate
                }
                capInfo.id = id
                capInfo.picDate = picDate
                camsInfo.captureInfos = append(camsInfo.captureInfos, capInfo)
            }
            aInfo.camerasInfos = append(aInfo.camerasInfos, camsInfo)
        }
        acmInfos = append(acmInfos, aInfo)
    }
    return acmInfos, nil
}
type addResultIds struct {
    documentNumber string
    unionIds       []unionId
}
type unionId struct {
    baseId   string
    targetId string
}
func addResultInfo(source []map[string]interface{}, targetAddResultIds *[]addResultIds, bId string) {
    found := false
    for _, info := range source {
        documentNumber := info["key"].(string)
        dataBuckets := info["top_hits"].(map[string]interface{})["hits"].(map[string]interface{})["hits"].([]interface{})
        id := dataBuckets[0].(map[string]interface{})["_source"].(map[string]interface{})["id"].(string)
        //fmt.Println("documentNumber: ", documentNumber, "\tid: ", id)
        for i, docInfo := range *targetAddResultIds {
            if documentNumber == docInfo.documentNumber {
                //fmt.Println("新更新")
                (*targetAddResultIds)[i].unionIds = append((*targetAddResultIds)[i].unionIds, unionId{baseId: bId, targetId: id})
                found = true
                break
            }
        }
        if !found {
            //fmt.Println("新添加")
            var targetAddResultId addResultIds
            targetAddResultId.documentNumber = documentNumber
            targetAddResultId.unionIds = append(targetAddResultId.unionIds, unionId{baseId: bId, targetId: id})
            *targetAddResultIds = append(*targetAddResultIds, targetAddResultId)
        }
    }
}
func removeDuplicates(nums []string) []string {
    result := make([]string, 0)
    seen := make(map[string]bool)
    for _, num := range nums {
        if !seen[num] {
            result = append(result, num)
            seen[num] = true
        }
    }
    return result
}
func findAnalyzeCoordinatedMovementsInfos(infos []acmInfo, beforeTime int, afterTime int, frequency int,
    indexName string, serverIp string, serverPort string) (map[string]interface{}, error) {
    //baseAddResultIds := make([]addResultIds, 0)
    targetAddResultIds := make([]addResultIds, 0)
    esURL := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
    for _, info := range infos {
        for _, cInfo := range info.camerasInfos {
            for _, pInfo := range cInfo.captureInfos {
                gteDate, err := addSecondsToTimestamp(pInfo.picDate[:19], beforeTime)
                if err != nil {
                    fmt.Println(err)
                }
                lteDate, err := addSecondsToTimestamp(pInfo.picDate[:19], afterTime)
                if err != nil {
                    fmt.Println(err)
                }
                queryDSL := `
                    {
                        "size": 0,
                        "query": {
                            "bool": {
                                "filter": [
                                    {
                                        "range": {
                                            "picDate": {
                                                "gte": "` + gteDate + `",
                                                "lte": "` + lteDate + `"
                                            }
                                        }
                                    },
                                    {
                                        "term": {
                                            "cameraId": "` + cInfo.cameraId + `"
                                        }
                                    }
                                ]
                            }
                        },
                        "aggs": {
                            "group_by_documentnumber": {
                                "terms": {
                                    "field": "documentNumber",
                                    "size": 100000
                                },
                                "aggs": {
                                    "top_hits": {
                                        "top_hits": {
                                            "_source": [
                                                "id",
                                                "cameraId",
                                                "picDate"
                                            ],
                                            "size": 10000,
                                            "sort": [
                                                {
                                                    "picDate": {
                                                        "order": "asc"
                                                    }
                                                }
                                            ]
                                        }
                                    }
                                }
                            }
                        }
                    }`
                //fmt.Println(esURL, queryDSL)
                buf, err := EsReq("POST", esURL, []byte(queryDSL))
                if err != nil {
                    return nil, err
                }
                source, err := SourceAggregationList(buf)
                if err != nil {
                    return nil, err
                }
                //fmt.Println("pInfo.id: ", pInfo.id)
                addResultInfo(source, &targetAddResultIds, pInfo.id)
                //fmt.Println("targetAddResultIds: ", targetAddResultIds)
                if err != nil {
                    return nil, err
                }
                //fmt.Println(source)
            }
        }
    }
    //fmt.Println("targetAddResultIds: ", targetAddResultIds)
    baseIds := make([]string, 0)
    targetIds := make([]string, 0)
    for _, tAIdInfo := range targetAddResultIds {
        if len(tAIdInfo.unionIds) >= frequency {
            for _, unionId := range tAIdInfo.unionIds {
                baseIds = append(baseIds, unionId.baseId)
                targetIds = append(targetIds, unionId.targetId)
            }
        }
    }
    rdbaseIds := removeDuplicates(baseIds)
    rdtargetIds := removeDuplicates(targetIds)
    baseInfos, err := GetInfosByIds(rdbaseIds, indexName, serverIp, serverPort)
    if err != nil{
        return nil, err
    }
    targetInfos, err := GetInfosByIds(rdtargetIds, indexName, serverIp, serverPort)
    if err != nil{
        return nil, err
    }
    result := make(map[string]interface{})
    result["baseRecordInfo"] = baseInfos
    result["targetRecordInfo"] = targetInfos
    return result, nil
}
func AnalyzeCoordinatedMovements(comIds []string, docNumber string, startDate string, endDate string, beforeTime int, afterTime int, frequency int,
    intervalInMinutes int, indexName string, serverIp string, serverPort string) (map[string]interface{}, error) {
    esURL := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
    //判断社区IDs
    comIdsStr := ""
    if comIds == nil || len(comIds) > 0 {
        esComIds := strings.Replace(strings.Trim(fmt.Sprint(comIds), "[]"), " ", "\",\"", -1)
        comIdsStr = "{\"terms\":{\"communityId\":[\"" + esComIds + "\"]}},"
    }
    queryDSL := `
    {
        "size": 0,
        "query": {
            "bool": {
                "filter": [
                    {
                        "range": {
                            "picDate": {
                                "gte": "` + startDate + `",
                                "lte": "` + endDate + `"
                            }
                        }
                    },
                    ` + comIdsStr + `
                    {
                        "term": {
                            "documentNumber": "` + docNumber + `"
                        }
                    }
                ]
            }
        },
        "aggs": {
            "group_by_documentnumber": {
                "terms": {
                    "field": "documentNumber",
                    "size": 100000
                },
                "aggs": {
                    "group_by_cameraId": {
                        "terms": {
                            "field": "cameraId",
                            "size": 10000
                        },
                        "aggs": {
                            "top_hits": {
                                "top_hits": {
                                    "_source": [
                                        "id",
                                        "cameraId",
                                        "picDate"
                                    ],
                                    "size": 10000,
                                    "sort": [
                                        {
                                            "picDate": {
                                                "order": "asc"
                                            }
                                        }
                                    ]
                                }
                            }
                        }
                    }
                }
            }
        }
    }`
    //fmt.Println(esURL)
    //fmt.Println(queryDSL)
    buf, err := EsReq("POST", esURL, []byte(queryDSL))
    if err != nil {
        return nil, err
    }
    source, err := SourceAggregationList(buf)
    if err != nil {
        return nil, err
    }
    docResult, err := decodeInfo(intervalInMinutes, source)
    if err != nil {
        return nil, err
    }
    //fmt.Println(docResult)
    result, err := findAnalyzeCoordinatedMovementsInfos(docResult, beforeTime, afterTime, frequency, indexName, serverIp, serverPort)
    if err != nil {
        return nil, err
    }
    //fmt.Println(result)
    return result, nil
}
func GetInfosByIds(ids []string, indexName string, serverIp string, serverPort string) ([]map[string]interface{}, error) {
    captureIds := strings.Replace(strings.Trim(fmt.Sprint(ids), "[]"), " ", "\",\"", -1)
    esURL := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"