From 171d94e0f254b485ed5d09cef9a208b0f5672048 Mon Sep 17 00:00:00 2001 From: sunty <1172534965@qq.com> Date: 星期二, 08 九月 2020 14:01:26 +0800 Subject: [PATCH] fix source aggs --- EsApi.go | 377 +++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 files changed, 362 insertions(+), 15 deletions(-) diff --git a/EsApi.go b/EsApi.go index 82eeb03..64235bf 100644 --- a/EsApi.go +++ b/EsApi.go @@ -1,15 +1,15 @@ package esutil import ( + "basic.com/pubsub/protomsg.git" "encoding/json" "errors" "fmt" + "sort" "strconv" "strings" "sync" "time" - - "basic.com/pubsub/protomsg.git" ) var logPrint = func(i ...interface{}) { @@ -159,11 +159,359 @@ } +/**************************************customer analysis util start**************************************/ +/*******************sort []map util*******************/ +type MapsSort struct { + Key string + MapList []map[string]interface{} +} + +func (m *MapsSort) Len() int { + return len(m.MapList) +} + +func (m *MapsSort) Less(i, j int) bool { + return m.MapList[i][m.Key].(string) > m.MapList[j][m.Key].(string) +} + +func (m *MapsSort) Swap(i, j int) { + m.MapList[i], m.MapList[j] = m.MapList[j], m.MapList[i] +} + +/*******************sort []map util*******************/ +//鏍规嵁鏃堕棿鑼冨洿鑱氬悎鎵�鏈夊尯鍩熶汉淇℃伅锛岃繑鍥炲浐瀹氭潯鏁� +func GetFaceDataByTimeAndTotal(startTime string, endTime string, total int, thresholdTime int, thresholdStayTime int, serverIp string, serverPort string, indexName string) (resData []map[string]interface{}, err error) { + var requestUrl = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" + var requestBody = `{ + "query": { + "bool": { + "filter": [ + { + "range": { + "picDate": { + "gte": "` + startTime + `", + "lte": "` + endTime + `" + } + } + }, + { + "term":{ + "targetInfo.targetType.raw": "FaceDetect" + } + } + ] + } + }, + "size": 0, + "aggs": { + "buckets_aggs": { + "composite": { + "sources": [ + { + "faceId": { + "terms": { + "field": "baseInfo.targetId" + } + } + }, + { + "areaId": { + "terms": { + "field": "targetInfo.areaId" + } + } + } + ], + "size": 10000000 + }, + "aggs": { + "top_attention_hits": { + "top_hits": { + "size": 1000000, + "sort": [ + { + "picDate": { + "order": "asc" + } + } + ], + "_source": { + "includes": [ + "baseInfo.targetId", + "targetInfo.picSmUrl", + "targetInfo.areaId", + "picDate" + ] + } + } + } + } + } + } +}` + buf, err := EsReq("POST", requestUrl, []byte(requestBody)) + if err != nil { + return nil, err + } + source, err := FaceSourceAggregations(buf, thresholdTime, thresholdStayTime) + if err != nil { + return nil, err + } + if len(source) == 0{ + return source,nil + } + faceSource := make([]map[string]interface{}, 0) + for index, info := range source { + if int(info["stayTime"].(float64)) > thresholdStayTime { + faceSource = append(faceSource, source[index]) + } + } + mapsSort := MapsSort{} + mapsSort.Key = "endTime" + mapsSort.MapList = faceSource + sort.Sort(&mapsSort) + if len(faceSource) > total { + return mapsSort.MapList[:total], nil + } + return mapsSort.MapList, nil +} + +func GetFaceDataByTimeAndId(startTime string, endTime string, id string, thresholdTime int, thresholdStayTime int, serverIp string, serverPort string, indexName string) (resData []map[string]interface{}, err error) { + var requestUrl = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" + var requestBody = `{ + "query": { + "bool": { + "filter": [ + { + "range": { + "picDate": { + "gte": "` + startTime + `", + "lte": "` + endTime + `" + } + } + }, + { + "term":{ + "targetInfo.targetType.raw": "FaceDetect" + } + }, + { + "term":{ + "baseInfo.targetId": "` + id + `" + } + } + ] + } + }, + "size": 0, + "aggs": { + "buckets_aggs": { + "composite": { + "sources": [ + { + "faceId": { + "terms": { + "field": "baseInfo.targetId" + } + } + }, + { + "areaId": { + "terms": { + "field": "targetInfo.areaId" + } + } + } + ], + "size": 10000000 + }, + "aggs": { + "top_attention_hits": { + "top_hits": { + "size": 1000000, + "sort": [ + { + "picDate": { + "order": "asc" + } + } + ], + "_source": { + "includes": [ + "baseInfo.targetId", + "targetInfo.picSmUrl", + "targetInfo.areaId", + "picDate" + ] + } + } + } + } + } + } +}` + buf, err := EsReq("POST", requestUrl, []byte(requestBody)) + if err != nil { + return nil, err + } + source, err := FaceSourceAggregations(buf, thresholdTime, thresholdStayTime) + if err != nil { + return nil, err + } + if len(source) == 0{ + return source,nil + } + faceSource := make([]map[string]interface{}, 0) + for index, info := range source { + if int(info["stayTime"].(float64)) > thresholdStayTime { + faceSource = append(faceSource, source[index]) + } + } + mapsSort := MapsSort{} + mapsSort.Key = "startTime" + mapsSort.MapList = faceSource + sort.Sort(&mapsSort) + return mapsSort.MapList, nil +} + +func GetFaceIdDeduplication(startTime string, endTime string, serverIp string, serverPort string, indexName string) (ids []map[string]interface{}, err error) { + var requestUrl = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" + var requestBody = `{ + "query": { + "bool": { + "filter": [ + { + "range": { + "picDate": { + "gte": "` + startTime + `", + "lte": "` + endTime + `" + } + } + }, + { + "term": { + "targetInfo.targetType.raw": "FaceDetect" + } + } + ] + } + }, + "size": 0, + "aggs": { + "buckets_aggs": { + "composite": { + "sources": [ + { + "faceId": { + "terms": { + "field": "baseInfo.targetId" + } + } + } + ], + "size": 10000000 + }, + "aggs": { + "top_attention_hits": { + "top_hits": { + "size": 1, + "sort": [ + { + "picDate": { + "order": "desc" + } + } + ], + "_source": { + "includes": [ + "picDate" + ] + } + } + } + } + } + } + } +}` + //fmt.Println(requestUrl) + //fmt.Println(requestBody) + buf, err := EsReq("POST", requestUrl, []byte(requestBody)) + if err != nil { + return nil, err + } + ids, err1 := SourceDeduplication(buf) + if err1 != nil { + return nil, err1 + } + if len(ids) > 1 { + mapsSort := MapsSort{} + mapsSort.Key = "lastTime" + mapsSort.MapList = ids + sort.Sort(&mapsSort) + return mapsSort.MapList, nil + } + return ids, nil +} + +//缁熻鍚勪釜鍖哄煙浜烘暟 +func StatisticsEveryAreaPersonsNumber(startTime string, endTime string, serverIp string, serverPort string, indexName string) ([]map[string]interface{}, error) { + var requestUrl = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" + var requestBody = `{ + "query": { + "bool": { + "filter": [ + { + "range": { + "picDate": { + "gte": "` + startTime + `", + "lte": "` + endTime + `" + } + } + }, + { + "term": { + "targetInfo.targetType.raw": "Yolo" + } + } + ] + } + }, + "size": 0, + "aggs": { + "buckets_aggs": { + "composite": { + "sources": [ + { + "areaId": { + "terms": { + "field": "targetInfo.areaId" + } + } + } + ], + "size": 10000000 + } + } + } +}` + buf, err := EsReq("POST", requestUrl, []byte(requestBody)) + if err != nil { + return nil, err + } + result, err := SourceStatistics(buf) + if err != nil { + return nil, err + } + return result, nil +} + +/**************************************customer analysis util end**************************************/ //鏍规嵁鎽勫儚鏈哄垪琛ㄥ拰鏃堕棿鏌ヨ浜哄憳娴忚杞ㄨ抗 -func GetPersonDataByCameraIdAndTime(cameraId []string, startTime string, endTime string, serverIp string, ServerPort string, indexName string) (map[string]interface{}, error) { +func GetPersonDataByCameraIdAndTime(cameraId []string, startTime string, endTime string, serverIp string, serverPort string, indexName string) (map[string]interface{}, error) { var filterArr []string - if cameraId != nil && len(cameraId) > 0{ + if cameraId != nil && len(cameraId) > 0 { esCameraId := strings.Replace(strings.Trim(fmt.Sprint(cameraId), "[]"), " ", "\",\"", -1) filterArr = append(filterArr, `{ "terms": { @@ -186,7 +534,7 @@ }`) queryStr := strings.Join(filterArr, ",") - personUrl := "http://" + serverIp + ":" + ServerPort + "/" + indexName + "/_search" + personUrl := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" personBody := `{ "query": { "bool": { @@ -236,7 +584,7 @@ //鏍规嵁鏃堕棿鑼冨洿锛屾憚鍍忔満鍒楄〃锛屽垎缁勮仛鍚堜汉鑴稿垪琛�,杩斿洖鍒嗙粍鏁版嵁 func GetFaceDataBucketsByCameraIdAndTimeReturnByGrouped(cameraId []string, personId []string, startTime string, endTime string, thresholdTime float64, serverIp string, ServerPort string, indexName string) (buckersDate map[string]interface{}, err error) { var filterArr []string - if cameraId != nil && len(cameraId) > 0{ + if cameraId != nil && len(cameraId) > 0 { esCameraId := strings.Replace(strings.Trim(fmt.Sprint(cameraId), "[]"), " ", "\",\"", -1) filterArr = append(filterArr, `{ "terms": { @@ -244,7 +592,7 @@ } }`) } - if personId != nil &&len(personId) > 0{ + if personId != nil && len(personId) > 0 { esPersonId := strings.Replace(strings.Trim(fmt.Sprint(personId), "[]"), " ", "\",\"", -1) filterArr = append(filterArr, `{ "terms": { @@ -296,7 +644,7 @@ } } ], - "size": 100 + "size": 10000000 }, "aggs":{ "top_attention_hits":{ @@ -342,7 +690,7 @@ //鏍规嵁鏃堕棿鑼冨洿锛屾憚鍍忔満鍒楄〃锛屽垎缁勮仛鍚堜汉鑴稿垪琛� func GetFaceDataBucketsByCameraIdAndTime(cameraId []string, personId []string, startTime string, endTime string, thresholdTime float64, serverIp string, ServerPort string, indexName string) (buckersDate map[string]interface{}, err error) { var filterArr []string - if cameraId != nil && len(cameraId) > 0{ + if cameraId != nil && len(cameraId) > 0 { esCameraId := strings.Replace(strings.Trim(fmt.Sprint(cameraId), "[]"), " ", "\",\"", -1) filterArr = append(filterArr, `{ "terms": { @@ -350,7 +698,7 @@ } }`) } - if personId != nil &&len(personId) > 0{ + if personId != nil && len(personId) > 0 { esPersonId := strings.Replace(strings.Trim(fmt.Sprint(personId), "[]"), " ", "\",\"", -1) filterArr = append(filterArr, `{ "terms": { @@ -402,7 +750,7 @@ } } ], - "size": 100 + "size": 10000000 }, "aggs":{ "top_attention_hits":{ @@ -448,9 +796,8 @@ return err } picMaxUrls := tRes[0].PicMaxUrl - sourceStr := ` - "lang":"painless", - "inline": "ctx._source.picMaxUrl.add('` + picUrl + `');ctx._source.updateTime='` + updateTime + `'" + sourceStr := ` + "source": "ctx._source.picMaxUrl.add('` + picUrl + `');ctx._source.updateTime='` + updateTime + `'" ` if len(picMaxUrls) >= 2 { sourceStr = `"source": "ctx._source.picMaxUrl[1]='` + picUrl + `';ctx._source.updateTime='` + updateTime + `'"` @@ -492,7 +839,7 @@ } middle, ok := out["updated"].(float64) if !ok { - logPrint("first updated change error!") + logPrint("first updated change error!", out) return errors.New("first updated change error!") } if middle == 1 { -- Gitblit v1.8.0