From 0dbe814f18cf27890694b09796cbd3822a7a0426 Mon Sep 17 00:00:00 2001 From: sunty <1172534965@qq.com> Date: 星期三, 27 三月 2024 18:23:22 +0800 Subject: [PATCH] update comid string to comid[] string DayNightActivityQuery add AnalyzeCoordinatedMovements --- EsApi.go | 349 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 files changed, 336 insertions(+), 13 deletions(-) diff --git a/EsApi.go b/EsApi.go index 65c6766..ea3fc93 100644 --- a/EsApi.go +++ b/EsApi.go @@ -48,11 +48,11 @@ endHourInt, _ := strconv.Atoi(endParts[0]) // 杈撳嚭寮�濮嬫椂闂寸殑灏忔椂 - fmt.Println("寮�濮嬫椂闂寸殑灏忔椂:", startHourInt) + //fmt.Println("寮�濮嬫椂闂寸殑灏忔椂:", startHourInt) // 杈撳嚭缁撴潫鏃堕棿鐨勫皬鏃� + 1 endHourPlusOne := (endHourInt + 1) % 24 // 鍙栦綑纭繚涓嶈秴杩�24灏忔椂 - fmt.Println("缁撴潫鏃堕棿鐨勫皬鏃� + 1:", endHourPlusOne) + //fmt.Println("缁撴潫鏃堕棿鐨勫皬鏃� + 1:", endHourPlusOne) activeHourFormat := activeHourFormat{startTime: startHour, endTime: endHour, startHour: startHourInt, endHour: endHourPlusOne} return activeHourFormat, nil } @@ -282,7 +282,7 @@ return docInfo, nil } -func DayNightActivityQuery(communityId string, documentNumber string,startTime string, endTime string, activeHour string, frequency int, +func DayNightActivityQuery(comIds []string, docNumber string, startTime string, endTime string, activeHour string, frequency int, intervalInMinutes int, indexName string, serverIp string, serverPort string) (map[string]interface{}, error) { esURL := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" @@ -291,8 +291,13 @@ return nil, err } filterDocIdAttr := "" - if documentNumber != ""{ - filterDocIdAttr = "{\"term\": {\""+documentNumber+"\": \"\"}}," + if docNumber != "" { + filterDocIdAttr = "{\"term\": {\"documentNumber\": \"" + docNumber + "\"}}," + } + comIdsStr := "" + if comIds == nil || len(comIds) > 0 { + esComIds := strings.Replace(strings.Trim(fmt.Sprint(comIds), "[]"), " ", "\",\"", -1) + comIdsStr = "{\"terms\":{\"communityId\":[\"" + esComIds + "\"]}}," } queryDSL := ` { @@ -308,12 +313,8 @@ } } }, - `+filterDocIdAttr+` - { - "term": { - "communityId": "` + communityId + `" - } - }, + ` + filterDocIdAttr + ` + ` + comIdsStr + ` { "script": { "script": { @@ -377,8 +378,8 @@ } //result, _ := decodeDocumentInfos(source) //return result, nil - if len(docResult) ==0 { - return result,nil + if len(docResult) == 0 { + return result, nil } DataInfos, err := GetInfosByIds(docResult[0]["id"].([]string), indexName, serverIp, serverPort) result["documentNumbers"] = docResult @@ -386,6 +387,328 @@ return result, nil } +type acmInfo struct { + documentNumber string + camerasInfos []camerasInfo +} + +type camerasInfo struct { + cameraId string + captureInfos []captureInfo +} + +type captureInfo struct { + id string + picDate string +} + +func addSecondsToTimestamp(timestamp string, seconds int) (string, error) { + parsedTime, err := time.Parse("2006-01-02 15:04:05", timestamp) + if err != nil { + return "", err + } + newTime := parsedTime.Add(time.Second * time.Duration(seconds)) + newTimestamp := newTime.Format("2006-01-02 15:04:05") + return newTimestamp, nil +} + +func decodeInfo(intervalInMinutes int, source []map[string]interface{}) ([]acmInfo, error) { + acmInfos := make([]acmInfo, 0) + for _, info := range source { + var aInfo acmInfo + documentNumber := info["key"].(string) + aInfo.documentNumber = documentNumber + groupByCameraId := info["group_by_cameraId"].(map[string]interface{}) + cameraBuckets := groupByCameraId["buckets"].([]interface{}) + for _, cameraInfo := range cameraBuckets { + var camsInfo camerasInfo + cInfo := cameraInfo.(map[string]interface{}) + cameraId := cInfo["key"].(string) + camsInfo.cameraId = cameraId + dataBuckets := cInfo["top_hits"].(map[string]interface{})["hits"].(map[string]interface{})["hits"].([]interface{}) + markTime := "" + for _, dataInfo := range dataBuckets { + var capInfo captureInfo + dInfo := dataInfo.(map[string]interface{}) + dSource := dInfo["_source"].(map[string]interface{}) + id := dSource["id"].(string) + picDate := dSource["picDate"].(string) + //addFlag := false + if markTime == "" { + //addFlag = true + markTime = picDate + } else { + if checkTimeDifference(markTime, picDate, intervalInMinutes) { + //fmt.Println(markTime, picDate) + markTime = picDate + continue + } + markTime = picDate + } + capInfo.id = id + capInfo.picDate = picDate + camsInfo.captureInfos = append(camsInfo.captureInfos, capInfo) + } + aInfo.camerasInfos = append(aInfo.camerasInfos, camsInfo) + } + acmInfos = append(acmInfos, aInfo) + } + return acmInfos, nil +} + +type addResultIds struct { + documentNumber string + unionIds []unionId +} + +type unionId struct { + baseId string + targetId string +} + +func addResultInfo(source []map[string]interface{}, targetAddResultIds *[]addResultIds, bId string) { + found := false + for _, info := range source { + documentNumber := info["key"].(string) + dataBuckets := info["top_hits"].(map[string]interface{})["hits"].(map[string]interface{})["hits"].([]interface{}) + id := dataBuckets[0].(map[string]interface{})["_source"].(map[string]interface{})["id"].(string) + //fmt.Println("documentNumber: ", documentNumber, "\tid: ", id) + for i, docInfo := range *targetAddResultIds { + if documentNumber == docInfo.documentNumber { + //fmt.Println("鏂版洿鏂�") + (*targetAddResultIds)[i].unionIds = append((*targetAddResultIds)[i].unionIds, unionId{baseId: bId, targetId: id}) + found = true + break + } + } + if !found { + //fmt.Println("鏂版坊鍔�") + var targetAddResultId addResultIds + targetAddResultId.documentNumber = documentNumber + targetAddResultId.unionIds = append(targetAddResultId.unionIds, unionId{baseId: bId, targetId: id}) + *targetAddResultIds = append(*targetAddResultIds, targetAddResultId) + } + + } +} + +func removeDuplicates(nums []string) []string { + result := make([]string, 0) + seen := make(map[string]bool) + + for _, num := range nums { + if !seen[num] { + result = append(result, num) + seen[num] = true + } + } + + return result +} + +func findAnalyzeCoordinatedMovementsInfos(infos []acmInfo, beforeTime int, afterTime int, frequency int, + indexName string, serverIp string, serverPort string) (map[string]interface{}, error) { + //baseAddResultIds := make([]addResultIds, 0) + targetAddResultIds := make([]addResultIds, 0) + esURL := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" + for _, info := range infos { + for _, cInfo := range info.camerasInfos { + for _, pInfo := range cInfo.captureInfos { + gteDate, err := addSecondsToTimestamp(pInfo.picDate[:19], beforeTime) + if err != nil { + fmt.Println(err) + } + lteDate, err := addSecondsToTimestamp(pInfo.picDate[:19], afterTime) + if err != nil { + fmt.Println(err) + } + queryDSL := ` + { + "size": 0, + "query": { + "bool": { + "filter": [ + { + "range": { + "picDate": { + "gte": "` + gteDate + `", + "lte": "` + lteDate + `" + } + } + }, + { + "term": { + "cameraId": "` + cInfo.cameraId + `" + } + } + ] + } + }, + "aggs": { + "group_by_documentnumber": { + "terms": { + "field": "documentNumber", + "size": 100000 + }, + "aggs": { + "top_hits": { + "top_hits": { + "_source": [ + "id", + "cameraId", + "picDate" + ], + "size": 10000, + "sort": [ + { + "picDate": { + "order": "asc" + } + } + ] + } + } + } + } + } + }` + //fmt.Println(esURL, queryDSL) + buf, err := EsReq("POST", esURL, []byte(queryDSL)) + if err != nil { + return nil, err + } + source, err := SourceAggregationList(buf) + if err != nil { + return nil, err + } + //fmt.Println("pInfo.id: ", pInfo.id) + addResultInfo(source, &targetAddResultIds, pInfo.id) + //fmt.Println("targetAddResultIds: ", targetAddResultIds) + if err != nil { + return nil, err + } + //fmt.Println(source) + } + } + } + //fmt.Println("targetAddResultIds: ", targetAddResultIds) + baseIds := make([]string, 0) + targetIds := make([]string, 0) + for _, tAIdInfo := range targetAddResultIds { + if len(tAIdInfo.unionIds) >= frequency { + for _, unionId := range tAIdInfo.unionIds { + baseIds = append(baseIds, unionId.baseId) + targetIds = append(targetIds, unionId.targetId) + } + } + } + rdbaseIds := removeDuplicates(baseIds) + rdtargetIds := removeDuplicates(targetIds) + baseInfos, err := GetInfosByIds(rdbaseIds, indexName, serverIp, serverPort) + if err != nil{ + return nil, err + } + targetInfos, err := GetInfosByIds(rdtargetIds, indexName, serverIp, serverPort) + if err != nil{ + return nil, err + } + result := make(map[string]interface{}) + result["baseRecordInfo"] = baseInfos + result["targetRecordInfo"] = targetInfos + return result, nil +} + +func AnalyzeCoordinatedMovements(comIds []string, docNumber string, startDate string, endDate string, beforeTime int, afterTime int, frequency int, + intervalInMinutes int, indexName string, serverIp string, serverPort string) (map[string]interface{}, error) { + esURL := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" + //鍒ゆ柇绀惧尯IDs + comIdsStr := "" + if comIds == nil || len(comIds) > 0 { + esComIds := strings.Replace(strings.Trim(fmt.Sprint(comIds), "[]"), " ", "\",\"", -1) + comIdsStr = "{\"terms\":{\"communityId\":[\"" + esComIds + "\"]}}," + } + queryDSL := ` + { + "size": 0, + "query": { + "bool": { + "filter": [ + { + "range": { + "picDate": { + "gte": "` + startDate + `", + "lte": "` + endDate + `" + } + } + }, + ` + comIdsStr + ` + { + "term": { + "documentNumber": "` + docNumber + `" + } + } + ] + } + }, + "aggs": { + "group_by_documentnumber": { + "terms": { + "field": "documentNumber", + "size": 100000 + }, + "aggs": { + "group_by_cameraId": { + "terms": { + "field": "cameraId", + "size": 10000 + }, + "aggs": { + "top_hits": { + "top_hits": { + "_source": [ + "id", + "cameraId", + "picDate" + ], + "size": 10000, + "sort": [ + { + "picDate": { + "order": "asc" + } + } + ] + } + } + } + } + } + } + } + }` + //fmt.Println(esURL) + //fmt.Println(queryDSL) + buf, err := EsReq("POST", esURL, []byte(queryDSL)) + if err != nil { + return nil, err + } + source, err := SourceAggregationList(buf) + if err != nil { + return nil, err + } + docResult, err := decodeInfo(intervalInMinutes, source) + if err != nil { + return nil, err + } + //fmt.Println(docResult) + result, err := findAnalyzeCoordinatedMovementsInfos(docResult, beforeTime, afterTime, frequency, indexName, serverIp, serverPort) + if err != nil { + return nil, err + } + //fmt.Println(result) + return result, nil +} + func GetInfosByIds(ids []string, indexName string, serverIp string, serverPort string) ([]map[string]interface{}, error) { captureIds := strings.Replace(strings.Trim(fmt.Sprint(ids), "[]"), " ", "\",\"", -1) esURL := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" -- Gitblit v1.8.0