From ebfa610f8c66fd2827a2eec619bfb3e0e22c332f Mon Sep 17 00:00:00 2001
From: sunty <1172534965@qq.com>
Date: 星期一, 25 三月 2024 17:13:14 +0800
Subject: [PATCH] 昼伏夜出调试
---
EsApi.go | 1183 +++++++++++++++++++++++++++++++++++++++++++++++++++++++---
1 files changed, 1,104 insertions(+), 79 deletions(-)
diff --git a/EsApi.go b/EsApi.go
index 8883fcc..eb96adc 100644
--- a/EsApi.go
+++ b/EsApi.go
@@ -1,15 +1,15 @@
package esutil
import (
+ "basic.com/pubsub/protomsg.git"
"encoding/json"
"errors"
"fmt"
+ "sort"
"strconv"
"strings"
"sync"
"time"
-
- "basic.com/pubsub/protomsg.git"
)
var logPrint = func(i ...interface{}) {
@@ -22,6 +22,141 @@
}
}
+//***********************閲嶅簡Start**********************************//
+
+type activeHourFormat struct {
+ startTime string
+ endTime string
+ startHour int
+ endHour int
+}
+
+func formatActiveHour(activeHour string) (activeHourFormat, error) {
+ hours := strings.Split(activeHour, "-")
+
+ if len(hours) == 2 {
+ startHour := hours[0]
+ endHour := hours[1]
+
+ // 瑙f瀽寮�濮嬫椂闂寸殑灏忔椂鍜屽垎閽�
+ startParts := strings.Split(startHour, ":")
+ startHourInt, _ := strconv.Atoi(startParts[0])
+
+ // 瑙f瀽缁撴潫鏃堕棿鐨勫皬鏃跺拰鍒嗛挓
+ endParts := strings.Split(endHour, ":")
+ endHourInt, _ := strconv.Atoi(endParts[0])
+
+ // 杈撳嚭寮�濮嬫椂闂寸殑灏忔椂
+ fmt.Println("寮�濮嬫椂闂寸殑灏忔椂:", startHourInt)
+
+ // 杈撳嚭缁撴潫鏃堕棿鐨勫皬鏃� + 1
+ endHourPlusOne := (endHourInt + 1) % 24 // 鍙栦綑纭繚涓嶈秴杩�24灏忔椂
+ fmt.Println("缁撴潫鏃堕棿鐨勫皬鏃� + 1:", endHourPlusOne)
+ activeHourFormat := activeHourFormat{startTime: startHour, endTime: endHour, startHour: startHourInt, endHour: endHourPlusOne}
+ return activeHourFormat, nil
+ }
+ return activeHourFormat{}, errors.New("閿欒锛氭棤娉曡В鏋愬紑濮嬫椂闂村拰缁撴潫鏃堕棿")
+
+}
+
+func DayNightActivityQuery(communityId string, startTime string, endTime string, activeHour string, indexName string, serverIp string, serverPort string) ([]string, error) {
+ activityId := make([]string, 0)
+ esURL := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
+
+ activeHourFormat, err := formatActiveHour(activeHour)
+ if err != nil {
+ return nil, err
+ }
+
+ queryDSL := `
+ {
+ "size": 0,
+ "query": {
+ "bool": {
+ "filter": [
+ {
+ "range": {
+ "picDate": {
+ "gte": "` + startTime + `",
+ "lt": "` + endTime + `"
+ }
+ }
+ },
+ {
+ "term": {
+ "communityId": "` + communityId + `"
+ }
+ },
+ {
+ "script": {
+ "script": {
+ "source": "doc['picDate'].value.hourOfDay >= ` + strconv.Itoa(activeHourFormat.startHour) + ` || doc['picDate'].value.hourOfDay < ` + strconv.Itoa(activeHourFormat.endHour) + `",
+ "lang": "painless"
+ }
+ }
+ }
+ ],
+ "must_not": [
+ {
+ "term": {
+ "documentNumber": ""
+ }
+ }
+ ]
+ }
+ },
+ "aggs": {
+ "group_by_documentnumber": {
+ "terms": {
+ "field": "documentNumber",
+ "size": 100000
+ },
+ "aggs": {
+ "group_by_date": {
+ "date_histogram": {
+ "field": "picDate",
+ "interval": "1d", // 鎸夊ぉ鍒嗘《
+ "format": "yyyy-MM-dd"
+ },
+ "aggs": {
+ "top_hits": {
+ "top_hits": {
+ "_source": [
+ "picDate"
+ ],
+ "size": 100000,
+ "sort": [
+ {
+ "picDate": {
+ "order": "desc"
+ }
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }`
+ //fmt.Println(esURL)
+ //fmt.Println(queryDSL)
+ buf, err := EsReq("POST", esURL, []byte(queryDSL))
+ if err != nil {
+ return nil, err
+ }
+ source, err := SourceAggregationList(buf)
+ if err != nil {
+ return nil, err
+ }
+ result, _ := decodeDocumentInfos(source)
+ return result, nil
+
+ return activityId, nil
+}
+
+// ***********************閲嶅簡End************************************//
// 鏍规嵁鎶撴媿浜哄憳id鏌ヨ鎶撴媿浜哄憳淇℃伅
func AIOceaninfosbyid(id []string, indexName string, serverIp string, serverPort string) ([]protomsg.AIOcean, error) {
var aIOceanInfo []protomsg.AIOcean
@@ -56,7 +191,43 @@
return aIOcean, nil
}
-//鏍规嵁鎶撴媿搴撲汉鍛榠d鏌ヨ鐗瑰緛鍊�
+// 鏍规嵁鎶撴媿浜哄憳id鏌ヨ瑙嗛鍦板潃
+func AIOceanVideoUrlbyid(id string, indexName string, serverIp string, serverPort string) (string, error) {
+ //var aIOceanInfo []protomsg.AIOcean
+ //videopersonsPersonId := strings.Replace(strings.Trim(fmt.Sprint(id), "[]"), " ", "\",\"", -1)
+ var dbinfoRequest = `
+ {
+ "query": {
+ "bool": {
+ "filter": [
+ {
+ "term": {
+ "id": "` + id + `"
+ }
+ }
+ ]
+ }
+ },
+ "_source": [
+ "videoUrl"
+ ]
+ }
+ `
+ buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search", []byte(dbinfoRequest))
+ if err != nil {
+ return "", err
+ }
+
+ sources, err := Sourcelist(buf)
+ if err != nil {
+ return "", err
+ }
+ videoUrl := sources[0]["videoUrl"].(string)
+ //aIOcean := AIOceanAnalysis(sources)
+ return videoUrl, nil
+}
+
+// 鏍规嵁鎶撴媿搴撲汉鍛榠d鏌ヨ鐗瑰緛鍊�
func GetVideoPersonFaceFeatureById(id string, indexName string, serverIp string, serverPort string) (string, error) {
var jsonDSL = `
{
@@ -86,7 +257,7 @@
return feature, nil
}
-//鏍规嵁鐩爣id鏌ヨ宸茶拷鍔犳潯鏁�
+// 鏍规嵁鐩爣id鏌ヨ宸茶拷鍔犳潯鏁�
func GetLinkTagInfoSize(id string, indexName string, serverIp string, serverPort string) (size int, err error) {
url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
queryDSL := `{
@@ -112,7 +283,7 @@
return size, nil
}
-//鏍规嵁鐩爣id杩藉姞璺熻釜淇℃伅
+// 鏍规嵁鐩爣id杩藉姞璺熻釜淇℃伅
func AppendTargetInfo(id string, targetInfo string, indexName string, serverIp string, serverPort string, updateTime string) (string, error) {
if targetInfo == "" {
return "", errors.New("append data is nil")
@@ -159,7 +330,699 @@
}
-//鏍规嵁鎶撴媿浜哄憳id鏇存柊锛坴ideourl锛夋憚鍍忔満鍦板潃
+/**************************************customer analysis util start**************************************/
+/*******************sort []map util*******************/
+type MapsSort struct {
+ Key string
+ MapList []map[string]interface{}
+}
+
+func (m *MapsSort) Len() int {
+ return len(m.MapList)
+}
+
+func (m *MapsSort) Less(i, j int) bool {
+ return m.MapList[i][m.Key].(string) > m.MapList[j][m.Key].(string)
+}
+
+func (m *MapsSort) Swap(i, j int) {
+ m.MapList[i], m.MapList[j] = m.MapList[j], m.MapList[i]
+}
+
+/*******************sort []map util*******************/
+//鏍规嵁鏃堕棿鑼冨洿鑱氬悎鎵�鏈夊尯鍩熶汉淇℃伅锛岃繑鍥炲浐瀹氭潯鏁�
+func GetFaceDataByTimeAndTotal(startTime string, endTime string, total int, thresholdTime int, thresholdStayTime int, serverIp string, serverPort string, indexName string) (resData []map[string]interface{}, err error) {
+ var requestUrl = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
+ var requestBody = `{
+ "query": {
+ "bool": {
+ "filter": [
+ {
+ "range": {
+ "picDate": {
+ "gte": "` + startTime + `",
+ "lte": "` + endTime + `"
+ }
+ }
+ },
+ {
+ "term":{
+ "targetInfo.targetType.raw": "FaceDetect"
+ }
+ }
+ ]
+ }
+ },
+ "size": 0,
+ "aggs": {
+ "buckets_aggs": {
+ "composite": {
+ "sources": [
+ {
+ "faceId": {
+ "terms": {
+ "field": "baseInfo.targetId"
+ }
+ }
+ },
+ {
+ "areaId": {
+ "terms": {
+ "field": "targetInfo.areaId"
+ }
+ }
+ }
+ ],
+ "size": 10000000
+ },
+ "aggs": {
+ "top_attention_hits": {
+ "top_hits": {
+ "size": 1000000,
+ "sort": [
+ {
+ "picDate": {
+ "order": "asc"
+ }
+ }
+ ],
+ "_source": {
+ "includes": [
+ "baseInfo.targetId",
+ "targetInfo.picSmUrl",
+ "targetInfo.areaId",
+ "picDate"
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+}`
+ buf, err := EsReq("POST", requestUrl, []byte(requestBody))
+ if err != nil {
+ return nil, err
+ }
+ source, err := FaceSourceAggregations(buf, thresholdTime, thresholdStayTime)
+ if err != nil {
+ return nil, err
+ }
+ if len(source) == 0 {
+ return source, nil
+ }
+ faceSource := make([]map[string]interface{}, 0)
+ for index, info := range source {
+ if int(info["stayTime"].(float64)) > thresholdStayTime {
+ faceSource = append(faceSource, source[index])
+ }
+ }
+ mapsSort := MapsSort{}
+ mapsSort.Key = "endTime"
+ mapsSort.MapList = faceSource
+ sort.Sort(&mapsSort)
+ if len(faceSource) > total {
+ return mapsSort.MapList[:total], nil
+ }
+ return mapsSort.MapList, nil
+}
+
+func GetFaceDataByTimeAndId(startTime string, endTime string, id string, thresholdTime int, thresholdStayTime int, serverIp string, serverPort string, indexName string) (resData []map[string]interface{}, err error) {
+ var requestUrl = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
+ var requestBody = `{
+ "query": {
+ "bool": {
+ "filter": [
+ {
+ "range": {
+ "picDate": {
+ "gte": "` + startTime + `",
+ "lte": "` + endTime + `"
+ }
+ }
+ },
+ {
+ "term":{
+ "targetInfo.targetType.raw": "FaceDetect"
+ }
+ },
+ {
+ "term":{
+ "baseInfo.targetId": "` + id + `"
+ }
+ }
+ ]
+ }
+ },
+ "size": 0,
+ "aggs": {
+ "buckets_aggs": {
+ "composite": {
+ "sources": [
+ {
+ "faceId": {
+ "terms": {
+ "field": "baseInfo.targetId"
+ }
+ }
+ },
+ {
+ "areaId": {
+ "terms": {
+ "field": "targetInfo.areaId"
+ }
+ }
+ }
+ ],
+ "size": 10000000
+ },
+ "aggs": {
+ "top_attention_hits": {
+ "top_hits": {
+ "size": 1000000,
+ "sort": [
+ {
+ "picDate": {
+ "order": "asc"
+ }
+ }
+ ],
+ "_source": {
+ "includes": [
+ "baseInfo.targetId",
+ "targetInfo.picSmUrl",
+ "targetInfo.areaId",
+ "picDate"
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+}`
+ buf, err := EsReq("POST", requestUrl, []byte(requestBody))
+ if err != nil {
+ return nil, err
+ }
+ source, err := FaceSourceAggregations(buf, thresholdTime, thresholdStayTime)
+ if err != nil {
+ return nil, err
+ }
+ if len(source) == 0 {
+ return source, nil
+ }
+ faceSource := make([]map[string]interface{}, 0)
+ for index, info := range source {
+ if int(info["stayTime"].(float64)) > thresholdStayTime {
+ faceSource = append(faceSource, source[index])
+ }
+ }
+ mapsSort := MapsSort{}
+ mapsSort.Key = "startTime"
+ mapsSort.MapList = faceSource
+ sort.Sort(&mapsSort)
+ return mapsSort.MapList, nil
+}
+
+func GetFaceIdDeduplication(startTime string, endTime string, serverIp string, serverPort string, indexName string) (ids []map[string]interface{}, err error) {
+ var requestUrl = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
+ var requestBody = `{
+ "query": {
+ "bool": {
+ "filter": [
+ {
+ "range": {
+ "picDate": {
+ "gte": "` + startTime + `",
+ "lte": "` + endTime + `"
+ }
+ }
+ },
+ {
+ "term": {
+ "targetInfo.targetType.raw": "FaceDetect"
+ }
+ }
+ ]
+ }
+ },
+ "size": 0,
+ "aggs": {
+ "buckets_aggs": {
+ "composite": {
+ "sources": [
+ {
+ "faceId": {
+ "terms": {
+ "field": "baseInfo.targetId"
+ }
+ }
+ }
+ ],
+ "size": 10000000
+ },
+ "aggs": {
+ "top_attention_hits": {
+ "top_hits": {
+ "size": 1,
+ "sort": [
+ {
+ "picDate": {
+ "order": "desc"
+ }
+ }
+ ],
+ "_source": {
+ "includes": [
+ "picDate"
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+}`
+ //fmt.Println(requestUrl)
+ //fmt.Println(requestBody)
+ buf, err := EsReq("POST", requestUrl, []byte(requestBody))
+ if err != nil {
+ return nil, err
+ }
+ ids, err1 := SourceDeduplication(buf)
+ if err1 != nil {
+ return nil, err1
+ }
+ if len(ids) > 1 {
+ mapsSort := MapsSort{}
+ mapsSort.Key = "lastTime"
+ mapsSort.MapList = ids
+ sort.Sort(&mapsSort)
+ return mapsSort.MapList, nil
+ }
+ return ids, nil
+}
+
+// 缁熻鍚勪釜鍖哄煙浜烘暟
+func StatisticsEveryAreaPersonsNumber(startTime string, endTime string, serverIp string, serverPort string, indexName string) ([]map[string]interface{}, error) {
+ var requestUrl = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
+ var requestBody = `{
+ "query": {
+ "bool": {
+ "filter": [
+ {
+ "range": {
+ "picDate": {
+ "gte": "` + startTime + `",
+ "lte": "` + endTime + `"
+ }
+ }
+ },
+ {
+ "term": {
+ "targetInfo.targetType.raw": "Yolo"
+ }
+ }
+ ]
+ }
+ },
+ "size": 0,
+ "aggs": {
+ "buckets_aggs": {
+ "composite": {
+ "sources": [
+ {
+ "areaId": {
+ "terms": {
+ "field": "targetInfo.areaId"
+ }
+ }
+ }
+ ],
+ "size": 10000000
+ }
+ }
+ }
+}`
+ buf, err := EsReq("POST", requestUrl, []byte(requestBody))
+ if err != nil {
+ return nil, err
+ }
+ result, err := SourceStatistics(buf)
+ if err != nil {
+ return nil, err
+ }
+ return result, nil
+}
+
+/**************************************customer analysis util end**************************************/
+//鏍规嵁鎽勫儚鏈哄垪琛ㄥ拰鏃堕棿鏌ヨ浜哄憳娴忚杞ㄨ抗
+func GetPersonDataByCameraIdAndTime(cameraId []string, startTime string, endTime string, serverIp string, serverPort string, indexName string) (map[string]interface{}, error) {
+
+ var filterArr []string
+ if cameraId != nil && len(cameraId) > 0 {
+ esCameraId := strings.Replace(strings.Trim(fmt.Sprint(cameraId), "[]"), " ", "\",\"", -1)
+ filterArr = append(filterArr, `{
+ "terms": {
+ "cameraId": ["`+esCameraId+`"]
+ }
+ }`)
+ }
+ filterArr = append(filterArr, `{
+ "range": {
+ "picDate": {
+ "gte": "`+startTime+`",
+ "lte": "`+endTime+`"
+ }
+ }
+ }`)
+ filterArr = append(filterArr, ` {
+ "term": {
+ "targetInfo.targetType.raw": "Yolo"
+ }
+ }`)
+ queryStr := strings.Join(filterArr, ",")
+
+ personUrl := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
+ personBody := `{
+ "query": {
+ "bool": {
+ "filter": [
+ ` + queryStr + `
+ ]
+ }
+ },
+ "size": 2147483647,
+ "_source": {
+ "includes": [
+ "cameraId",
+ "cameraName",
+ "cameraAddr",
+ "targetInfo.targetScore",
+ "picDate",
+ "updateTime",
+ "picMaxUrl",
+ "targetInfo.belongsTargetId",
+ "targetInfo.targetLocation",
+ "picWH"
+ ]
+ }
+}`
+ //fmt.Println(personUrl)
+ //fmt.Println(personBody)
+ source := make(map[string]interface{})
+ queryStartTime := time.Now()
+ buf, err := EsReq("POST", personUrl, []byte(personBody))
+ if err != nil {
+ return nil, err
+ }
+ queryUseTime := time.Now().Sub(queryStartTime).Seconds() * 1000
+ sources, err := Sourcelist(buf)
+ if err != nil {
+ return nil, err
+ }
+ resData, err := PerSonAnalysis(sources)
+ source["result"] = resData
+ source["total"] = len(resData)
+ source["queryUseTime"] = queryUseTime
+ //println(sources)
+ return source, nil
+
+}
+
+// 鏍规嵁鏃堕棿鑼冨洿锛屾憚鍍忔満鍒楄〃锛屽垎缁勮仛鍚堜汉鑴稿垪琛�,杩斿洖鍒嗙粍鏁版嵁
+func GetFaceDataBucketsByCameraIdAndTimeReturnByGrouped(cameraId []string, personId []string, startTime string, endTime string, thresholdTime float64, serverIp string, ServerPort string, indexName string) (buckersDate map[string]interface{}, err error) {
+ var filterArr []string
+ if cameraId != nil && len(cameraId) > 0 {
+ esCameraId := strings.Replace(strings.Trim(fmt.Sprint(cameraId), "[]"), " ", "\",\"", -1)
+ filterArr = append(filterArr, `{
+ "terms": {
+ "cameraId": ["`+esCameraId+`"]
+ }
+ }`)
+ }
+ if personId != nil && len(personId) > 0 {
+ esPersonId := strings.Replace(strings.Trim(fmt.Sprint(personId), "[]"), " ", "\",\"", -1)
+ filterArr = append(filterArr, `{
+ "terms": {
+ "baseInfo.targetId": ["`+esPersonId+`"]
+ }
+ }`)
+ }
+ filterArr = append(filterArr, `{
+ "range": {
+ "picDate": {
+ "gte": "`+startTime+`",
+ "lte": "`+endTime+`"
+ }
+ }
+ }`)
+ filterArr = append(filterArr, ` {
+ "term": {
+ "targetInfo.targetType.raw": "FaceDetect"
+ }
+ }`)
+ queryStr := strings.Join(filterArr, ",")
+
+ var buckersUrl = "http://" + serverIp + ":" + ServerPort + "/" + indexName + "/_search"
+ var buckersBody = `{
+ "query": {
+ "bool": {
+ "filter": [
+ ` + queryStr + `
+ ]
+ }
+ },
+ "size": 0,
+ "aggs": {
+ "buckets_aggs": {
+ "composite": {
+ "sources": [
+ {
+ "baseInfo.targetId": {
+ "terms": {
+ "field": "baseInfo.targetId"
+ }
+ }
+ },
+ {
+ "cameraId": {
+ "terms": {
+ "field": "cameraId"
+ }
+ }
+ }
+ ],
+ "size": 10000000
+ },
+ "aggs":{
+ "top_attention_hits":{
+ "top_hits":{
+ "size": 1000000,
+ "sort": [
+ {
+ "picDate": {
+ "order": "asc"
+ }
+ }
+ ],
+ "_source":{
+ "includes":["baseInfo.targetId","cameraId","cameraName","cameraAddr","targetInfo.targetScore","targetInfo.picSmUrl","showLabels","baseInfo.tableId","baseInfo.tableName","baseInfo.bwType","baseInfo.targetName","baseInfo.compareScore","picDate","picMaxUrl","picWH"]
+ }
+ }
+ }
+ }
+ }
+ }
+}`
+ //fmt.Println(buckersUrl)
+ //fmt.Println(buckersBody)
+ sources := make(map[string]interface{})
+ queryStartTime := time.Now()
+ buf, err := EsReq("POST", buckersUrl, []byte(buckersBody))
+ if err != nil {
+ return nil, err
+ }
+ queryUseTime := time.Now().Sub(queryStartTime).Seconds() * 1000
+ //fmt.Println(queryUseTime)
+ tmpSources, err := SourceAggregationsReturnByGrouped(buf, thresholdTime)
+ if err != nil {
+ return nil, err
+ }
+ sources["result"] = tmpSources
+ sources["total"] = len(tmpSources)
+ sources["queryUseTime"] = queryUseTime
+ //println(sources)
+ return sources, nil
+}
+
+// 鏍规嵁鏃堕棿鑼冨洿锛屾憚鍍忔満鍒楄〃锛屽垎缁勮仛鍚堜汉鑴稿垪琛�
+func GetFaceDataBucketsByCameraIdAndTime(cameraId []string, personId []string, startTime string, endTime string, thresholdTime float64, serverIp string, ServerPort string, indexName string) (buckersDate map[string]interface{}, err error) {
+ var filterArr []string
+ if cameraId != nil && len(cameraId) > 0 {
+ esCameraId := strings.Replace(strings.Trim(fmt.Sprint(cameraId), "[]"), " ", "\",\"", -1)
+ filterArr = append(filterArr, `{
+ "terms": {
+ "cameraId": ["`+esCameraId+`"]
+ }
+ }`)
+ }
+ if personId != nil && len(personId) > 0 {
+ esPersonId := strings.Replace(strings.Trim(fmt.Sprint(personId), "[]"), " ", "\",\"", -1)
+ filterArr = append(filterArr, `{
+ "terms": {
+ "baseInfo.targetId": ["`+esPersonId+`"]
+ }
+ }`)
+ }
+ filterArr = append(filterArr, `{
+ "range": {
+ "picDate": {
+ "gte": "`+startTime+`",
+ "lte": "`+endTime+`"
+ }
+ }
+ }`)
+ filterArr = append(filterArr, ` {
+ "term": {
+ "targetInfo.targetType.raw": "FaceDetect"
+ }
+ }`)
+ queryStr := strings.Join(filterArr, ",")
+
+ var buckersUrl = "http://" + serverIp + ":" + ServerPort + "/" + indexName + "/_search"
+ var buckersBody = `{
+ "query": {
+ "bool": {
+ "filter": [
+ ` + queryStr + `
+ ]
+ }
+ },
+ "size": 0,
+ "aggs": {
+ "buckets_aggs": {
+ "composite": {
+ "sources": [
+ {
+ "baseInfo.targetId": {
+ "terms": {
+ "field": "baseInfo.targetId"
+ }
+ }
+ },
+ {
+ "cameraId": {
+ "terms": {
+ "field": "cameraId"
+ }
+ }
+ }
+ ],
+ "size": 10000000
+ },
+ "aggs":{
+ "top_attention_hits":{
+ "top_hits":{
+ "size": 1000000,
+ "sort": [
+ {
+ "picDate": {
+ "order": "asc"
+ }
+ }
+ ],
+ "_source":{
+ "includes":["baseInfo.targetId","cameraId","cameraName","cameraAddr","targetInfo.targetScore","targetInfo.picSmUrl","showLabels","baseInfo.tableId","baseInfo.tableName","baseInfo.bwType","baseInfo.targetName","baseInfo.compareScore","picDate","picMaxUrl","picWH"]
+ }
+ }
+ }
+ }
+ }
+ }
+}`
+ //fmt.Println(buckersUrl)
+ //fmt.Println(buckersBody)
+ queryStartTime := time.Now()
+ buf, err := EsReq("POST", buckersUrl, []byte(buckersBody))
+ if err != nil {
+ return nil, err
+ }
+ queryUseTime := time.Now().Sub(queryStartTime).Seconds() * 1000
+
+ sources, err := SourceAggregations(buf, thresholdTime, queryUseTime)
+ if err != nil {
+ return nil, err
+ }
+ return sources, nil
+}
+
+// 鏍规嵁鎶撴媿浜哄憳id鏇存柊锛坧icurl锛夊浘鐗囧湴鍧�
+func UpdatePicUrlById(id string, picUrl string, indexName string, serverIp string, serverPort string) (err error) {
+ updateTime := time.Now().Format("2006-01-02 15:04:05")
+ tRes, err := AIOceaninfosbyid([]string{id}, indexName, serverIp, serverPort)
+ if err != nil || len(tRes) == 0 {
+ return err
+ }
+ picMaxUrls := tRes[0].PicMaxUrl
+ sourceStr := `
+ "source": "ctx._source.picMaxUrl.add('` + picUrl + `');ctx._source.updateTime='` + updateTime + `'"
+`
+ if len(picMaxUrls) >= 2 {
+ sourceStr = `"source": "ctx._source.picMaxUrl[1]='` + picUrl + `';ctx._source.updateTime='` + updateTime + `'"`
+ }
+ var info interface{}
+ url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query?refresh=true"
+
+ var picUrlInfo = `
+ {
+ "script": {
+ ` + sourceStr + `
+ },
+ "query": {
+ "bool": {
+ "filter": [
+ {
+ "term": {
+ "id": "` + id + `"
+ }
+ }
+ ]
+ }
+ }
+ }
+ `
+ //logPrint("url: ", url, videoUrlInfo)
+ //fmt.Println(url, picUrlInfo)
+ buf, err := EsReq("POST", url, []byte(picUrlInfo))
+ if err != nil {
+ logPrint("http request videoUrlInfo info is err!")
+ return err
+ }
+ json.Unmarshal(buf, &info)
+ //logPrint(info)
+ out, ok := info.(map[string]interface{})
+ if !ok {
+ logPrint("http response interface can not change map[string]interface{}")
+ return errors.New("http response interface can not change map[string]interface{}")
+ }
+ middle, ok := out["updated"].(float64)
+ if !ok {
+ logPrint("first updated change error!", out)
+ return errors.New("first updated change error!")
+ }
+ if middle == 1 {
+ return nil
+ }
+ if middle == 0 {
+ return errors.New("宸茬粡淇敼")
+ }
+ return nil
+}
+
+// 鏍规嵁鎶撴媿浜哄憳id鏇存柊锛坴ideourl锛夋憚鍍忔満鍦板潃
func UpdateVideourlById(id string, videoUrl string, indexName string, serverIp string, serverPort string, command int) (statu int, err error) {
var info interface{}
@@ -196,23 +1059,30 @@
return statu, errors.New("http response interface can not change map[string]interface{}")
}
middle, ok := out["updated"].(float64)
- if !ok {
+ batches, ok1 := out["batches"].(float64)
+ if !ok || !ok1 {
logPrint("first updated change error!")
statu = 500
return statu, errors.New("first updated change error!")
}
- if middle == 1 {
- statu = 200
- return statu, nil
- }
- if middle == 0 {
- statu = 201
- return statu, errors.New("宸茬粡淇敼")
+ if batches == 0 {
+ logPrint("no such doc in database")
+ statu = 400
+ return statu, errors.New("鐩爣鏁版嵁涓嶅瓨鍦�")
+ } else {
+ if middle == 1 {
+ statu = 200
+ return statu, nil
+ }
+ if middle == 0 {
+ statu = 201
+ return statu, errors.New("宸茬粡淇敼")
+ }
}
return statu, nil
}
-//鑾峰彇褰撳墠鑺傜偣鎶撴媿搴撴墍鏈変汉鍛業D*缂撳瓨*
+// 鑾峰彇褰撳墠鑺傜偣鎶撴媿搴撴墍鏈変汉鍛業D*缂撳瓨*
func GetAllLocalVideopersonsId(compareArgs protomsg.CompareArgs, indexName string, serverIp string, serverPort string, alarmLevelTypes string) (capturetable []string) {
queryStr := ""
queryBody := compareArgs.InputValue
@@ -252,7 +1122,12 @@
isCollectStr := ""
isCollect := compareArgs.Collection
if isCollect != "" {
- isCollectStr = "{\"term\":{\"isCollect\":\"" + isCollect + "\"}},"
+ //isCollectStr = "{\"term\":{\"isCollect\":\"" + isCollect + "\"}},"
+ if isCollect == "1" {
+ isCollectStr = "{\"term\":{\"isCollect\":true}},"
+ } else if isCollect == "0" {
+ isCollectStr = "{\"term\":{\"isCollect\":false}},"
+ }
}
//鍒ゆ柇甯冮槻绛夌骇
@@ -284,7 +1159,7 @@
"\"size\":\"1000\"," +
"\"query\":{\"bool\":{" + queryStr +
"\"filter\":[" +
- "{\"term\":{\"targetInfo.targetType.raw\":\"face\"}}," +
+ "{\"term\":{\"targetInfo.targetType.raw\":\"FaceDetect\"}}," +
cameraIdStr +
alarmLevelStr +
taskIdStr +
@@ -298,8 +1173,8 @@
go func(reqParam string) {
defer wg.Done()
- //logPrint(url)
- //logPrint(prama)
+ logPrint(url)
+ logPrint(prama)
buf, err := EsReq("POST", url, []byte(reqParam))
if err != nil {
@@ -335,8 +1210,8 @@
"scroll": "1m",
"scroll_id" : "` + scroll_id + `"
}`
- //logPrint(scroll_url)
- //logPrint(jsonDSL)
+ logPrint(scroll_url)
+ logPrint(jsonDSL)
buf, err := EsReq("POST", scroll_url, []byte(jsonDSL))
if err != nil {
@@ -376,7 +1251,7 @@
return capturetable
}
-//鍒濆鍖栧疄鏃舵姄鎷�
+// 鍒濆鍖栧疄鏃舵姄鎷�
func InitRealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm string, category string, quantity int) ([]protomsg.AIOcean, error) {
var aIOceanInfo []protomsg.AIOcean
url := "http://" + serverIp + ":" + serverPort +
@@ -393,7 +1268,7 @@
if category != "all" {
filterArr = append(filterArr, ` {
"term":{
- "targetInfo.targetType":"`+category+`"
+ "targetInfo.targetType.raw":"`+category+`"
}
}`)
@@ -413,7 +1288,7 @@
"sort":[{"picDate":{"order":"desc"}}],
"_source": {"includes":[],"excludes":["*.feature"]}
}`
- logPrint(DSLJson)
+ //logPrint(DSLJson)
buf, err := EsReq("POST", url, []byte(DSLJson))
if err != nil {
return aIOceanInfo, err
@@ -429,7 +1304,7 @@
return aIOcean, nil
}
-//瀹炴椂鎶撴媿
+// 瀹炴椂鎶撴媿
func RealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm bool) ([]protomsg.AIOcean, error) {
var aIOceanInfo []protomsg.AIOcean
url := "http://" + serverIp + ":" + serverPort +
@@ -473,7 +1348,7 @@
return aIOcean, nil
}
-//缁煎悎缁熻
+// 缁煎悎缁熻
func StatisticsComprehensive(serverIp string, serverPort string, indexName string, isAlarm string) (total int, err error) {
url := "http://" + serverIp + ":" + serverPort +
"/" + indexName + "/_search"
@@ -503,7 +1378,6 @@
}
}
}`
- //logPrint(DSLJson)
buf, err := EsReq("POST", url, []byte(DSLJson))
if err != nil {
return total, err
@@ -523,7 +1397,7 @@
return total, nil
}
-//瀹炴椂鎶ヨ浠诲姟姣旂巼
+// 瀹炴椂鎶ヨ浠诲姟姣旂巼
func RealTimeAlarmTaskRate(serverIp string, serverPort string, indexName string) (sources []map[string]interface{}, err error) {
url := "http://" + serverIp + ":" + serverPort +
"/" + indexName + "/_search"
@@ -541,9 +1415,9 @@
}
},
"aggs":{
- "sdkName_status":{
+ "taskName_status":{
"terms":{
- "field":"sdkName.raw"
+ "field":"taskName.raw"
}
}
}
@@ -562,11 +1436,11 @@
if !ok {
return nil, errors.New("first hits change error!")
}
- sdkName_status, ok := middle["sdkName_status"].(map[string]interface{})
+ sdkName_status, ok := middle["taskName_status"].(map[string]interface{})
if !ok {
return nil, errors.New("first hits change error!")
}
-
+ //fmt.Println(sdkName_status)
for _, in := range sdkName_status["buckets"].([]interface{}) {
var source = make(map[string]interface{}, 0)
tmpbuf, ok := in.(map[string]interface{})
@@ -584,11 +1458,20 @@
return sources, nil
}
-//鑱氬悎浠诲姟鍒楄〃锛宼askId+taskName
-func AggregateTaskList(serverIp string, serverPort string, indexName string, analyServerId string) (sources []map[string]interface{}, err error) {
+// 鑱氬悎浠诲姟鍒楄〃锛宼askId+taskName
+func AggregateTaskList(serverIp string, serverPort string, indexName string, analyServerId string, cameraIds []string) (sources []map[string]interface{}, err error) {
url := "http://" + serverIp + ":" + serverPort +
"/" + indexName + "/_search"
serverFilterStr := ""
+ cameIdFilterStr := ""
+ if cameraIds != nil && len(cameraIds) > 0 {
+ cameIdsStr := strings.Replace(strings.Trim(fmt.Sprint(cameraIds), "[]"), " ", "\",\"", -1)
+ cameIdFilterStr = `,{
+ "term": {
+ "cameraId": "` + cameIdsStr + `"
+ }
+ }`
+ }
if analyServerId != "" {
serverFilterStr = `,
"query": {
@@ -597,8 +1480,9 @@
{
"term": {
"analyServerId": "` + analyServerId + `"
+ }
}
- }
+ ` + cameIdFilterStr + `
]
}
}`
@@ -670,7 +1554,7 @@
}
-//娣诲姞鍗冲皢鍒犻櫎淇″彿
+// 娣诲姞鍗冲皢鍒犻櫎淇″彿
func AddDeleteSignal() {
}
@@ -711,13 +1595,13 @@
}
-//鏌ヨ鏃堕棿娈垫暟鎹� *缂撳瓨*
+// 鏌ヨ鏃堕棿娈垫暟鎹� *缂撳瓨*
func GetPeriodInfos(serverIp string, serverPort string, startTime string, endTime string, indexName string, shards string, targetType string) ([]*protomsg.MultiFeaCache, error) {
var capdbinfo []*protomsg.MultiFeaCache
url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local"
var source []string
switch targetType {
- case "face":
+ case "face", "FaceDetect":
source = []string{"id", "targetInfo.feature", "analyServerId", "cameraId"}
case "track":
source = []string{"id", "targetInfo.feature", "analyServerId", "cameraId", "targetInfo.attachTarget.feature", "targetInfo.targetLocation", "linkTagInfo.targetInfo.feature", "linkTagInfo.targetInfo.attachTarget.feature", "linkTagInfo.cameraId", "linkTagInfo.targetInfo.targetLocation"}
@@ -767,21 +1651,35 @@
}
// 鏌ヨ搴曞簱浜哄憳淇℃伅*缂撳瓨*
-func GetOceanFeatures(serverIp string, serverPort string, queryIndexNum int, queryNums int, indexName string, shards string, targetType string) ([]*protomsg.MultiFeaCache, error) {
- var dbinfos []*protomsg.MultiFeaCache
- point := strconv.Itoa(queryIndexNum)
- number := strconv.Itoa(queryNums)
+func GetOceanFeatures(serverIp string, serverPort string, queryNums int, indexName string, shards string, targetType string) ([]*protomsg.MultiFeaCache, error) {
+ //queryIndexNum int
+ //var dbinfos []*protomsg.MultiFeaCache
+ dbinfos := make([]*protomsg.MultiFeaCache, 0)
+ //dbinfosss := make([]*protomsg.MultiFeaCache,0)
+ //dbinfoss = append(dbinfoss, dbinfosss...)
+
JsonDSL := ""
var source []string
switch targetType {
- case "face":
+ case "face", "FaceDetect":
source = []string{"id", "targetInfo.feature", "analyServerId"}
case "track":
source = []string{"id", "targetInfo.feature", "analyServerId", "targetInfo.attachTarget.feature", "targetInfo.targetLocation", "linkTagInfo.targetInfo.feature", "linkTagInfo.targetInfo.attachTarget.feature", "linkTagInfo.targetInfo.targetLocation"}
}
- url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local"
- JsonDSL = ` {
- "from": ` + point + `,
+
+ url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local;scroll=1m"
+
+ var lock sync.RWMutex
+ var wg sync.WaitGroup
+
+ for i := 0; i < 48; i++ {
+ //璇锋眰浣�
+ JsonDSL = ` {
+ "slice": {
+ "id": "` + strconv.Itoa(i) + `",
+ "max": 48
+ },
+ "size":` + strconv.Itoa(queryNums) + `,
"query": {
"bool": {
"filter": [
@@ -793,30 +1691,90 @@
]
}
},
- "size":` + number + `,
"_source": ["` + strings.Replace(strings.Trim(fmt.Sprint(source), "[]"), " ", "\",\"", -1) + `"]
}`
-
- logPrint("url: ",url)
- logPrint("url: ",JsonDSL)
- buf, err := EsReq("POST", url, []byte(JsonDSL))
- if err != nil {
- return dbinfos, err
- }
+ wg.Add(1)
+ go func(reqJsonDSL string) {
+ defer wg.Done()
- // 杩斿洖 _source 鏁扮粍
- sources, err := Sourcelist(buf)
- if err != nil {
- return dbinfos, err
- }
+ //fmt.Println(url)
+ //fmt.Println(prama)
+ //logPrint("url: ",url)
+ //logPrint("url: ",reqJsonDSL)
+ buf, err := EsReq("POST", url, []byte(reqJsonDSL))
+ if err != nil {
+ logPrint("EsReq: ", err)
+ return
+ }
- // 杩斿洖鎵�鏈夋煡璇㈢殑鏁版嵁
- dbpersoninfos := Parsesources(sources)
- return dbpersoninfos, nil
+ // 杩斿洖 _source 鏁扮粍
+ sources, err := Sourcelistforscroll(buf)
+ if err != nil {
+ logPrint("EsReq: ", err)
+ return
+ }
+ // 杩斿洖鎵�鏈夋煡璇㈢殑鏁版嵁
+ ftmpDatas := Parsesources(sources["sourcelist"].([]map[string]interface{}))
+ lock.Lock()
+ dbinfos = append(dbinfos, ftmpDatas...)
+ //logPrint("prsLen: ", len(Parsesources(sources["sourcelist"].([]map[string]interface{}))))
+ //logPrint("dbinfosLen: ", len(dbinfos))
+ lock.Unlock()
+
+ scroll_id := sources["scroll_id"].(string)
+
+ //scroll璇锋眰澶�
+ scroll_url := "http://" + serverIp + ":" + serverPort + "/_search/scroll"
+ for {
+ next_scroll_id := ""
+ if next_scroll_id != "" {
+ scroll_id = next_scroll_id
+ }
+ jsonDSL := `{
+ "scroll": "1m",
+ "scroll_id" : "` + scroll_id + `"
+ }`
+ //fmt.Println(scroll_url)
+ //fmt.Println(jsonDSL)
+ buf, err := EsReq("POST", scroll_url, []byte(jsonDSL))
+
+ if err != nil {
+ //fmt.Println("lenth1: ", len(dbinfos))
+ return
+ }
+ nextSources, err := Sourcelistforscroll(buf)
+
+ if nextSources == nil {
+ return
+ }
+
+ nextM := nextSources["sourcelist"].([]map[string]interface{})
+ //fmt.Println("id",nextSources)
+ if nextM == nil || len(nextM) == 0 {
+ //fmt.Println("lenth: ", len(capturetable))
+ return
+ }
+ tmpDatas := Parsesources(nextM)
+ lock.Lock()
+ dbinfos = append(dbinfos, tmpDatas...)
+ //logPrint("tmpDatasLen: ", len(tmpDatas))
+ //logPrint("AdbinfosLen: ", len(dbinfos))
+ lock.Unlock()
+
+ next_scroll_id = nextSources["scroll_id"].(string)
+ }
+
+ }(JsonDSL)
+ }
+ wg.Wait()
+
+ //fmt.Println("lenth_all: ", len(dbinfos))
+
+ return dbinfos, nil
}
-//************************CORN TASK*******************************
-//鏌ヨ鏃ユ湡鑼冨洿鍐呮槸鍚﹁繕瀛樺湪鏁版嵁
+// ************************CORN TASK*******************************
+// 鏌ヨ鏃ユ湡鑼冨洿鍐呮槸鍚﹁繕瀛樺湪鏁版嵁
func QueryAnalyServerData(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) {
url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
deleteJson := `{
@@ -847,7 +1805,7 @@
if err != nil {
return false, errors.New("瑙g爜澶辫触")
}
- if resTotal == -1 || resTotal == 0{
+ if resTotal == -1 || resTotal == 0 {
result = false
} else {
result = true
@@ -855,9 +1813,8 @@
return result, nil
}
-
-//鎸夋棩鏈熻寖鍥达紝鏈嶅姟鍣↖d鍒犻櫎鏁版嵁
-func DeleteAnalyServerData(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) {
+// 鎸夋棩鏈熻寖鍥达紝鏈嶅姟鍣↖d鍒犻櫎鏁版嵁
+func DeleteAnalyServerData(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (total int, err error) {
url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_delete_by_query"
deleteJson := `{
"query":{
@@ -879,23 +1836,20 @@
}
}
} `
+ fmt.Println(url)
+ fmt.Println(deleteJson)
buf, err := EsReq("POST", url, []byte(deleteJson))
if err != nil {
- return false, errors.New("璇锋眰澶辫触")
+ return -1, errors.New("璇锋眰澶辫触")
}
deleteRes, err := SourceDeleted(buf)
if err != nil {
- return false, errors.New("瑙g爜澶辫触")
+ return -1, errors.New("瑙g爜澶辫触")
}
- if deleteRes == -1 {
- result = false
- } else {
- result = true
- }
- return result, nil
+ return deleteRes, nil
}
-//缁欐墍鏈夎妭鐐硅拷鍔犲垹闄や换鍔′俊鎭�
+// 缁欐墍鏈夎妭鐐硅拷鍔犲垹闄や换鍔′俊鎭�
func AddDelTask(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) {
url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query"
addJson := `{
@@ -911,7 +1865,15 @@
}
},
"query": {
- "match_all": {}
+ "bool": {
+ "filter": [
+ {
+ "term": {
+ "application": "loopCoverage"
+ }
+ }
+ ]
+ }
}
}`
buf, err := EsReq("POST", url, []byte(addJson))
@@ -930,7 +1892,7 @@
return result, nil
}
-//绉婚櫎宸叉墽琛屽畬鐨勫垹闄や换鍔�
+// 绉婚櫎宸叉墽琛屽畬鐨勫垹闄や换鍔�
func DeleteDelTask(serverIp string, serverPort string, indexName string, analyServerId string) (result bool, err error) {
url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query"
deleteJson := `{
@@ -963,3 +1925,66 @@
}
return result, nil
}
+
+type ShardInfo struct {
+ ShardIndex string `json:"shardIndex"` //鍒嗙墖鎵�灞炵储寮曞悕绉�
+ ShardNum int `json:"shardNum"` //鍒嗙墖鍙�
+ ShardRole string `json:"shardRole"` //鍒嗙墖瑙掕壊(涓诲垎鐗囷細primary 鍓湰鍒嗙墖锛歳eplica)
+ ShardState string `json:"shardState"` //鍒嗙墖鐘舵��(鍚敤锛歋TARTED 鏈惎鐢細UNASSIGNED)
+ ShardDocs int `json:"shardDocs"` //鍒嗙墖宸蹭繚瀛樻枃妗f暟
+ ShardStore string `json:"shardStore"` //鍒嗙墖褰撳墠瀛樺偍鏁版嵁澶у皬
+ ShardIp string `json:"shardIp"` //鍒嗙墖鎵�鍦ㄨ妭鐐筰p
+ ShardNode string `json:"shardNode"` //鍒嗙墖鎵�鍦ㄨ妭鐐瑰悕绉�
+}
+
+// 鑾峰彇绱㈠紩鍒嗙墖淇℃伅
+func GetShardsByIndex(serverIp string, serverPort string, indexName string) ([]ShardInfo, error) {
+ url := "http://" + serverIp + ":" + serverPort + "/_cat/shards?v"
+ buf, err := EsReq("GET", url, []byte(""))
+ if err != nil {
+ return nil, err
+ }
+ var inf = []ShardInfo{}
+ res := strings.Split(string(buf), "\n")[1:]
+ for _, r := range res {
+ if r != "" {
+
+ inx := strings.Fields(r)
+ index := inx[0]
+ shard, _ := strconv.Atoi(inx[1])
+ prired := inx[2]
+ if prired == "r" {
+ prired = "replica"
+ }
+ if prired == "p" {
+ prired = "primary"
+ }
+ state := inx[3]
+ docs := 0
+ store := ""
+ ip := ""
+ node := ""
+ if state == "STARTED" {
+ docs, _ = strconv.Atoi(inx[4])
+ store = inx[5]
+ ip = inx[6]
+ node = inx[7]
+ }
+ if index == indexName {
+ inf = append(inf, ShardInfo{
+ ShardIndex: index,
+ ShardNum: shard,
+ ShardRole: prired,
+ ShardState: state,
+ ShardDocs: docs,
+ ShardStore: store,
+ ShardIp: ip,
+ ShardNode: node,
+ })
+
+ }
+ }
+
+ }
+ return inf, nil
+}
--
Gitblit v1.8.0