| | |
| | | } |
| | | } |
| | | |
| | | //***********************重庆Start**********************************// |
| | | |
// activeHourFormat is the parsed form of an active-hour window given as
// "<start>-<end>", where each side is a clock time such as "22:00:00".
type activeHourFormat struct {
	startTime string // text before the "-", exactly as supplied
	endTime   string // text after the "-", exactly as supplied
	startHour int    // hour component of startTime
	endHour   int    // hour component of endTime plus one, wrapped with % 24
}

// formatActiveHour splits activeHour on "-" and extracts the hour component
// (the text before the first ":") of both sides.
//
// endHour is stored as (end hour + 1) % 24 so that it can be used as an
// exclusive upper bound on the hour of day.
//
// An error is returned when activeHour does not contain exactly one "-" or
// when either hour component is not an integer. Previously a non-numeric hour
// was silently treated as 0.
func formatActiveHour(activeHour string) (activeHourFormat, error) {
	hours := strings.Split(activeHour, "-")
	if len(hours) != 2 {
		return activeHourFormat{}, errors.New("错误:无法解析开始时间和结束时间")
	}
	startTime, endTime := hours[0], hours[1]

	// strings.Split always yields at least one element, so [0] is safe.
	startHour, err := strconv.Atoi(strings.Split(startTime, ":")[0])
	if err != nil {
		return activeHourFormat{}, fmt.Errorf("parsing start hour of %q: %w", activeHour, err)
	}
	endHour, err := strconv.Atoi(strings.Split(endTime, ":")[0])
	if err != nil {
		return activeHourFormat{}, fmt.Errorf("parsing end hour of %q: %w", activeHour, err)
	}

	return activeHourFormat{
		startTime: startTime,
		endTime:   endTime,
		startHour: startHour,
		endHour:   (endHour + 1) % 24, // wrap so the value stays within 0-23
	}, nil
}
| | | |
// isTimeInRange reports whether timeStr lies strictly between startStr and
// endStr. All three values are parsed with the layout "15:04:05".
//
// When startStr is later than endStr the window is treated as crossing
// midnight: the time matches if it is after the start or before the end.
// Both bounds are exclusive. If any value fails to parse, the error is
// printed and false is returned.
func isTimeInRange(timeStr, startStr, endStr string) bool {
	const clockLayout = "15:04:05"

	moment, err := time.Parse(clockLayout, timeStr)
	if err != nil {
		fmt.Println("Error parsing timestamp:", err)
		return false
	}
	lower, err := time.Parse(clockLayout, startStr)
	if err != nil {
		fmt.Println("Error parsing start time:", err)
		return false
	}
	upper, err := time.Parse(clockLayout, endStr)
	if err != nil {
		fmt.Println("Error parsing end time:", err)
		return false
	}

	afterLower := moment.After(lower)
	beforeUpper := moment.Before(upper)
	if lower.After(upper) {
		// Window wraps past midnight: either side of it counts.
		return afterLower || beforeUpper
	}
	// Window is contained in a single day.
	return afterLower && beforeUpper
}
| | | |
// compareTimes orders two clock times written with the layout "15:04:05".
// It returns -1 when time1Str is earlier than time2Str, 1 when it is later
// and 0 when both are equal. If either value fails to parse, the error is
// printed and 0 is returned.
func compareTimes(time1Str, time2Str string) int {
	const clockLayout = "15:04:05"

	first, err := time.Parse(clockLayout, time1Str)
	if err != nil {
		fmt.Println("Error parsing time 1:", err)
		return 0
	}
	second, err := time.Parse(clockLayout, time2Str)
	if err != nil {
		fmt.Println("Error parsing time 2:", err)
		return 0
	}

	switch {
	case first.Before(second):
		return -1
	case first.After(second):
		return 1
	default:
		return 0
	}
}
| | | |
// daysBetweenDates returns the number of whole days from date1Str to
// date2Str, both written as "2006-01-02". The result is negative when
// date2Str is the earlier date. If either value fails to parse, the error is
// printed and 0 is returned.
func daysBetweenDates(date1Str, date2Str string) int {
	const dateLayout = "2006-01-02"

	from, err := time.Parse(dateLayout, date1Str)
	if err != nil {
		fmt.Println("Error parsing date 1:", err)
		return 0
	}
	to, err := time.Parse(dateLayout, date2Str)
	if err != nil {
		fmt.Println("Error parsing date 2:", err)
		return 0
	}

	// The hour count is divided by 24 and truncated toward zero.
	return int(to.Sub(from).Hours() / 24)
}
| | | |
// checkTimeDifference reports whether timestampStr2 is at most
// intervalInMinutes minutes after timestampStr1.
//
// Only the first 19 bytes of each argument ("2006-01-02 15:04:05") are
// parsed, so a trailing fraction such as ".123" is ignored. The difference is
// signed: when timestampStr2 is earlier than timestampStr1 the difference is
// negative and the function returns true.
//
// If an argument is shorter than 19 bytes or fails to parse, a message is
// printed and false is returned. Previously a short argument caused a
// slice-bounds panic.
func checkTimeDifference(timestampStr1 string, timestampStr2 string, intervalInMinutes int) bool {
	const layout = "2006-01-02 15:04:05"

	if len(timestampStr1) < len(layout) || len(timestampStr2) < len(layout) {
		fmt.Println("时间解析失败:", "timestamp shorter than layout", layout)
		return false
	}

	// Parse only the second-resolution prefix of each timestamp.
	time1, err := time.Parse(layout, timestampStr1[:len(layout)])
	if err != nil {
		fmt.Println("时间解析失败:", err)
		return false
	}
	time2, err := time.Parse(layout, timestampStr2[:len(layout)])
	if err != nil {
		fmt.Println("时间解析失败:", err)
		return false
	}

	return time2.Sub(time1).Minutes() <= float64(intervalInMinutes)
}
| | | |
| | | ////格式化时间hh:mm:ss:zzz to hh:mm:ss |
| | | //func formatTime(inputTime string) (string, error) { |
| | | // parsedTime, err := time.Parse("15:04:05:000", inputTime) |
| | | // if err != nil { |
| | | // return "", err |
| | | // } |
| | | // |
| | | // formattedTime := parsedTime.Format("15:04:05") |
| | | // return formattedTime, nil |
| | | //} |
// resetDataId starts a new id group: it stores dDate and dTime through sDate
// and sTime and returns a fresh slice holding only id. The incoming dataId
// slice is ignored and left untouched.
func resetDataId(dataId []string, id, dDate, dTime string, sDate *string, sTime *string) []string {
	*sDate, *sTime = dDate, dTime
	return []string{id}
}
| | | |
| | | func decodeActivityId(aHFormat activeHourFormat, frequency int, intervalInMinutes int, source []map[string]interface{}) ([]map[string]interface{}, error) { |
| | | docInfo := make([]map[string]interface{}, 0) |
| | | for _, info := range source { |
| | | tmpInfo := make(map[string]interface{}) |
| | | activeId := make([]string, 0) |
| | | sDate := "" |
| | | sTime := "" |
| | | documentNumber := info["key"].(string) |
| | | tmpInfo["documentNumber"] = documentNumber |
| | | //fmt.Println("documentNumber: ", documentNumber) |
| | | topHits := info["top_hits"].(map[string]interface{}) |
| | | hits := topHits["hits"].(map[string]interface{}) |
| | | hitsResult := hits["hits"].([]interface{}) |
| | | dataId := make([]string, 0) |
| | | picUrl := "" |
| | | if hitsResult[0].(map[string]interface{})["_source"].(map[string]interface{})["baseInfo"] != nil { |
| | | baseInfo := hitsResult[0].(map[string]interface{})["_source"].(map[string]interface{})["baseInfo"] |
| | | if v, ok := baseInfo.([]interface{}); ok { |
| | | picUrl = v[0].(map[string]interface{})["targetPicUrl"].(string) |
| | | } else if v, ok := baseInfo.(map[string]interface{}); ok { |
| | | picUrl = v["targetPicUrl"].(string) |
| | | } |
| | | } else { |
| | | if hitsResult[0].(map[string]interface{})["_source"].(map[string]interface{})["targetInfo"] != nil { |
| | | picUrl = hitsResult[0].(map[string]interface{})["_source"].(map[string]interface{})["targetInfo"].([]interface{})[0].(map[string]interface{})["picSmUrl"].(string) |
| | | } |
| | | } |
| | | |
| | | //if hitsResult[0].(map[string]interface{})["baseInfo"] != nil { |
| | | // fmt.Println("picUrl1: ", picUrl) |
| | | // picUrl = hitsResult[0].(map[string]interface{})["baseInfo"].([]interface{})[0].(map[string]interface{})["targetPicUrl"].(string) |
| | | //} else { |
| | | // if hitsResult[0].(map[string]interface{})["targetInfo"] != nil { |
| | | // fmt.Println("picUrl2: ", picUrl) |
| | | // picUrl = hitsResult[0].(map[string]interface{})["targetInfo"].([]interface{})[0].(map[string]interface{})["picSmUrl"].(string) |
| | | // } |
| | | //} |
| | | tmpInfo["picUrl"] = picUrl |
| | | for sIndex, sourceInfo := range hitsResult { |
| | | rSourceInfo := sourceInfo.(map[string]interface{}) |
| | | source := rSourceInfo["_source"].(map[string]interface{}) |
| | | captureTime := source["picDate"].(string) |
| | | dDate := strings.Split(captureTime, " ")[0] |
| | | dTime := strings.Split(captureTime[:19], " ")[1] |
| | | id := source["id"].(string) |
| | | //fmt.Println("sindex: ", sIndex, "documentNumber", tmpInfo["documentNumber"], "id: ", id, "captureTime: ", captureTime) |
| | | if !isTimeInRange(dTime, aHFormat.startTime, aHFormat.endTime) { |
| | | //if sDate != "" && len(dataId) >= frequency { |
| | | // activeId = append(activeId, dataId...) |
| | | // dataId = resetDataId(dataId, id, dDate, dTime, &sDate, &sTime) |
| | | //} |
| | | continue |
| | | } |
| | | if sDate == "" { |
| | | sDate = dDate |
| | | sTime = dTime |
| | | dataId = append(dataId, id) |
| | | if len(dataId) >= frequency { |
| | | activeId = append(activeId, dataId...) |
| | | dataId = resetDataId(dataId, id, dDate, dTime, &sDate, &sTime) |
| | | } |
| | | continue |
| | | } |
| | | if checkTimeDifference(sDate+" "+sTime, captureTime[:19], intervalInMinutes) { |
| | | if len(dataId) >= frequency { |
| | | activeId = append(activeId, dataId...) |
| | | dataId = resetDataId(dataId, id, dDate, dTime, &sDate, &sTime) |
| | | } |
| | | continue |
| | | } |
| | | //fmt.Println(daysBetweenDates(sDate, dDate)) |
| | | if aHFormat.startHour < aHFormat.endHour && daysBetweenDates(sDate, dDate) == 0 { |
| | | dataId = append(dataId, id) |
| | | } else if aHFormat.startHour > aHFormat.endHour { |
| | | if daysBetweenDates(sDate, dDate) == 0 { |
| | | if compareTimes(dTime, aHFormat.startTime) == compareTimes(sTime, aHFormat.startTime) { |
| | | // ||compareTimes(dTime,aHFormat.endTime) == compareTimes(sTime, aHFormat.endTime){ |
| | | dataId = append(dataId, id) |
| | | } |
| | | } else if daysBetweenDates(sDate, dDate) == 1 { |
| | | //初始时间戳在结束范围之前 |
| | | if compareTimes(sTime, aHFormat.endTime) == -1 { |
| | | if len(dataId) >= frequency { |
| | | activeId = append(activeId, dataId...) |
| | | dataId = resetDataId(dataId, id, dDate, dTime, &sDate, &sTime) |
| | | } |
| | | //初始时间戳在开始范围之后 |
| | | } else if compareTimes(sTime, aHFormat.endTime) == 1 { |
| | | //next时间戳在结束范围之前 |
| | | if compareTimes(dTime, aHFormat.endTime) == -1 { |
| | | dataId = append(dataId, id) |
| | | //next时间戳在开始范围之后 |
| | | } else if compareTimes(dTime, aHFormat.startTime) == 1 { |
| | | if len(dataId) >= frequency { |
| | | activeId = append(activeId, dataId...) |
| | | dataId = resetDataId(dataId, id, dDate, dTime, &sDate, &sTime) |
| | | } |
| | | } |
| | | } |
| | | } else if daysBetweenDates(sDate, dDate) >= 1 { |
| | | //fmt.Println(len(dataId)) |
| | | if len(dataId) >= frequency { |
| | | activeId = append(activeId, dataId...) |
| | | dataId = resetDataId(dataId, id, dDate, dTime, &sDate, &sTime) |
| | | } |
| | | } |
| | | } |
| | | if sIndex == len(hitsResult)-1 { |
| | | if len(dataId) >= frequency { |
| | | activeId = append(activeId, dataId...) |
| | | } |
| | | } |
| | | } |
| | | if len(activeId) > 0 { |
| | | tmpInfo["id"] = activeId |
| | | docInfo = append(docInfo, tmpInfo) |
| | | } |
| | | } |
| | | return docInfo, nil |
| | | } |
| | | |
| | | func DayNightActivityQuery(comIds []string, docNumber string, startTime string, endTime string, activeHour string, frequency int, |
| | | intervalInMinutes int, indexName string, serverIp string, serverPort string) (map[string]interface{}, error) { |
| | | esURL := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" |
| | | |
| | | aHFormat, err := formatActiveHour(activeHour) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | filterDocIdAttr := "" |
| | | if docNumber != "" { |
| | | filterDocIdAttr = "{\"term\": {\"documentNumber\": \"" + docNumber + "\"}}," |
| | | } |
| | | comIdsStr := "" |
| | | if comIds == nil || len(comIds) > 0 { |
| | | esComIds := strings.Replace(strings.Trim(fmt.Sprint(comIds), "[]"), " ", "\",\"", -1) |
| | | comIdsStr = "{\"terms\":{\"communityId\":[\"" + esComIds + "\"]}}," |
| | | } |
| | | queryDSL := ` |
| | | { |
| | | "size": 0, |
| | | "query": { |
| | | "bool": { |
| | | "filter": [ |
| | | { |
| | | "range": { |
| | | "picDate": { |
| | | "gte": "` + startTime + `", |
| | | "lt": "` + endTime + `" |
| | | } |
| | | } |
| | | }, |
| | | ` + filterDocIdAttr + ` |
| | | ` + comIdsStr + ` |
| | | { |
| | | "script": { |
| | | "script": { |
| | | "source": "doc['picDate'].value.hourOfDay >= ` + strconv.Itoa(aHFormat.startHour) + ` || doc['picDate'].value.hourOfDay < ` + strconv.Itoa(aHFormat.endHour) + `", |
| | | "lang": "painless" |
| | | } |
| | | } |
| | | } |
| | | ], |
| | | "must_not": [ |
| | | { |
| | | "term": { |
| | | "documentNumber": "" |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | }, |
| | | "aggs": { |
| | | "group_by_documentnumber": { |
| | | "terms": { |
| | | "field": "documentNumber", |
| | | "size": 100000 |
| | | }, |
| | | "aggs": { |
| | | "top_hits": { |
| | | "top_hits": { |
| | | "_source": [ |
| | | "id", |
| | | "picDate", |
| | | "baseInfo.targetPicUrl", |
| | | "targetInfo.picSmUrl" |
| | | ], |
| | | "size": 100000, |
| | | "sort": [ |
| | | { |
| | | "picDate": { |
| | | "order": "asc" |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | }` |
| | | //fmt.Println(esURL) |
| | | //fmt.Println(queryDSL) |
| | | var result = make(map[string]interface{}) |
| | | buf, err := EsReq("POST", esURL, []byte(queryDSL)) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | source, err := SourceAggregationList(buf) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | //fmt.Println(source) |
| | | docResult, err := decodeActivityId(aHFormat, frequency, intervalInMinutes, source) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | //result, _ := decodeDocumentInfos(source) |
| | | //return result, nil |
| | | if len(docResult) == 0 { |
| | | return result, nil |
| | | } |
| | | DataInfos, err := GetInfosByIds(docResult[0]["id"].([]string), indexName, serverIp, serverPort) |
| | | result["documentNumbers"] = docResult |
| | | result["datalist"] = DataInfos |
| | | return result, nil |
| | | } |
| | | |
// Intermediate types produced by decodeInfo: one acmInfo per document number,
// holding one camerasInfo per camera, holding the retained captures.
type (
	// acmInfo groups the cameras on which a document number was captured.
	acmInfo struct {
		documentNumber string
		camerasInfos   []camerasInfo
	}

	// camerasInfo groups the captures recorded by a single camera.
	camerasInfo struct {
		cameraId     string
		captureInfos []captureInfo
	}

	// captureInfo identifies one capture by its id and its picDate.
	captureInfo struct {
		id      string
		picDate string
	}
)
| | | |
// addSecondsToTimestamp shifts timestamp, written as "2006-01-02 15:04:05",
// by the given number of seconds (which may be negative) and returns the
// result in the same layout. A parse failure is returned as the error with
// an empty string.
func addSecondsToTimestamp(timestamp string, seconds int) (string, error) {
	const layout = "2006-01-02 15:04:05"

	base, err := time.Parse(layout, timestamp)
	if err != nil {
		return "", err
	}
	return base.Add(time.Duration(seconds) * time.Second).Format(layout), nil
}
| | | |
| | | func decodeInfo(intervalInMinutes int, source []map[string]interface{}) ([]acmInfo, error) { |
| | | acmInfos := make([]acmInfo, 0) |
| | | for _, info := range source { |
| | | var aInfo acmInfo |
| | | documentNumber := info["key"].(string) |
| | | aInfo.documentNumber = documentNumber |
| | | groupByCameraId := info["group_by_cameraId"].(map[string]interface{}) |
| | | cameraBuckets := groupByCameraId["buckets"].([]interface{}) |
| | | for _, cameraInfo := range cameraBuckets { |
| | | var camsInfo camerasInfo |
| | | cInfo := cameraInfo.(map[string]interface{}) |
| | | cameraId := cInfo["key"].(string) |
| | | camsInfo.cameraId = cameraId |
| | | dataBuckets := cInfo["top_hits"].(map[string]interface{})["hits"].(map[string]interface{})["hits"].([]interface{}) |
| | | markTime := "" |
| | | for _, dataInfo := range dataBuckets { |
| | | var capInfo captureInfo |
| | | dInfo := dataInfo.(map[string]interface{}) |
| | | dSource := dInfo["_source"].(map[string]interface{}) |
| | | id := dSource["id"].(string) |
| | | picDate := dSource["picDate"].(string) |
| | | //addFlag := false |
| | | if markTime == "" { |
| | | //addFlag = true |
| | | markTime = picDate |
| | | } else { |
| | | if checkTimeDifference(markTime, picDate, intervalInMinutes) { |
| | | //fmt.Println(markTime, picDate) |
| | | markTime = picDate |
| | | continue |
| | | } |
| | | markTime = picDate |
| | | } |
| | | capInfo.id = id |
| | | capInfo.picDate = picDate |
| | | camsInfo.captureInfos = append(camsInfo.captureInfos, capInfo) |
| | | } |
| | | aInfo.camerasInfos = append(aInfo.camerasInfos, camsInfo) |
| | | } |
| | | acmInfos = append(acmInfos, aInfo) |
| | | } |
| | | return acmInfos, nil |
| | | } |
| | | |
// addResultIds collects, for one document number, every (base, target)
// capture pair recorded by addResultInfo.
type addResultIds struct {
	documentNumber string
	unionIds       []unionId
}

