From d0e6e8c6ef16afbc276fc13dece6239476f8d4e3 Mon Sep 17 00:00:00 2001 From: sunty <1172534965@qq.com> Date: 星期五, 28 八月 2020 16:46:49 +0800 Subject: [PATCH] add out --- EsClient.go | 110 +++++++++++++++++++++ EsApi.go | 152 ++++++++++++++++++++++++++++-- 2 files changed, 248 insertions(+), 14 deletions(-) diff --git a/EsApi.go b/EsApi.go index d39066a..d35c480 100644 --- a/EsApi.go +++ b/EsApi.go @@ -1,6 +1,7 @@ package esutil import ( + "basic.com/pubsub/protomsg.git" "encoding/json" "errors" "fmt" @@ -8,8 +9,6 @@ "strings" "sync" "time" - - "basic.com/pubsub/protomsg.git" ) var logPrint = func(i ...interface{}) { @@ -159,11 +158,144 @@ } +/**************************************customer analysis util start**************************************/ + +//鏍规嵁鏃堕棿鑼冨洿鑱氬悎鎵�鏈夊尯鍩熶汉淇℃伅锛岃繑鍥炲浐瀹氭潯鏁� +func GetFaceDataByTimeAnd(startTime string, total int, serverIp string, serverPort string, indexName string) (resData []map[string]interface{}, err error) { + var requestUrl = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" + var requestBody = `{ + "query": { + "bool": { + "filter": [ + { + "range": { + "picDate": { + "gte": "` + startTime + `" + } + } + }, + { + "term":{ + "targetInfo.targetType.raw": "FaceDetect" + } + } + ] + } + }, + "size": 0, + "aggs": { + "buckets_aggs": { + "composite": { + "sources": [ + { + "faceId": { + "terms": { + "field": "baseInfo.targetId" + } + } + }, + { + "areaId": { + "terms": { + "field": "targetInfo.areaId" + } + } + } + ], + "size": 10000000 + }, + "aggs": { + "top_attention_hits": { + "top_hits": { + "size": 1000000, + "sort": [ + { + "picDate": { + "order": "asc" + } + } + ], + "_source": { + "includes": [ + "baseInfo.targetId", + "targetInfo.picSmUrl", + "picDate" + ] + } + } + } + } + } + } +}` + buf, err := EsReq("POST", requestUrl, []byte(requestBody)) + if err != nil { + return nil, err + } + source, err := Sourcelist(buf) + if err != nil { + return nil, err + } + fmt.Println(source) + return resData, nil +} + +func GetFaceIdDeduplication(startTime string, endTime string, serverIp string, serverPort string, indexName string) (ids []string, err error) { + var requestUrl = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" + var requestBody = `{ + "query": { + "bool": { + "filter": [ + { + "range": { + "picDate": { + "gte": "` + startTime + `", + "lte": "` + endTime + `" + } + } + }, + { + "term": { + "targetInfo.targetType.raw": "FaceDetect" + } + } + ] + } + }, + "size": 0, + "aggs": { + "buckets_aggs": { + "composite": { + "sources": [ + { + "faceId": { + "terms": { + "field": "baseInfo.targetId" + } + } + } + ], + "size": 10000000 + } + } + } +}` + buf, err := EsReq("POST", requestUrl, []byte(requestBody)) + if err != nil { + return nil, err + } + fmt.Println(buf) + //ids, err := SourceDeduplication(buf) + + return ids,nil +} + +/**************************************customer analysis util end**************************************/ //鏍规嵁鎽勫儚鏈哄垪琛ㄥ拰鏃堕棿鏌ヨ浜哄憳娴忚杞ㄨ抗 -func GetPersonDataByCameraIdAndTime(cameraId []string, startTime string, endTime string, serverIp string, ServerPort string, indexName string) (map[string]interface{}, error) { +func GetPersonDataByCameraIdAndTime(cameraId []string, startTime string, endTime string, serverIp string, serverPort string, indexName string) (map[string]interface{}, error) { var filterArr []string - if cameraId != nil && len(cameraId) > 0{ + if cameraId != nil && len(cameraId) > 0 { esCameraId := strings.Replace(strings.Trim(fmt.Sprint(cameraId), "[]"), " ", "\",\"", -1) filterArr = append(filterArr, `{ "terms": { @@ -186,7 +318,7 @@ }`) queryStr := strings.Join(filterArr, ",") - personUrl := "http://" + serverIp + ":" + ServerPort + "/" + indexName + "/_search" + personUrl := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" personBody := `{ "query": { "bool": { @@ -236,7 +368,7 @@ //鏍规嵁鏃堕棿鑼冨洿锛屾憚鍍忔満鍒楄〃锛屽垎缁勮仛鍚堜汉鑴稿垪琛�,杩斿洖鍒嗙粍鏁版嵁 func GetFaceDataBucketsByCameraIdAndTimeReturnByGrouped(cameraId []string, personId []string, startTime string, endTime string, thresholdTime float64, serverIp string, ServerPort string, indexName string) (buckersDate map[string]interface{}, err error) { var filterArr []string - if cameraId != nil && len(cameraId) > 0{ + if cameraId != nil && len(cameraId) > 0 { esCameraId := strings.Replace(strings.Trim(fmt.Sprint(cameraId), "[]"), " ", "\",\"", -1) filterArr = append(filterArr, `{ "terms": { @@ -244,7 +376,7 @@ } }`) } - if personId != nil &&len(personId) > 0{ + if personId != nil && len(personId) > 0 { esPersonId := strings.Replace(strings.Trim(fmt.Sprint(personId), "[]"), " ", "\",\"", -1) filterArr = append(filterArr, `{ "terms": { @@ -342,7 +474,7 @@ //鏍规嵁鏃堕棿鑼冨洿锛屾憚鍍忔満鍒楄〃锛屽垎缁勮仛鍚堜汉鑴稿垪琛� func GetFaceDataBucketsByCameraIdAndTime(cameraId []string, personId []string, startTime string, endTime string, thresholdTime float64, serverIp string, ServerPort string, indexName string) (buckersDate map[string]interface{}, err error) { var filterArr []string - if cameraId != nil && len(cameraId) > 0{ + if cameraId != nil && len(cameraId) > 0 { esCameraId := strings.Replace(strings.Trim(fmt.Sprint(cameraId), "[]"), " ", "\",\"", -1) filterArr = append(filterArr, `{ "terms": { @@ -350,7 +482,7 @@ } }`) } - if personId != nil &&len(personId) > 0{ + if personId != nil && len(personId) > 0 { esPersonId := strings.Replace(strings.Trim(fmt.Sprint(personId), "[]"), " ", "\",\"", -1) filterArr = append(filterArr, `{ "terms": { @@ -492,7 +624,7 @@ } middle, ok := out["updated"].(float64) if !ok { - logPrint("first updated change error!") + logPrint("first updated change error!", out) return errors.New("first updated change error!") } if middle == 1 { diff --git a/EsClient.go b/EsClient.go index 79c7c0d..8e08e76 100644 --- a/EsClient.go +++ b/EsClient.go @@ -476,6 +476,112 @@ return tmpinfos } +func FaceSourceAggregations(buf [] byte, thresholdTime float64, queryUseTime float64) (sources map[string]interface{}, err error) { + s := make(map[string]interface{}) + loc, err := time.LoadLocation("Asia/Shanghai") + if err != nil { + return nil, errors.New("鏃跺尯璁剧疆閿欒") + } + var info interface{} + json.Unmarshal(buf, &info) + out, ok := info.(map[string]interface{}) + if !ok { + return nil, errors.New("http response interface can not change map[string]interface{}") + } + middle, ok := out["aggregations"].(map[string]interface{}) + if !ok { + return nil, errors.New("first hits change error!") + } + bucketsAggs := middle["buckets_aggs"].(map[string]interface{}) + buckets := bucketsAggs["buckets"].([]interface{}) + if len(buckets) == 0 { + return nil, nil + } + allSource := make([]map[string]interface{}, 0) + for _, inf := range buckets { + hitsSources := make([]map[string]interface{}, 0) + topAttentionHits := inf.(map[string]interface{})["top_attention_hits"].(map[string]interface{}) + middleHits := topAttentionHits["hits"].(map[string]interface{}) + finalHits := middleHits["hits"].([]interface{}) + startTime := "" + indexLength := len(finalHits) + point := 0 + for _, in := range finalHits { + point = point+1 + tmpHitSource := make(map[string]interface{}) + tmpbuf, ok := in.(map[string]interface{}) + if !ok { + fmt.Println("change to source error!") + continue + } + source, ok := tmpbuf["_source"].(map[string]interface{}) + if !ok { + fmt.Println("change _source error!") + continue + } + baseInfo := source["baseInfo"].([]interface{})[0].(map[string]interface{}) + targetInfo := source["targetInfo"].([]interface{})[0].(map[string]interface{}) + tmpTime := source["picDate"].(string) + mTime, err := time.ParseInLocation("2006-01-02 15:04:05", tmpTime, loc) + if err != nil { + return nil, errors.New("鏃堕棿瑙f瀽閿欒") + } + + sTime := tmpTime + eTime := mTime.Add(time.Second*1).Format("2006-01-02 15:04:05") + stayTime := 1.0 + if startTime != "" && point < indexLength{ + sinTime, _ := time.ParseInLocation("2006-01-02 15:04:05", startTime, loc) + passTime := math.Abs(mTime.Sub(sinTime).Seconds()) + hitsSources[len(hitsSources)-1]["stayTime"] = stayTime + //fmt.Println("passTime: ", passTime) + if passTime <= thresholdTime || point == indexLength{ + startTime = tmpTime + hitsSources[len(hitsSources)-1]["endTime"] = tmpTime + if point == indexLength{ + hitStartTime := hitsSources[len(hitsSources)-1]["startTime"].(string) + realStartTime, _ := time.ParseInLocation("2006-01-02 15:04:05", hitStartTime, loc) + stayTime = math.Abs(mTime.Sub(realStartTime).Seconds()) + hitsSources[len(hitsSources)-1]["stayTime"] = stayTime + } + continue + } else { + hitStartTime := hitsSources[len(hitsSources)-1]["startTime"].(string) + hitEndTime := hitsSources[len(hitsSources)-1]["endTime"].(string) + realStartTime, _ := time.ParseInLocation("2006-01-02 15:04:05", hitStartTime, loc) + realEndTime, _ := time.ParseInLocation("2006-01-02 15:04:05", hitEndTime, loc) + stayTime = math.Abs(realEndTime.Sub(realStartTime).Seconds()) + if sinTime.Sub(mTime).Seconds() == 0 { + sinTime.Add(time.Second * 1) + sinTime.Format("2006-01-02 15:04:05") + hitsSources[len(hitsSources)-1]["endTime"] = tmpTime + stayTime = 1 + } + hitsSources[len(hitsSources)-1]["stayTime"] = stayTime + startTime = "" + continue + //fmt.Println(hitsSources[len(hitsSources)-1]) + } + } + //fmt.Println("========================================================") + startTime = tmpTime + tmpHitSource["personId"] = baseInfo["targetId"].(string) + tmpHitSource["startTime"] = sTime + tmpHitSource["startFacePicUrl"] = targetInfo["picSmUrl"].(string) + tmpHitSource["endTime"] = eTime + tmpHitSource["stayTime"] = stayTime + hitsSources = append(hitsSources, tmpHitSource) + } + allSource = append(allSource, hitsSources...) + } + count := len(allSource) + //fmt.Println(count) + s["count"] = count + s["allSource"] = allSource + s["queryUseTime"] = queryUseTime + return s, nil +} + func SourceAggregations(buf [] byte, thresholdTime float64, queryUseTime float64) (sources map[string]interface{}, err error) { s := make(map[string]interface{}) loc, err := time.LoadLocation("Asia/Shanghai") @@ -592,10 +698,6 @@ hitsSources = append(hitsSources, tmpHitSource) } allSource = append(allSource, hitsSources...) - // tmpSources["groupKey"] = groupKey - // tmpSources["doc_count"] = docCount - // tmpSources["hits_sources"] = hitsSources - // sources = append(sources, tmpSources) } count := len(allSource) //fmt.Println(count) -- Gitblit v1.8.0