From 3b2d91bee9c7385927110b0c3de988ffe78931d8 Mon Sep 17 00:00:00 2001 From: sunty <1172534965@qq.com> Date: 星期一, 24 八月 2020 13:50:54 +0800 Subject: [PATCH] add grouped --- EsClient.go | 103 +++++++++++++++++++++++++ EsApi.go | 108 +++++++++++++++++++++++++- 2 files changed, 202 insertions(+), 9 deletions(-) diff --git a/EsApi.go b/EsApi.go index b812bf5..875ba9a 100644 --- a/EsApi.go +++ b/EsApi.go @@ -160,7 +160,7 @@ } //鏍规嵁鎽勫儚鏈哄垪琛ㄥ拰鏃堕棿鏌ヨ浜哄憳娴忚杞ㄨ抗 -func GetPersonDataByCameraIdAndTime(cameraId []string, startTime string, endTime string, serverIp string, ServerPort string, indexName string) ([]map[string]interface{}, error) { +func GetPersonDataByCameraIdAndTime(cameraId []string, startTime string, endTime string, serverIp string, ServerPort string, indexName string) (map[string]interface{}, error) { esCameraId := strings.Replace(strings.Trim(fmt.Sprint(cameraId), "[]"), " ", "\",\"", -1) personUrl := "http://" + serverIp + ":" + ServerPort + "/" + indexName + "/_search" personBody := `{ @@ -202,25 +202,120 @@ "picMaxUrl", "targetInfo.belongsTargetId", "targetInfo.targetLocation", - "picWH" + "picWH.picW" ] } }` //fmt.Println(personUrl) //fmt.Println(personBody) + source := make(map[string]interface{}) + queryStartTime := time.Now() buf, err := EsReq("POST", personUrl, []byte(personBody)) if err != nil { return nil, err } - + queryUseTime := time.Now().Sub(queryStartTime).Seconds()*1000 sources, err := Sourcelist(buf) if err != nil { return nil, err } resData,err := PerSonAnalysis(sources) + source["result"] = resData + source["total"] = len(resData) + source["queryUseTime"] = queryUseTime //println(sources) - return resData, nil + return source, nil +} + +//鏍规嵁鏃堕棿鑼冨洿锛屾憚鍍忔満鍒楄〃锛屽垎缁勮仛鍚堜汉鑴稿垪琛�,杩斿洖鍒嗙粍鏁版嵁 +func GetFaceDataBucketsByCameraIdAndTimeReturnByGrouped(cameraId []string, startTime string, endTime string, thresholdTime float64, serverIp string, ServerPort string, indexName string) (buckersDate map[string]interface{}, err error) { + esCameraId := strings.Replace(strings.Trim(fmt.Sprint(cameraId), "[]"), " ", "\",\"", -1) + var buckersUrl = "http://" + serverIp + ":" + ServerPort + "/" + indexName + "/_search" + var buckersBody = `{ + "query": { + "bool": { + "filter": [ + { + "range": { + "picDate": { + "gte": "` + startTime + `", + "lte": "` + endTime + `" + } + } + }, + { + "term": { + "targetInfo.targetType.raw": "FaceDetect" + } + }, + { + "terms": { + "cameraId": ["` + esCameraId + `"] + } + } + ] + } + }, + "size": 0, + "aggs": { + "buckets_aggs": { + "composite": { + "sources": [ + { + "baseInfo.targetId": { + "terms": { + "field": "baseInfo.targetId" + } + } + }, + { + "cameraId": { + "terms": { + "field": "cameraId" + } + } + } + ], + "size": 100000000 + }, + "aggs":{ + "top_attention_hits":{ + "top_hits":{ + "size": 1000000, + "sort": [ + { + "picDate": { + "order": "asc" + } + } + ], + "_source":{ + "includes":["baseInfo.targetId","cameraId","cameraName","cameraAddr","targetInfo.targetScore","targetInfo.picSmUrl","showLabels","baseInfo.tableId","baseInfo.tableName","baseInfo.bwType","baseInfo.targetName","baseInfo.compareScore","picDate","picMaxUrl","picWH"] + } + } + } + } + } + } +}` + sources := make(map[string]interface{}) + queryStartTime := time.Now() + buf, err := EsReq("POST", buckersUrl, []byte(buckersBody)) + if err != nil { + return nil, err + } + queryUseTime := time.Now().Sub(queryStartTime).Seconds()*1000 + //fmt.Println(queryUseTime) + tmpSources, err := SourceAggregationsReturnByGrouped(buf, thresholdTime) + if err != nil { + return nil, err + } + sources["result"] = tmpSources + sources["total"] = len(tmpSources) + sources["queryUseTime"] = queryUseTime + //println(sources) + return sources, nil } //鏍规嵁鏃堕棿鑼冨洿锛屾憚鍍忔満鍒楄〃锛屽垎缁勮仛鍚堜汉鑴稿垪琛� @@ -296,16 +391,17 @@ }` //fmt.Println(buckersUrl) //fmt.Println(buckersBody) + queryStartTime := time.Now() buf, err := EsReq("POST", buckersUrl, []byte(buckersBody)) if err != nil { return nil, err } + queryUseTime := time.Now().Sub(queryStartTime).Seconds()*1000 - sources, err := SourceAggregations(buf, thresholdTime) + sources, err := SourceAggregations(buf, thresholdTime,queryUseTime) if err != nil { return nil, err } - //println(sources) return sources, nil } diff --git a/EsClient.go b/EsClient.go index 09b1861..8481e19 100644 --- a/EsClient.go +++ b/EsClient.go @@ -476,7 +476,7 @@ return tmpinfos } -func SourceAggregations(buf [] byte, thresholdTime float64) (sources map[string]interface{}, err error) { +func SourceAggregations(buf [] byte, thresholdTime float64, queryUseTime float64) (sources map[string]interface{}, err error) { s := make(map[string]interface{}) loc, err := time.LoadLocation("Asia/Shanghai") if err != nil { @@ -576,13 +576,110 @@ // sources = append(sources, tmpSources) } count := len(allSource) - fmt.Println(count) + //fmt.Println(count) s["count"] = count s["allSource"] = allSource - + s["queryUseTime"] = queryUseTime return s, nil } +func SourceAggregationsReturnByGrouped(buf [] byte, thresholdTime float64) (sources []map[string]interface{}, err error) { + loc, err := time.LoadLocation("Asia/Shanghai") + if err != nil { + return nil, errors.New("鏃跺尯璁剧疆閿欒") + } + var info interface{} + json.Unmarshal(buf, &info) + out, ok := info.(map[string]interface{}) + if !ok { + return nil, errors.New("http response interface can not change map[string]interface{}") + } + middle, ok := out["aggregations"].(map[string]interface{}) + if !ok { + return nil, errors.New("first hits change error!") + } + bucketsAggs := middle["buckets_aggs"].(map[string]interface{}) + buckets := bucketsAggs["buckets"].([]interface{}) + if len(buckets) == 0 { + return nil, nil + } + for _, inf := range buckets { + tmpSources := make(map[string]interface{}, 0) + hitsSources := make([]map[string]interface{}, 0) + groupKey := inf.(map[string]interface{})["key"].(map[string]interface{}) + topAttentionHits := inf.(map[string]interface{})["top_attention_hits"].(map[string]interface{}) + middleHits := topAttentionHits["hits"].(map[string]interface{}) + finalHits := middleHits["hits"].([]interface{}) + tmpHitSource := make(map[string]interface{}) + startTime := "" + for _, in := range finalHits { + tmpbuf, ok := in.(map[string]interface{}) + if !ok { + fmt.Println("change to source error!") + continue + } + source, ok := tmpbuf["_source"].(map[string]interface{}) + if !ok { + fmt.Println("change _source error!") + continue + } + baseInfo := source["baseInfo"].([]interface{})[0].(map[string]interface{}) + targetInfo := source["targetInfo"].([]interface{})[0].(map[string]interface{}) + tmpTime := source["picDate"].(string) + mTime, err := time.ParseInLocation("2006-01-02 15:04:05", tmpTime, loc) + if err != nil { + return nil, errors.New("鏃堕棿瑙f瀽閿欒") + } + + sTime := tmpTime + eTime := tmpTime + stayTime := 0.0 + if startTime != "" { + sinTime, _ := time.ParseInLocation("2006-01-02 15:04:05", startTime, loc) + stayTime = math.Abs(sinTime.Sub(mTime).Seconds()) + if stayTime <= thresholdTime { + startTime = tmpTime + hitsSources[len(hitsSources)-1]["endTime"] = tmpTime + continue + } else { + if sinTime.Sub(mTime).Seconds() == 0 { + sinTime.Add(time.Second * 1) + sinTime.Format("2006-01-02 15:04:05") + hitsSources[len(hitsSources)-1]["endTime"] = sinTime + } + } + } + startTime = tmpTime + tmpHitSource["personId"] = baseInfo["targetId"].(string) + tmpHitSource["cameraId"] = source["cameraId"].(string) + tmpHitSource["cameraName"] = source["cameraName"].(string) + tmpHitSource["cameraAddr"] = source["cameraAddr"].(string) + tmpHitSource["targetScore"] = int(targetInfo["targetScore"].(float64)) + tmpHitSource["properties"] = source["showLabels"].(string) + tmpHitSource["tableId"] = baseInfo["tableId"].(string) + tmpHitSource["tableName"] = baseInfo["tableName"].(string) + tmpHitSource["bwType"] = baseInfo["bwType"].(string) + tmpHitSource["personName"] = baseInfo["targetName"].(string) + tmpHitSource["compareScore"] = int(baseInfo["compareScore"].(float64)) + tmpHitSource["startTime"] = sTime + tmpHitSource["startBackGroundPicUrl"] = source["picMaxUrl"].([]interface{}) + tmpHitSource["startFacePicUrl"] = targetInfo["picSmUrl"].(string) + tmpHitSource["endTime"] = eTime + tmpHitSource["stayTime"] = stayTime + tmpHitSource["endTBackGroundPicUrl"] = source["picMaxUrl"].([]interface{}) + tmpHitSource["endTFacePicUrl"] = targetInfo["picSmUrl"].(string) + if source["picWH"] != nil { + tmpHitSource["picWH"] = source["picWH"].(map[string]interface{}) + } + hitsSources = append(hitsSources, tmpHitSource) + } + tmpSources["groupKey"] = groupKey + tmpSources["hits_sources"] = hitsSources + sources = append(sources, tmpSources) + } + return sources, nil +} + //瑙f瀽鎶撴媿搴撲汉鍛樼粨鏋� func PerSonAnalysis(preData []map[string]interface{}) (sources []map[string]interface{}, err error) { loc, err := time.LoadLocation("Asia/Shanghai") -- Gitblit v1.8.0