// unionId pairs the id of a base capture with the id of a capture found
// alongside it.
type unionId struct {
	baseId   string
	targetId string
}

// addResultInfo records one pair per document-number bucket in source.
//
// For every bucket it takes "key" as the document number and the "id" of the
// first hit under "top_hits.hits.hits" as the target id, then appends
// unionId{bId, targetId} to the entry of *targetAddResultIds with that
// document number, creating the entry when none exists yet.
//
// The found flag is evaluated per bucket. Previously it was shared across
// the loop, so after the first match no new document number was ever added.
// Buckets without hits are skipped instead of panicking.
func addResultInfo(source []map[string]interface{}, targetAddResultIds *[]addResultIds, bId string) {
	for _, info := range source {
		documentNumber := info["key"].(string)
		dataBuckets := info["top_hits"].(map[string]interface{})["hits"].(map[string]interface{})["hits"].([]interface{})
		if len(dataBuckets) == 0 {
			continue
		}
		id := dataBuckets[0].(map[string]interface{})["_source"].(map[string]interface{})["id"].(string)
		pair := unionId{baseId: bId, targetId: id}

		found := false
		for i := range *targetAddResultIds {
			if (*targetAddResultIds)[i].documentNumber == documentNumber {
				(*targetAddResultIds)[i].unionIds = append((*targetAddResultIds)[i].unionIds, pair)
				found = true
				break
			}
		}
		if !found {
			*targetAddResultIds = append(*targetAddResultIds, addResultIds{
				documentNumber: documentNumber,
				unionIds:       []unionId{pair},
			})
		}
	}
}
| | | |
// removeDuplicates returns the distinct values of nums in order of first
// appearance. The result is always a non-nil slice and nums is not modified.
func removeDuplicates(nums []string) []string {
	unique := make([]string, 0)
	visited := make(map[string]struct{})

	for _, value := range nums {
		if _, dup := visited[value]; dup {
			continue
		}
		visited[value] = struct{}{}
		unique = append(unique, value)
	}

	return unique
}
| | | |
| | | func findAnalyzeCoordinatedMovementsInfos(infos []acmInfo, docNumber string, beforeTime int, afterTime int, frequency int, |
| | | indexName string, serverIp string, serverPort string) (map[string]interface{}, error) { |
| | | //baseAddResultIds := make([]addResultIds, 0) |
| | | targetAddResultIds := make([]addResultIds, 0) |
| | | esURL := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" |
| | | for _, info := range infos { |
| | | for _, cInfo := range info.camerasInfos { |
| | | for _, pInfo := range cInfo.captureInfos { |
| | | gteDate, err := addSecondsToTimestamp(pInfo.picDate[:19], beforeTime) |
| | | if err != nil { |
| | | fmt.Println(err) |
| | | } |
| | | lteDate, err := addSecondsToTimestamp(pInfo.picDate[:19], afterTime) |
| | | if err != nil { |
| | | fmt.Println(err) |
| | | } |
| | | queryDSL := ` |
| | | { |
| | | "size": 0, |
| | | "query": { |
| | | "bool": { |
| | | "filter": [ |
| | | { |
| | | "range": { |
| | | "picDate": { |
| | | "gte": "` + gteDate + `", |
| | | "lte": "` + lteDate + `" |
| | | } |
| | | } |
| | | }, |
| | | { |
| | | "term": { |
| | | "cameraId": "` + cInfo.cameraId + `" |
| | | } |
| | | } |
| | | ], |
| | | "must_not": [ |
| | | { |
| | | "term": { |
| | | "documentNumber": "` + docNumber + `" |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | }, |
| | | "aggs": { |
| | | "group_by_documentnumber": { |
| | | "terms": { |
| | | "field": "documentNumber", |
| | | "size": 100000 |
| | | }, |
| | | "aggs": { |
| | | "top_hits": { |
| | | "top_hits": { |
| | | "_source": [ |
| | | "id", |
| | | "cameraId", |
| | | "picDate" |
| | | ], |
| | | "size": 10000, |
| | | "sort": [ |
| | | { |
| | | "picDate": { |
| | | "order": "asc" |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | }` |
| | | //fmt.Println(esURL, queryDSL) |
| | | buf, err := EsReq("POST", esURL, []byte(queryDSL)) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | source, err := SourceAggregationList(buf) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | //fmt.Println("pInfo.id: ", pInfo.id) |
| | | addResultInfo(source, &targetAddResultIds, pInfo.id) |
| | | //fmt.Println("targetAddResultIds: ", targetAddResultIds) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | //fmt.Println(source) |
| | | } |
| | | } |
| | | } |
| | | //fmt.Println("targetAddResultIds: ", targetAddResultIds) |
| | | baseIds := make([]string, 0) |
| | | targetIds := make([]string, 0) |
| | | for _, tAIdInfo := range targetAddResultIds { |
| | | if len(tAIdInfo.unionIds) >= frequency { |
| | | for _, unionId := range tAIdInfo.unionIds { |
| | | baseIds = append(baseIds, unionId.baseId) |
| | | targetIds = append(targetIds, unionId.targetId) |
| | | } |
| | | } |
| | | } |
| | | |
| | | rdbaseIds := removeDuplicates(baseIds) |
| | | rdtargetIds := removeDuplicates(targetIds) |
| | | baseInfos, err := GetInfosByIds(rdbaseIds, indexName, serverIp, serverPort) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | targetInfos, err := GetInfosByIds(rdtargetIds, indexName, serverIp, serverPort) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | docNumberMap := make(map[string][]interface{}) |
| | | for _, tinfo := range targetInfos { |
| | | docNumber := tinfo["documentNumber"].(string) |
| | | docNumberMap[docNumber] = append(docNumberMap[docNumber], tinfo) |
| | | } |
| | | targetRecordInfos := make([]map[string]interface{}, 0) |
| | | for docNumber, infos := range docNumberMap { |
| | | ifs := make(map[string]interface{}) |
| | | ifs["documentNumber"] = docNumber |
| | | ifs["recordInfos"] = infos |
| | | targetRecordInfos = append(targetRecordInfos, ifs) |
| | | } |
| | | result := make(map[string]interface{}) |
| | | result["baseRecordInfo"] = baseInfos |
| | | result["targetRecordInfo"] = targetRecordInfos |
| | | return result, nil |
| | | } |
| | | |
| | | func AnalyzeCoordinatedMovements(comIds []string, docNumber string, startDate string, endDate string, beforeTime int, afterTime int, frequency int, |
| | | intervalInMinutes int, indexName string, serverIp string, serverPort string) (map[string]interface{}, error) { |
| | | esURL := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" |
| | | //判断社区IDs |
| | | comIdsStr := "" |
| | | if comIds == nil || len(comIds) > 0 { |
| | | esComIds := strings.Replace(strings.Trim(fmt.Sprint(comIds), "[]"), " ", "\",\"", -1) |
| | | comIdsStr = "{\"terms\":{\"communityId\":[\"" + esComIds + "\"]}}," |
| | | } |
| | | queryDSL := ` |
| | | { |
| | | "size": 0, |
| | | "query": { |
| | | "bool": { |
| | | "filter": [ |
| | | { |
| | | "range": { |
| | | "picDate": { |
| | | "gte": "` + startDate + `", |
| | | "lte": "` + endDate + `" |
| | | } |
| | | } |
| | | }, |
| | | ` + comIdsStr + ` |
| | | { |
| | | "term": { |
| | | "documentNumber": "` + docNumber + `" |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | }, |
| | | "aggs": { |
| | | "group_by_documentnumber": { |
| | | "terms": { |
| | | "field": "documentNumber", |
| | | "size": 100000 |
| | | }, |
| | | "aggs": { |
| | | "group_by_cameraId": { |
| | | "terms": { |
| | | "field": "cameraId", |
| | | "size": 10000 |
| | | }, |
| | | "aggs": { |
| | | "top_hits": { |
| | | "top_hits": { |
| | | "_source": [ |
| | | "id", |
| | | "cameraId", |
| | | "picDate" |
| | | ], |
| | | "size": 10000, |
| | | "sort": [ |
| | | { |
| | | "picDate": { |
| | | "order": "asc" |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | }` |
| | | //fmt.Println(esURL) |
| | | //fmt.Println(queryDSL) |
| | | buf, err := EsReq("POST", esURL, []byte(queryDSL)) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | source, err := SourceAggregationList(buf) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | docResult, err := decodeInfo(intervalInMinutes, source) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | //fmt.Println(docResult) |
| | | result, err := findAnalyzeCoordinatedMovementsInfos(docResult, docNumber, beforeTime, afterTime, frequency, indexName, serverIp, serverPort) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | //fmt.Println(result) |
| | | return result, nil |
| | | } |
| | | |
| | | func GetInfosByIds(ids []string, indexName string, serverIp string, serverPort string) ([]map[string]interface{}, error) { |
| | | captureIds := strings.Replace(strings.Trim(fmt.Sprint(ids), "[]"), " ", "\",\"", -1) |
| | | esURL := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" |
| | | queryDSL := ` |
| | | { |
| | | "query": { |
| | | "bool": { |
| | | "filter": [{ |
| | | "terms": { |
| | | "id": [ |
| | | "` + captureIds + `" |
| | | ] |
| | | } |
| | | }] |
| | | } |
| | | }, |
| | | "size":1000000, |
| | | "sort":[{"picDate":{"order":"desc"}}], |
| | | "_source": {"includes":[],"excludes":["*.feature"]} |
| | | } |
| | | ` |
| | | buf, err := EsReq("POST", esURL, []byte(queryDSL)) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | sources, err := Sourcelist(buf) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | |
| | | return sources, nil |
| | | } |
| | | |
| | | // ***********************重庆End************************************// |
| | | // 根据抓拍人员id查询抓拍人员信息 |
| | | func AIOceaninfosbyid(id []string, indexName string, serverIp string, serverPort string) ([]protomsg.AIOcean, error) { |
| | | var aIOceanInfo []protomsg.AIOcean |
| | |
| | | return aIOcean, nil |
| | | } |
| | | |
| | | //根据抓拍库人员id查询特征值 |
| | | // 根据抓拍人员id查询视频地址 |
| | | func AIOceanVideoUrlbyid(id string, indexName string, serverIp string, serverPort string) (string, error) { |
| | | //var aIOceanInfo []protomsg.AIOcean |
| | | //videopersonsPersonId := strings.Replace(strings.Trim(fmt.Sprint(id), "[]"), " ", "\",\"", -1) |
| | | var dbinfoRequest = ` |
| | | { |
| | | "query": { |
| | | "bool": { |
| | | "filter": [ |
| | | { |
| | | "term": { |
| | | "id": "` + id + `" |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | }, |
| | | "_source": [ |
| | | "videoUrl" |
| | | ] |
| | | } |
| | | ` |
| | | buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search", []byte(dbinfoRequest)) |
| | | if err != nil { |
| | | return "", err |
| | | } |
| | | |
| | | sources, err := Sourcelist(buf) |
| | | if err != nil { |
| | | return "", err |
| | | } |
| | | videoUrl := sources[0]["videoUrl"].(string) |
| | | //aIOcean := AIOceanAnalysis(sources) |
| | | return videoUrl, nil |
| | | } |
| | | |
| | | // 根据抓拍库人员id查询特征值 |
| | | func GetVideoPersonFaceFeatureById(id string, indexName string, serverIp string, serverPort string) (string, error) { |
| | | var jsonDSL = ` |
| | | { |
| | |
| | | return feature, nil |
| | | } |
| | | |
| | | //根据目标id查询已追加条数 |
| | | // 根据目标id查询已追加条数 |
| | | func GetLinkTagInfoSize(id string, indexName string, serverIp string, serverPort string) (size int, err error) { |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" |
| | | queryDSL := `{ |
| | |
| | | return size, nil |
| | | } |
| | | |
| | | //根据目标id追加跟踪信息 |
| | | // 根据目标id追加跟踪信息 |
| | | func AppendTargetInfo(id string, targetInfo string, indexName string, serverIp string, serverPort string, updateTime string) (string, error) { |
| | | if targetInfo == "" { |
| | | return "", errors.New("append data is nil") |
| | |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | if len(source) == 0{ |
| | | return source,nil |
| | | if len(source) == 0 { |
| | | return source, nil |
| | | } |
| | | faceSource := make([]map[string]interface{}, 0) |
| | | for index, info := range source { |
| | |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | if len(source) == 0{ |
| | | return source,nil |
| | | if len(source) == 0 { |
| | | return source, nil |
| | | } |
| | | faceSource := make([]map[string]interface{}, 0) |
| | | for index, info := range source { |
| | |
| | | return ids, nil |
| | | } |
| | | |
| | | //统计各个区域人数 |
| | | // 统计各个区域人数 |
| | | func StatisticsEveryAreaPersonsNumber(startTime string, endTime string, serverIp string, serverPort string, indexName string) ([]map[string]interface{}, error) { |
| | | var requestUrl = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" |
| | | var requestBody = `{ |
| | |
| | | |
| | | } |
| | | |
| | | //根据时间范围,摄像机列表,分组聚合人脸列表,返回分组数据 |
| | | // 根据时间范围,摄像机列表,分组聚合人脸列表,返回分组数据 |
| | | func GetFaceDataBucketsByCameraIdAndTimeReturnByGrouped(cameraId []string, personId []string, startTime string, endTime string, thresholdTime float64, serverIp string, ServerPort string, indexName string) (buckersDate map[string]interface{}, err error) { |
| | | var filterArr []string |
| | | if cameraId != nil && len(cameraId) > 0 { |
| | |
| | | return sources, nil |
| | | } |
| | | |
| | | //根据时间范围,摄像机列表,分组聚合人脸列表 |
| | | // 根据时间范围,摄像机列表,分组聚合人脸列表 |
| | | func GetFaceDataBucketsByCameraIdAndTime(cameraId []string, personId []string, startTime string, endTime string, thresholdTime float64, serverIp string, ServerPort string, indexName string) (buckersDate map[string]interface{}, err error) { |
| | | var filterArr []string |
| | | if cameraId != nil && len(cameraId) > 0 { |
| | |
| | | return sources, nil |
| | | } |
| | | |
| | | //根据抓拍人员id更新(picurl)图片地址 |
| | | // 根据抓拍人员id更新(picurl)图片地址 |
| | | func UpdatePicUrlById(id string, picUrl string, indexName string, serverIp string, serverPort string) (err error) { |
| | | updateTime := time.Now().Format("2006-01-02 15:04:05") |
| | | tRes, err := AIOceaninfosbyid([]string{id}, indexName, serverIp, serverPort) |
| | |
| | | return nil |
| | | } |
| | | |
| | | //根据抓拍人员id更新(videourl)摄像机地址 |
| | | // 根据抓拍人员id更新(videourl)摄像机地址 |
| | | func UpdateVideourlById(id string, videoUrl string, indexName string, serverIp string, serverPort string, command int) (statu int, err error) { |
| | | |
| | | var info interface{} |
| | |
| | | return statu, errors.New("http response interface can not change map[string]interface{}") |
| | | } |
| | | middle, ok := out["updated"].(float64) |
| | | if !ok { |
| | | batches, ok1 := out["batches"].(float64) |
| | | if !ok || !ok1 { |
| | | logPrint("first updated change error!") |
| | | statu = 500 |
| | | return statu, errors.New("first updated change error!") |
| | | } |
| | | if middle == 1 { |
| | | statu = 200 |
| | | return statu, nil |
| | | } |
| | | if middle == 0 { |
| | | statu = 201 |
| | | return statu, errors.New("已经修改") |
| | | if batches == 0 { |
| | | logPrint("no such doc in database") |
| | | statu = 400 |
| | | return statu, errors.New("目标数据不存在") |
| | | } else { |
| | | if middle == 1 { |
| | | statu = 200 |
| | | return statu, nil |
| | | } |
| | | if middle == 0 { |
| | | statu = 201 |
| | | return statu, errors.New("已经修改") |
| | | } |
| | | } |
| | | return statu, nil |
| | | } |
| | | |
| | | //获取当前节点抓拍库所有人员ID*缓存* |
| | | // 获取当前节点抓拍库所有人员ID*缓存* |
| | | func GetAllLocalVideopersonsId(compareArgs protomsg.CompareArgs, indexName string, serverIp string, serverPort string, alarmLevelTypes string) (capturetable []string) { |
| | | queryStr := "" |
| | | queryBody := compareArgs.InputValue |
| | |
| | | isCollectStr := "" |
| | | isCollect := compareArgs.Collection |
| | | if isCollect != "" { |
| | | isCollectStr = "{\"term\":{\"isCollect\":\"" + isCollect + "\"}}," |
| | | //isCollectStr = "{\"term\":{\"isCollect\":\"" + isCollect + "\"}}," |
| | | if isCollect == "1" { |
| | | isCollectStr = "{\"term\":{\"isCollect\":true}}," |
| | | } else if isCollect == "0" { |
| | | isCollectStr = "{\"term\":{\"isCollect\":false}}," |
| | | } |
| | | } |
| | | |
| | | //判断布防等级 |
| | |
| | | return capturetable |
| | | } |
| | | |
| | | //初始化实时抓拍 |
| | | // 初始化实时抓拍 |
| | | func InitRealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm string, category string, quantity int) ([]protomsg.AIOcean, error) { |
| | | var aIOceanInfo []protomsg.AIOcean |
| | | url := "http://" + serverIp + ":" + serverPort + |
| | |
| | | return aIOcean, nil |
| | | } |
| | | |
| | | //实时抓拍 |
| | | // 实时抓拍 |
| | | func RealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm bool) ([]protomsg.AIOcean, error) { |
| | | var aIOceanInfo []protomsg.AIOcean |
| | | url := "http://" + serverIp + ":" + serverPort + |
| | |
| | | return aIOcean, nil |
| | | } |
| | | |
| | | //综合统计 |
| | | // 综合统计 |
| | | func StatisticsComprehensive(serverIp string, serverPort string, indexName string, isAlarm string) (total int, err error) { |
| | | url := "http://" + serverIp + ":" + serverPort + |
| | | "/" + indexName + "/_search" |
| | |
| | | return total, nil |
| | | } |
| | | |
| | | //实时报警任务比率 |
| | | // 实时报警任务比率 |
| | | func RealTimeAlarmTaskRate(serverIp string, serverPort string, indexName string) (sources []map[string]interface{}, err error) { |
| | | url := "http://" + serverIp + ":" + serverPort + |
| | | "/" + indexName + "/_search" |
| | |
| | | return sources, nil |
| | | } |
| | | |
| | | //聚合任务列表,taskId+taskName |
| | | func AggregateTaskList(serverIp string, serverPort string, indexName string, analyServerId string) (sources []map[string]interface{}, err error) { |
| | | // 聚合任务列表,taskId+taskName |
| | | func AggregateTaskList(serverIp string, serverPort string, indexName string, analyServerId string, cameraIds []string) (sources []map[string]interface{}, err error) { |
| | | url := "http://" + serverIp + ":" + serverPort + |
| | | "/" + indexName + "/_search" |
| | | serverFilterStr := "" |
| | | cameIdFilterStr := "" |
| | | if cameraIds != nil && len(cameraIds) > 0 { |
| | | cameIdsStr := strings.Replace(strings.Trim(fmt.Sprint(cameraIds), "[]"), " ", "\",\"", -1) |
| | | cameIdFilterStr = `,{ |
| | | "term": { |
| | | "cameraId": "` + cameIdsStr + `" |
| | | } |
| | | }` |
| | | } |
| | | if analyServerId != "" { |
| | | serverFilterStr = `, |
| | | "query": { |
| | |
| | | { |
| | | "term": { |
| | | "analyServerId": "` + analyServerId + `" |
| | | } |
| | | } |
| | | } |
| | | ` + cameIdFilterStr + ` |
| | | ] |
| | | } |
| | | }` |
| | |
| | | |
| | | } |
| | | |
| | | //添加即将删除信号 |
| | | // 添加即将删除信号 |
| | | func AddDeleteSignal() { |
| | | |
| | | } |
| | |
| | | |
| | | } |
| | | |
| | | //查询时间段数据 *缓存* |
| | | // 查询时间段数据 *缓存* |
| | | func GetPeriodInfos(serverIp string, serverPort string, startTime string, endTime string, indexName string, shards string, targetType string) ([]*protomsg.MultiFeaCache, error) { |
| | | var capdbinfo []*protomsg.MultiFeaCache |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local" |
| | |
| | | return dbinfos, nil |
| | | } |
| | | |
| | | //************************CORN TASK******************************* |
| | | //查询日期范围内是否还存在数据 |
| | | // ************************CORN TASK******************************* |
| | | // 查询日期范围内是否还存在数据 |
| | | func QueryAnalyServerData(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) { |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" |
| | | deleteJson := `{ |
| | |
| | | return result, nil |
| | | } |
| | | |
| | | //按日期范围,服务器Id删除数据 |
| | | func DeleteAnalyServerData(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) { |
| | | // 按日期范围,服务器Id删除数据 |
| | | func DeleteByDocumentNumber(docNumber []string, serverIp string, serverPort string, indexName string) (total int, err error) { |
| | | |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_delete_by_query" |
| | | docNumbers := strings.Replace(strings.Trim(fmt.Sprint(docNumber), "[]"), " ", "\",\"", -1) |
| | | deleteJson := `{ |
| | | "query":{ |
| | | "bool":{ |
| | | "filter":[ |
| | | { |
| | | "terms":{ |
| | | "documentNumber":["` + docNumbers + `"] |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | } |
| | | } ` |
| | | //fmt.Println(url) |
| | | //fmt.Println(deleteJson) |
| | | //return |
| | | buf, err := EsReq("POST", url, []byte(deleteJson)) |
| | | if err != nil { |
| | | return -1, errors.New("请求失败") |
| | | } |
| | | deleteRes, err := SourceDeleted(buf) |
| | | if err != nil { |
| | | return -1, errors.New("解码失败") |
| | | } |
| | | return deleteRes, nil |
| | | } |
| | | |
| | | //func GetCaptureDaysByDocumentNumber(docNumber string, comId string, indexName string, serverIp string, serverPort string){ |
| | | // url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" |
| | | // queryDSL := `` |
| | | //} |
| | | |
| | | // 按日期范围,服务器Id删除数据 |
| | | func DeleteAnalyServerData(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (total int, err error) { |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_delete_by_query" |
| | | deleteJson := `{ |
| | | "query":{ |
| | |
| | | } |
| | | } |
| | | } ` |
| | | fmt.Println(url) |
| | | fmt.Println(deleteJson) |
| | | buf, err := EsReq("POST", url, []byte(deleteJson)) |
| | | if err != nil { |
| | | return false, errors.New("请求失败") |
| | | return -1, errors.New("请求失败") |
| | | } |
| | | deleteRes, err := SourceDeleted(buf) |
| | | if err != nil { |
| | | return false, errors.New("解码失败") |
| | | return -1, errors.New("解码失败") |
| | | } |
| | | if deleteRes == -1 { |
| | | result = false |
| | | } else { |
| | | result = true |
| | | } |
| | | return result, nil |
| | | return deleteRes, nil |
| | | } |
| | | |
| | | //给所有节点追加删除任务信息 |
| | | // 给所有节点追加删除任务信息 |
| | | func AddDelTask(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) { |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query" |
| | | addJson := `{ |
| | |
| | | return result, nil |
| | | } |
| | | |
| | | //移除已执行完的删除任务 |
| | | // 移除已执行完的删除任务 |
| | | func DeleteDelTask(serverIp string, serverPort string, indexName string, analyServerId string) (result bool, err error) { |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query" |
| | | deleteJson := `{ |
| | |
| | | } |
| | | return result, nil |
| | | } |
| | | |
// ShardInfo describes one shard row of an index; GetShardsByIndex fills it
// from the columns of the Elasticsearch _cat/shards output.
type ShardInfo struct {
	// ShardIndex is the name of the index the shard belongs to.
	ShardIndex string `json:"shardIndex"`
	// ShardNum is the shard number.
	ShardNum int `json:"shardNum"`
	// ShardRole is the shard role: "primary" or "replica".
	ShardRole string `json:"shardRole"`
	// ShardState is the shard state, e.g. STARTED (in use) or UNASSIGNED (not in use).
	ShardState string `json:"shardState"`
	// ShardDocs is the number of documents stored in the shard.
	ShardDocs int `json:"shardDocs"`
	// ShardStore is the current size of the data stored in the shard.
	ShardStore string `json:"shardStore"`
	// ShardIp is the IP address of the node hosting the shard.
	ShardIp string `json:"shardIp"`
	// ShardNode is the name of the node hosting the shard.
	ShardNode string `json:"shardNode"`
}
| | | |
| | | // 获取索引分片信息 |
| | | func GetShardsByIndex(serverIp string, serverPort string, indexName string) ([]ShardInfo, error) { |
| | | url := "http://" + serverIp + ":" + serverPort + "/_cat/shards?v" |
| | | buf, err := EsReq("GET", url, []byte("")) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | var inf = []ShardInfo{} |
| | | res := strings.Split(string(buf), "\n")[1:] |
| | | for _, r := range res { |
| | | if r != "" { |
| | | |
| | | inx := strings.Fields(r) |
| | | index := inx[0] |
| | | shard, _ := strconv.Atoi(inx[1]) |
| | | prired := inx[2] |
| | | if prired == "r" { |
| | | prired = "replica" |
| | | } |
| | | if prired == "p" { |
| | | prired = "primary" |
| | | } |
| | | state := inx[3] |
| | | docs := 0 |
| | | store := "" |
| | | ip := "" |
| | | node := "" |
| | | if state == "STARTED" { |
| | | docs, _ = strconv.Atoi(inx[4]) |
| | | store = inx[5] |
| | | ip = inx[6] |
| | | node = inx[7] |
| | | } |
| | | if index == indexName { |
| | | inf = append(inf, ShardInfo{ |
| | | ShardIndex: index, |
| | | ShardNum: shard, |
| | | ShardRole: prired, |
| | | ShardState: state, |
| | | ShardDocs: docs, |
| | | ShardStore: store, |
| | | ShardIp: ip, |
| | | ShardNode: node, |
| | | }) |
| | | |
| | | } |
| | | } |
| | | |
| | | } |
| | | return inf, nil |
| | | } |