From d0e6e8c6ef16afbc276fc13dece6239476f8d4e3 Mon Sep 17 00:00:00 2001
From: sunty <1172534965@qq.com>
Date: 星期五, 28 八月 2020 16:46:49 +0800
Subject: [PATCH] add out
---
EsClient.go | 110 +++++++++++++++++++++
EsApi.go | 152 ++++++++++++++++++++++++++++--
2 files changed, 248 insertions(+), 14 deletions(-)
diff --git a/EsApi.go b/EsApi.go
index d39066a..d35c480 100644
--- a/EsApi.go
+++ b/EsApi.go
@@ -1,6 +1,7 @@
package esutil
import (
+ "basic.com/pubsub/protomsg.git"
"encoding/json"
"errors"
"fmt"
@@ -8,8 +9,6 @@
"strings"
"sync"
"time"
-
- "basic.com/pubsub/protomsg.git"
)
var logPrint = func(i ...interface{}) {
@@ -159,11 +158,144 @@
}
+/**************************************customer analysis util start**************************************/
+
+//鏍规嵁鏃堕棿鑼冨洿鑱氬悎鎵�鏈夊尯鍩熶汉淇℃伅锛岃繑鍥炲浐瀹氭潯鏁�
+func GetFaceDataByTimeAnd(startTime string, total int, serverIp string, serverPort string, indexName string) (resData []map[string]interface{}, err error) {
+ var requestUrl = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
+ var requestBody = `{
+ "query": {
+ "bool": {
+ "filter": [
+ {
+ "range": {
+ "picDate": {
+ "gte": "` + startTime + `"
+ }
+ }
+ },
+ {
+ "term":{
+ "targetInfo.targetType.raw": "FaceDetect"
+ }
+ }
+ ]
+ }
+ },
+ "size": 0,
+ "aggs": {
+ "buckets_aggs": {
+ "composite": {
+ "sources": [
+ {
+ "faceId": {
+ "terms": {
+ "field": "baseInfo.targetId"
+ }
+ }
+ },
+ {
+ "areaId": {
+ "terms": {
+ "field": "targetInfo.areaId"
+ }
+ }
+ }
+ ],
+ "size": 10000000
+ },
+ "aggs": {
+ "top_attention_hits": {
+ "top_hits": {
+ "size": 1000000,
+ "sort": [
+ {
+ "picDate": {
+ "order": "asc"
+ }
+ }
+ ],
+ "_source": {
+ "includes": [
+ "baseInfo.targetId",
+ "targetInfo.picSmUrl",
+ "picDate"
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+}`
+ buf, err := EsReq("POST", requestUrl, []byte(requestBody))
+ if err != nil {
+ return nil, err
+ }
+ source, err := Sourcelist(buf)
+ if err != nil {
+ return nil, err
+ }
+ fmt.Println(source)
+ return resData, nil
+}
+
+func GetFaceIdDeduplication(startTime string, endTime string, serverIp string, serverPort string, indexName string) (ids []string, err error) {
+ var requestUrl = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
+ var requestBody = `{
+ "query": {
+ "bool": {
+ "filter": [
+ {
+ "range": {
+ "picDate": {
+ "gte": "` + startTime + `",
+ "lte": "` + endTime + `"
+ }
+ }
+ },
+ {
+ "term": {
+ "targetInfo.targetType.raw": "FaceDetect"
+ }
+ }
+ ]
+ }
+ },
+ "size": 0,
+ "aggs": {
+ "buckets_aggs": {
+ "composite": {
+ "sources": [
+ {
+ "faceId": {
+ "terms": {
+ "field": "baseInfo.targetId"
+ }
+ }
+ }
+ ],
+ "size": 10000000
+ }
+ }
+ }
+}`
+ buf, err := EsReq("POST", requestUrl, []byte(requestBody))
+ if err != nil {
+ return nil, err
+ }
+ fmt.Println(buf)
+ //ids, err := SourceDeduplication(buf)
+
+ return ids,nil
+}
+
+/**************************************customer analysis util end**************************************/
//鏍规嵁鎽勫儚鏈哄垪琛ㄥ拰鏃堕棿鏌ヨ浜哄憳娴忚杞ㄨ抗
-func GetPersonDataByCameraIdAndTime(cameraId []string, startTime string, endTime string, serverIp string, ServerPort string, indexName string) (map[string]interface{}, error) {
+func GetPersonDataByCameraIdAndTime(cameraId []string, startTime string, endTime string, serverIp string, serverPort string, indexName string) (map[string]interface{}, error) {
var filterArr []string
- if cameraId != nil && len(cameraId) > 0{
+ if cameraId != nil && len(cameraId) > 0 {
esCameraId := strings.Replace(strings.Trim(fmt.Sprint(cameraId), "[]"), " ", "\",\"", -1)
filterArr = append(filterArr, `{
"terms": {
@@ -186,7 +318,7 @@
}`)
queryStr := strings.Join(filterArr, ",")
- personUrl := "http://" + serverIp + ":" + ServerPort + "/" + indexName + "/_search"
+ personUrl := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
personBody := `{
"query": {
"bool": {
@@ -236,7 +368,7 @@
//鏍规嵁鏃堕棿鑼冨洿锛屾憚鍍忔満鍒楄〃锛屽垎缁勮仛鍚堜汉鑴稿垪琛�,杩斿洖鍒嗙粍鏁版嵁
func GetFaceDataBucketsByCameraIdAndTimeReturnByGrouped(cameraId []string, personId []string, startTime string, endTime string, thresholdTime float64, serverIp string, ServerPort string, indexName string) (buckersDate map[string]interface{}, err error) {
var filterArr []string
- if cameraId != nil && len(cameraId) > 0{
+ if cameraId != nil && len(cameraId) > 0 {
esCameraId := strings.Replace(strings.Trim(fmt.Sprint(cameraId), "[]"), " ", "\",\"", -1)
filterArr = append(filterArr, `{
"terms": {
@@ -244,7 +376,7 @@
}
}`)
}
- if personId != nil &&len(personId) > 0{
+ if personId != nil && len(personId) > 0 {
esPersonId := strings.Replace(strings.Trim(fmt.Sprint(personId), "[]"), " ", "\",\"", -1)
filterArr = append(filterArr, `{
"terms": {
@@ -342,7 +474,7 @@
//鏍规嵁鏃堕棿鑼冨洿锛屾憚鍍忔満鍒楄〃锛屽垎缁勮仛鍚堜汉鑴稿垪琛�
func GetFaceDataBucketsByCameraIdAndTime(cameraId []string, personId []string, startTime string, endTime string, thresholdTime float64, serverIp string, ServerPort string, indexName string) (buckersDate map[string]interface{}, err error) {
var filterArr []string
- if cameraId != nil && len(cameraId) > 0{
+ if cameraId != nil && len(cameraId) > 0 {
esCameraId := strings.Replace(strings.Trim(fmt.Sprint(cameraId), "[]"), " ", "\",\"", -1)
filterArr = append(filterArr, `{
"terms": {
@@ -350,7 +482,7 @@
}
}`)
}
- if personId != nil &&len(personId) > 0{
+ if personId != nil && len(personId) > 0 {
esPersonId := strings.Replace(strings.Trim(fmt.Sprint(personId), "[]"), " ", "\",\"", -1)
filterArr = append(filterArr, `{
"terms": {
@@ -492,7 +624,7 @@
}
middle, ok := out["updated"].(float64)
if !ok {
- logPrint("first updated change error!")
+ logPrint("first updated change error!", out)
return errors.New("first updated change error!")
}
if middle == 1 {
diff --git a/EsClient.go b/EsClient.go
index 79c7c0d..8e08e76 100644
--- a/EsClient.go
+++ b/EsClient.go
@@ -476,6 +476,112 @@
return tmpinfos
}
+func FaceSourceAggregations(buf [] byte, thresholdTime float64, queryUseTime float64) (sources map[string]interface{}, err error) {
+ s := make(map[string]interface{})
+ loc, err := time.LoadLocation("Asia/Shanghai")
+ if err != nil {
+ return nil, errors.New("鏃跺尯璁剧疆閿欒")
+ }
+ var info interface{}
+ json.Unmarshal(buf, &info)
+ out, ok := info.(map[string]interface{})
+ if !ok {
+ return nil, errors.New("http response interface can not change map[string]interface{}")
+ }
+ middle, ok := out["aggregations"].(map[string]interface{})
+ if !ok {
+ return nil, errors.New("first hits change error!")
+ }
+ bucketsAggs := middle["buckets_aggs"].(map[string]interface{})
+ buckets := bucketsAggs["buckets"].([]interface{})
+ if len(buckets) == 0 {
+ return nil, nil
+ }
+ allSource := make([]map[string]interface{}, 0)
+ for _, inf := range buckets {
+ hitsSources := make([]map[string]interface{}, 0)
+ topAttentionHits := inf.(map[string]interface{})["top_attention_hits"].(map[string]interface{})
+ middleHits := topAttentionHits["hits"].(map[string]interface{})
+ finalHits := middleHits["hits"].([]interface{})
+ startTime := ""
+ indexLength := len(finalHits)
+ point := 0
+ for _, in := range finalHits {
+ point = point+1
+ tmpHitSource := make(map[string]interface{})
+ tmpbuf, ok := in.(map[string]interface{})
+ if !ok {
+ fmt.Println("change to source error!")
+ continue
+ }
+ source, ok := tmpbuf["_source"].(map[string]interface{})
+ if !ok {
+ fmt.Println("change _source error!")
+ continue
+ }
+ baseInfo := source["baseInfo"].([]interface{})[0].(map[string]interface{})
+ targetInfo := source["targetInfo"].([]interface{})[0].(map[string]interface{})
+ tmpTime := source["picDate"].(string)
+ mTime, err := time.ParseInLocation("2006-01-02 15:04:05", tmpTime, loc)
+ if err != nil {
+ return nil, errors.New("鏃堕棿瑙f瀽閿欒")
+ }
+
+ sTime := tmpTime
+ eTime := mTime.Add(time.Second*1).Format("2006-01-02 15:04:05")
+ stayTime := 1.0
+ if startTime != "" && point < indexLength{
+ sinTime, _ := time.ParseInLocation("2006-01-02 15:04:05", startTime, loc)
+ passTime := math.Abs(mTime.Sub(sinTime).Seconds())
+ hitsSources[len(hitsSources)-1]["stayTime"] = stayTime
+ //fmt.Println("passTime: ", passTime)
+ if passTime <= thresholdTime || point == indexLength{
+ startTime = tmpTime
+ hitsSources[len(hitsSources)-1]["endTime"] = tmpTime
+ if point == indexLength{
+ hitStartTime := hitsSources[len(hitsSources)-1]["startTime"].(string)
+ realStartTime, _ := time.ParseInLocation("2006-01-02 15:04:05", hitStartTime, loc)
+ stayTime = math.Abs(mTime.Sub(realStartTime).Seconds())
+ hitsSources[len(hitsSources)-1]["stayTime"] = stayTime
+ }
+ continue
+ } else {
+ hitStartTime := hitsSources[len(hitsSources)-1]["startTime"].(string)
+ hitEndTime := hitsSources[len(hitsSources)-1]["endTime"].(string)
+ realStartTime, _ := time.ParseInLocation("2006-01-02 15:04:05", hitStartTime, loc)
+ realEndTime, _ := time.ParseInLocation("2006-01-02 15:04:05", hitEndTime, loc)
+ stayTime = math.Abs(realEndTime.Sub(realStartTime).Seconds())
+ if sinTime.Sub(mTime).Seconds() == 0 {
+ sinTime.Add(time.Second * 1)
+ sinTime.Format("2006-01-02 15:04:05")
+ hitsSources[len(hitsSources)-1]["endTime"] = tmpTime
+ stayTime = 1
+ }
+ hitsSources[len(hitsSources)-1]["stayTime"] = stayTime
+ startTime = ""
+ continue
+ //fmt.Println(hitsSources[len(hitsSources)-1])
+ }
+ }
+ //fmt.Println("========================================================")
+ startTime = tmpTime
+ tmpHitSource["personId"] = baseInfo["targetId"].(string)
+ tmpHitSource["startTime"] = sTime
+ tmpHitSource["startFacePicUrl"] = targetInfo["picSmUrl"].(string)
+ tmpHitSource["endTime"] = eTime
+ tmpHitSource["stayTime"] = stayTime
+ hitsSources = append(hitsSources, tmpHitSource)
+ }
+ allSource = append(allSource, hitsSources...)
+ }
+ count := len(allSource)
+ //fmt.Println(count)
+ s["count"] = count
+ s["allSource"] = allSource
+ s["queryUseTime"] = queryUseTime
+ return s, nil
+}
+
func SourceAggregations(buf [] byte, thresholdTime float64, queryUseTime float64) (sources map[string]interface{}, err error) {
s := make(map[string]interface{})
loc, err := time.LoadLocation("Asia/Shanghai")
@@ -592,10 +698,6 @@
hitsSources = append(hitsSources, tmpHitSource)
}
allSource = append(allSource, hitsSources...)
- // tmpSources["groupKey"] = groupKey
- // tmpSources["doc_count"] = docCount
- // tmpSources["hits_sources"] = hitsSources
- // sources = append(sources, tmpSources)
}
count := len(allSource)
//fmt.Println(count)
--
Gitblit v1.8.0