From 353829e0039ebebdf3502cd198248edf7e94d8d4 Mon Sep 17 00:00:00 2001 From: sunty <1172534965@qq.com> Date: 星期五, 28 八月 2020 19:17:43 +0800 Subject: [PATCH] add ids faceinfo --- EsClient.go | 46 ++++++++++++++++------ EsApi.go | 58 +++++++++++++++++++++++----- 2 files changed, 80 insertions(+), 24 deletions(-) diff --git a/EsApi.go b/EsApi.go index d35c480..4849041 100644 --- a/EsApi.go +++ b/EsApi.go @@ -5,6 +5,7 @@ "encoding/json" "errors" "fmt" + "sort" "strconv" "strings" "sync" @@ -159,9 +160,27 @@ } /**************************************customer analysis util start**************************************/ +/*******************sort []map util*******************/ +type MapsSort struct { + Key string + MapList []map[string]interface{} +} +func (m *MapsSort) Len() int { + return len(m.MapList) +} + +func (m *MapsSort) Less(i, j int) bool { + return m.MapList[i][m.Key].(string) > m.MapList[j][m.Key].(string) +} + +func (m *MapsSort) Swap(i, j int) { + m.MapList[i], m.MapList[j] = m.MapList[j], m.MapList[i] +} + +/*******************sort []map util*******************/ //鏍规嵁鏃堕棿鑼冨洿鑱氬悎鎵�鏈夊尯鍩熶汉淇℃伅锛岃繑鍥炲浐瀹氭潯鏁� -func GetFaceDataByTimeAnd(startTime string, total int, serverIp string, serverPort string, indexName string) (resData []map[string]interface{}, err error) { +func GetFaceDataByTimeAnd(startTime string, total int, thresholdTime int, thresholdStayTime int, serverIp string, serverPort string, indexName string) (resData []map[string]interface{}, err error) { var requestUrl = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" var requestBody = `{ "query": { @@ -219,6 +238,7 @@ "includes": [ "baseInfo.targetId", "targetInfo.picSmUrl", + "targetInfo.areaId", "picDate" ] } @@ -232,12 +252,26 @@ if err != nil { return nil, err } - source, err := Sourcelist(buf) + source, err := FaceSourceAggregations(buf, thresholdTime, thresholdStayTime) if err != nil { return nil, err } - fmt.Println(source) - return resData, nil + fmt.Println(len(source)) + faceSource := make([]map[string]interface{}, 0) + for index, info := range source { + if int(info["stayTime"].(float64)) > thresholdStayTime { + faceSource = append(faceSource, source[index]) + } + } + //fmt.Println(len(source)) + if len(faceSource) > total { + mapsSort := MapsSort{} + mapsSort.Key = "endTime" + mapsSort.MapList = faceSource + sort.Sort(&mapsSort) + return mapsSort.MapList[:total], nil + } + return faceSource, nil } func GetFaceIdDeduplication(startTime string, endTime string, serverIp string, serverPort string, indexName string) (ids []string, err error) { @@ -280,14 +314,17 @@ } } }` + //fmt.Println(requestUrl) + //fmt.Println(requestBody) buf, err := EsReq("POST", requestUrl, []byte(requestBody)) if err != nil { return nil, err } - fmt.Println(buf) - //ids, err := SourceDeduplication(buf) - - return ids,nil + ids, err1 := SourceDeduplication(buf) + if err1 != nil { + return nil, err1 + } + return ids, nil } /**************************************customer analysis util end**************************************/ @@ -580,9 +617,8 @@ return err } picMaxUrls := tRes[0].PicMaxUrl - sourceStr := ` - "lang":"painless", - "inline": "ctx._source.picMaxUrl.add('` + picUrl + `');ctx._source.updateTime='` + updateTime + `'" + sourceStr := ` + "source": "ctx._source.picMaxUrl.add('` + picUrl + `');ctx._source.updateTime='` + updateTime + `'" ` if len(picMaxUrls) >= 2 { sourceStr = `"source": "ctx._source.picMaxUrl[1]='` + picUrl + `';ctx._source.updateTime='` + updateTime + `'"` diff --git a/EsClient.go b/EsClient.go index 8e08e76..a24b0e2 100644 --- a/EsClient.go +++ b/EsClient.go @@ -476,8 +476,7 @@ return tmpinfos } -func FaceSourceAggregations(buf [] byte, thresholdTime float64, queryUseTime float64) (sources map[string]interface{}, err error) { - s := make(map[string]interface{}) +func FaceSourceAggregations(buf []byte, thresholdTime int,thresholdStayTime int) (sources []map[string]interface{}, err error) { loc, err := time.LoadLocation("Asia/Shanghai") if err != nil { return nil, errors.New("鏃跺尯璁剧疆閿欒") @@ -509,12 +508,12 @@ for _, in := range finalHits { point = point+1 tmpHitSource := make(map[string]interface{}) - tmpbuf, ok := in.(map[string]interface{}) + tmpBuf, ok := in.(map[string]interface{}) if !ok { fmt.Println("change to source error!") continue } - source, ok := tmpbuf["_source"].(map[string]interface{}) + source, ok := tmpBuf["_source"].(map[string]interface{}) if !ok { fmt.Println("change _source error!") continue @@ -535,7 +534,7 @@ passTime := math.Abs(mTime.Sub(sinTime).Seconds()) hitsSources[len(hitsSources)-1]["stayTime"] = stayTime //fmt.Println("passTime: ", passTime) - if passTime <= thresholdTime || point == indexLength{ + if int(passTime) <= thresholdTime || point == indexLength{ startTime = tmpTime hitsSources[len(hitsSources)-1]["endTime"] = tmpTime if point == indexLength{ @@ -543,6 +542,7 @@ realStartTime, _ := time.ParseInLocation("2006-01-02 15:04:05", hitStartTime, loc) stayTime = math.Abs(mTime.Sub(realStartTime).Seconds()) hitsSources[len(hitsSources)-1]["stayTime"] = stayTime + startTime = "" } continue } else { @@ -565,7 +565,11 @@ } //fmt.Println("========================================================") startTime = tmpTime - tmpHitSource["personId"] = baseInfo["targetId"].(string) + tmpHitSource["faceId"] = baseInfo["targetId"].(string) + if targetInfo["areaId"] == nil { + continue + } + tmpHitSource["areaId"] = targetInfo["areaId"].(string) tmpHitSource["startTime"] = sTime tmpHitSource["startFacePicUrl"] = targetInfo["picSmUrl"].(string) tmpHitSource["endTime"] = eTime @@ -574,14 +578,30 @@ } allSource = append(allSource, hitsSources...) } - count := len(allSource) - //fmt.Println(count) - s["count"] = count - s["allSource"] = allSource - s["queryUseTime"] = queryUseTime - return s, nil + return allSource, nil } - +func SourceDeduplication(buf [] byte) ([]string,error) { + var info interface{} + json.Unmarshal(buf, &info) + out, ok := info.(map[string]interface{}) + if !ok { + return nil, errors.New("http response interface can not change map[string]interface{}") + } + middle, ok := out["aggregations"].(map[string]interface{}) + if !ok { + return nil, errors.New("first hits change error!") + } + bucketsAggs := middle["buckets_aggs"].(map[string]interface{}) + buckets := bucketsAggs["buckets"].([]interface{}) + if len(buckets) == 0 { + return nil, nil + } + faceId := make([]string,0) + for _, in := range buckets { + faceId = append(faceId, in.(map[string]interface{})["key"].(map[string]interface{})["faceId"].(string)) + } + return faceId,nil +} func SourceAggregations(buf [] byte, thresholdTime float64, queryUseTime float64) (sources map[string]interface{}, err error) { s := make(map[string]interface{}) loc, err := time.LoadLocation("Asia/Shanghai") -- Gitblit v1.8.0