| | |
| | | endHourInt, _ := strconv.Atoi(endParts[0]) |
| | | |
| | | // 输出开始时间的小时 |
| | | fmt.Println("开始时间的小时:", startHourInt) |
| | | //fmt.Println("开始时间的小时:", startHourInt) |
| | | |
| | | // 输出结束时间的小时 + 1 |
| | | endHourPlusOne := (endHourInt + 1) % 24 // 取余确保不超过24小时 |
| | | fmt.Println("结束时间的小时 + 1:", endHourPlusOne) |
| | | //fmt.Println("结束时间的小时 + 1:", endHourPlusOne) |
| | | activeHourFormat := activeHourFormat{startTime: startHour, endTime: endHour, startHour: startHourInt, endHour: endHourPlusOne} |
| | | return activeHourFormat, nil |
| | | } |
| | |
| | | return docInfo, nil |
| | | } |
| | | |
| | | func DayNightActivityQuery(communityId string, documentNumber string,startTime string, endTime string, activeHour string, frequency int, |
| | | func DayNightActivityQuery(comIds []string, docNumber string, startTime string, endTime string, activeHour string, frequency int, |
| | | intervalInMinutes int, indexName string, serverIp string, serverPort string) (map[string]interface{}, error) { |
| | | esURL := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" |
| | | |
| | |
| | | return nil, err |
| | | } |
| | | filterDocIdAttr := "" |
| | | if documentNumber != ""{ |
| | | filterDocIdAttr = "{\"term\": {\""+documentNumber+"\": \"\"}}," |
| | | if docNumber != "" { |
| | | filterDocIdAttr = "{\"term\": {\"documentNumber\": \"" + docNumber + "\"}}," |
| | | } |
| | | comIdsStr := "" |
| | | if comIds == nil || len(comIds) > 0 { |
| | | esComIds := strings.Replace(strings.Trim(fmt.Sprint(comIds), "[]"), " ", "\",\"", -1) |
| | | comIdsStr = "{\"terms\":{\"communityId\":[\"" + esComIds + "\"]}}," |
| | | } |
| | | queryDSL := ` |
| | | { |
| | |
| | | } |
| | | } |
| | | }, |
| | | `+filterDocIdAttr+` |
| | | { |
| | | "term": { |
| | | "communityId": "` + communityId + `" |
| | | } |
| | | }, |
| | | ` + filterDocIdAttr + ` |
| | | ` + comIdsStr + ` |
| | | { |
| | | "script": { |
| | | "script": { |
| | |
| | | } |
| | | //result, _ := decodeDocumentInfos(source) |
| | | //return result, nil |
| | | if len(docResult) ==0 { |
| | | return result,nil |
| | | if len(docResult) == 0 { |
| | | return result, nil |
| | | } |
| | | DataInfos, err := GetInfosByIds(docResult[0]["id"].([]string), indexName, serverIp, serverPort) |
| | | result["documentNumbers"] = docResult |
| | |
| | | return result, nil |
| | | } |
| | | |
// Result model filled in by decodeInfo: a document number, the cameras it was
// seen on, and the captures kept for each of those cameras.
type (
	// acmInfo collects the per-camera captures of one document number.
	acmInfo struct {
		documentNumber string        // key of the document-number bucket
		camerasInfos   []camerasInfo // one entry per camera bucket
	}

	// camerasInfo collects the captures kept for one camera.
	camerasInfo struct {
		cameraId     string        // key of the camera bucket
		captureInfos []captureInfo // captures retained for this camera
	}

	// captureInfo identifies a single capture record.
	captureInfo struct {
		id      string // "id" field of the hit's _source
		picDate string // "picDate" field of the hit's _source
	}
)
| | | |
// addSecondsToTimestamp shifts a "YYYY-MM-DD hh:mm:ss" timestamp by the given
// number of seconds (a negative value moves it backwards) and returns the
// result in the same layout. If timestamp does not match the layout, it
// returns an empty string together with the parse error.
func addSecondsToTimestamp(timestamp string, seconds int) (string, error) {
	const layout = "2006-01-02 15:04:05"

	base, err := time.Parse(layout, timestamp)
	if err != nil {
		return "", err
	}
	offset := time.Duration(seconds) * time.Second
	return base.Add(offset).Format(layout), nil
}
| | | |
| | | func decodeInfo(intervalInMinutes int, source []map[string]interface{}) ([]acmInfo, error) { |
| | | acmInfos := make([]acmInfo, 0) |
| | | for _, info := range source { |
| | | var aInfo acmInfo |
| | | documentNumber := info["key"].(string) |
| | | aInfo.documentNumber = documentNumber |
| | | groupByCameraId := info["group_by_cameraId"].(map[string]interface{}) |
| | | cameraBuckets := groupByCameraId["buckets"].([]interface{}) |
| | | for _, cameraInfo := range cameraBuckets { |
| | | var camsInfo camerasInfo |
| | | cInfo := cameraInfo.(map[string]interface{}) |
| | | cameraId := cInfo["key"].(string) |
| | | camsInfo.cameraId = cameraId |
| | | dataBuckets := cInfo["top_hits"].(map[string]interface{})["hits"].(map[string]interface{})["hits"].([]interface{}) |
| | | markTime := "" |
| | | for _, dataInfo := range dataBuckets { |
| | | var capInfo captureInfo |
| | | dInfo := dataInfo.(map[string]interface{}) |
| | | dSource := dInfo["_source"].(map[string]interface{}) |
| | | id := dSource["id"].(string) |
| | | picDate := dSource["picDate"].(string) |
| | | //addFlag := false |
| | | if markTime == "" { |
| | | //addFlag = true |
| | | markTime = picDate |
| | | } else { |
| | | if checkTimeDifference(markTime, picDate, intervalInMinutes) { |
| | | //fmt.Println(markTime, picDate) |
| | | markTime = picDate |
| | | continue |
| | | } |
| | | markTime = picDate |
| | | } |
| | | capInfo.id = id |
| | | capInfo.picDate = picDate |
| | | camsInfo.captureInfos = append(camsInfo.captureInfos, capInfo) |
| | | } |
| | | aInfo.camerasInfos = append(aInfo.camerasInfos, camsInfo) |
| | | } |
| | | acmInfos = append(acmInfos, aInfo) |
| | | } |
| | | return acmInfos, nil |
| | | } |
| | | |
// Accumulator types used while matching captures of other documents against
// a base capture.
type (
	// addResultIds lists, for one document number, every base/target capture
	// pair recorded for it.
	addResultIds struct {
		documentNumber string
		unionIds       []unionId
	}

	// unionId pairs the id of a base capture with the id of a capture that
	// was matched against it.
	unionId struct {
		baseId   string
		targetId string
	}
)
| | | |
| | | func addResultInfo(source []map[string]interface{}, targetAddResultIds *[]addResultIds, bId string) { |
| | | found := false |
| | | for _, info := range source { |
| | | documentNumber := info["key"].(string) |
| | | dataBuckets := info["top_hits"].(map[string]interface{})["hits"].(map[string]interface{})["hits"].([]interface{}) |
| | | id := dataBuckets[0].(map[string]interface{})["_source"].(map[string]interface{})["id"].(string) |
| | | //fmt.Println("documentNumber: ", documentNumber, "\tid: ", id) |
| | | for i, docInfo := range *targetAddResultIds { |
| | | if documentNumber == docInfo.documentNumber { |
| | | //fmt.Println("新更新") |
| | | (*targetAddResultIds)[i].unionIds = append((*targetAddResultIds)[i].unionIds, unionId{baseId: bId, targetId: id}) |
| | | found = true |
| | | break |
| | | } |
| | | } |
| | | if !found { |
| | | //fmt.Println("新添加") |
| | | var targetAddResultId addResultIds |
| | | targetAddResultId.documentNumber = documentNumber |
| | | targetAddResultId.unionIds = append(targetAddResultId.unionIds, unionId{baseId: bId, targetId: id}) |
| | | *targetAddResultIds = append(*targetAddResultIds, targetAddResultId) |
| | | } |
| | | |
| | | } |
| | | } |
| | | |
// removeDuplicates returns the distinct strings of nums in order of first
// appearance. The returned slice is never nil, even for empty input.
func removeDuplicates(nums []string) []string {
	unique := []string{}
	visited := map[string]bool{}

	for i := 0; i < len(nums); i++ {
		value := nums[i]
		if visited[value] {
			continue
		}
		visited[value] = true
		unique = append(unique, value)
	}

	return unique
}
| | | |
| | | func findAnalyzeCoordinatedMovementsInfos(infos []acmInfo, beforeTime int, afterTime int, frequency int, |
| | | indexName string, serverIp string, serverPort string) (map[string]interface{}, error) { |
| | | //baseAddResultIds := make([]addResultIds, 0) |
| | | targetAddResultIds := make([]addResultIds, 0) |
| | | esURL := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" |
| | | for _, info := range infos { |
| | | for _, cInfo := range info.camerasInfos { |
| | | for _, pInfo := range cInfo.captureInfos { |
| | | gteDate, err := addSecondsToTimestamp(pInfo.picDate[:19], beforeTime) |
| | | if err != nil { |
| | | fmt.Println(err) |
| | | } |
| | | lteDate, err := addSecondsToTimestamp(pInfo.picDate[:19], afterTime) |
| | | if err != nil { |
| | | fmt.Println(err) |
| | | } |
| | | queryDSL := ` |
| | | { |
| | | "size": 0, |
| | | "query": { |
| | | "bool": { |
| | | "filter": [ |
| | | { |
| | | "range": { |
| | | "picDate": { |
| | | "gte": "` + gteDate + `", |
| | | "lte": "` + lteDate + `" |
| | | } |
| | | } |
| | | }, |
| | | { |
| | | "term": { |
| | | "cameraId": "` + cInfo.cameraId + `" |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | }, |
| | | "aggs": { |
| | | "group_by_documentnumber": { |
| | | "terms": { |
| | | "field": "documentNumber", |
| | | "size": 100000 |
| | | }, |
| | | "aggs": { |
| | | "top_hits": { |
| | | "top_hits": { |
| | | "_source": [ |
| | | "id", |
| | | "cameraId", |
| | | "picDate" |
| | | ], |
| | | "size": 10000, |
| | | "sort": [ |
| | | { |
| | | "picDate": { |
| | | "order": "asc" |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | }` |
| | | //fmt.Println(esURL, queryDSL) |
| | | buf, err := EsReq("POST", esURL, []byte(queryDSL)) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | source, err := SourceAggregationList(buf) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | //fmt.Println("pInfo.id: ", pInfo.id) |
| | | addResultInfo(source, &targetAddResultIds, pInfo.id) |
| | | //fmt.Println("targetAddResultIds: ", targetAddResultIds) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | //fmt.Println(source) |
| | | } |
| | | } |
| | | } |
| | | //fmt.Println("targetAddResultIds: ", targetAddResultIds) |
| | | baseIds := make([]string, 0) |
| | | targetIds := make([]string, 0) |
| | | for _, tAIdInfo := range targetAddResultIds { |
| | | if len(tAIdInfo.unionIds) >= frequency { |
| | | for _, unionId := range tAIdInfo.unionIds { |
| | | baseIds = append(baseIds, unionId.baseId) |
| | | targetIds = append(targetIds, unionId.targetId) |
| | | } |
| | | } |
| | | } |
| | | rdbaseIds := removeDuplicates(baseIds) |
| | | rdtargetIds := removeDuplicates(targetIds) |
| | | baseInfos, err := GetInfosByIds(rdbaseIds, indexName, serverIp, serverPort) |
| | | if err != nil{ |
| | | return nil, err |
| | | } |
| | | targetInfos, err := GetInfosByIds(rdtargetIds, indexName, serverIp, serverPort) |
| | | if err != nil{ |
| | | return nil, err |
| | | } |
| | | result := make(map[string]interface{}) |
| | | result["baseRecordInfo"] = baseInfos |
| | | result["targetRecordInfo"] = targetInfos |
| | | return result, nil |
| | | } |
| | | |
| | | func AnalyzeCoordinatedMovements(comIds []string, docNumber string, startDate string, endDate string, beforeTime int, afterTime int, frequency int, |
| | | intervalInMinutes int, indexName string, serverIp string, serverPort string) (map[string]interface{}, error) { |
| | | esURL := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" |
| | | //判断社区IDs |
| | | comIdsStr := "" |
| | | if comIds == nil || len(comIds) > 0 { |
| | | esComIds := strings.Replace(strings.Trim(fmt.Sprint(comIds), "[]"), " ", "\",\"", -1) |
| | | comIdsStr = "{\"terms\":{\"communityId\":[\"" + esComIds + "\"]}}," |
| | | } |
| | | queryDSL := ` |
| | | { |
| | | "size": 0, |
| | | "query": { |
| | | "bool": { |
| | | "filter": [ |
| | | { |
| | | "range": { |
| | | "picDate": { |
| | | "gte": "` + startDate + `", |
| | | "lte": "` + endDate + `" |
| | | } |
| | | } |
| | | }, |
| | | ` + comIdsStr + ` |
| | | { |
| | | "term": { |
| | | "documentNumber": "` + docNumber + `" |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | }, |
| | | "aggs": { |
| | | "group_by_documentnumber": { |
| | | "terms": { |
| | | "field": "documentNumber", |
| | | "size": 100000 |
| | | }, |
| | | "aggs": { |
| | | "group_by_cameraId": { |
| | | "terms": { |
| | | "field": "cameraId", |
| | | "size": 10000 |
| | | }, |
| | | "aggs": { |
| | | "top_hits": { |
| | | "top_hits": { |
| | | "_source": [ |
| | | "id", |
| | | "cameraId", |
| | | "picDate" |
| | | ], |
| | | "size": 10000, |
| | | "sort": [ |
| | | { |
| | | "picDate": { |
| | | "order": "asc" |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | }` |
| | | //fmt.Println(esURL) |
| | | //fmt.Println(queryDSL) |
| | | buf, err := EsReq("POST", esURL, []byte(queryDSL)) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | source, err := SourceAggregationList(buf) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | docResult, err := decodeInfo(intervalInMinutes, source) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | //fmt.Println(docResult) |
| | | result, err := findAnalyzeCoordinatedMovementsInfos(docResult, beforeTime, afterTime, frequency, indexName, serverIp, serverPort) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | //fmt.Println(result) |
| | | return result, nil |
| | | } |
| | | |
| | | func GetInfosByIds(ids []string, indexName string, serverIp string, serverPort string) ([]map[string]interface{}, error) { |
| | | captureIds := strings.Replace(strings.Trim(fmt.Sprint(ids), "[]"), " ", "\",\"", -1) |
| | | esURL := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" |