From 353829e0039ebebdf3502cd198248edf7e94d8d4 Mon Sep 17 00:00:00 2001
From: sunty <1172534965@qq.com>
Date: 星期五, 28 八月 2020 19:17:43 +0800
Subject: [PATCH] add ids faceinfo
---
EsClient.go | 46 ++++++++++++++++------
EsApi.go | 58 +++++++++++++++++++++++-----
2 files changed, 80 insertions(+), 24 deletions(-)
diff --git a/EsApi.go b/EsApi.go
index d35c480..4849041 100644
--- a/EsApi.go
+++ b/EsApi.go
@@ -5,6 +5,7 @@
"encoding/json"
"errors"
"fmt"
+ "sort"
"strconv"
"strings"
"sync"
@@ -159,9 +160,27 @@
}
/**************************************customer analysis util start**************************************/
+/*******************sort []map util*******************/
+type MapsSort struct {
+ Key string
+ MapList []map[string]interface{}
+}
+func (m *MapsSort) Len() int {
+ return len(m.MapList)
+}
+
+func (m *MapsSort) Less(i, j int) bool {
+ return m.MapList[i][m.Key].(string) > m.MapList[j][m.Key].(string)
+}
+
+func (m *MapsSort) Swap(i, j int) {
+ m.MapList[i], m.MapList[j] = m.MapList[j], m.MapList[i]
+}
+
+/*******************sort []map util*******************/
//鏍规嵁鏃堕棿鑼冨洿鑱氬悎鎵�鏈夊尯鍩熶汉淇℃伅锛岃繑鍥炲浐瀹氭潯鏁�
-func GetFaceDataByTimeAnd(startTime string, total int, serverIp string, serverPort string, indexName string) (resData []map[string]interface{}, err error) {
+func GetFaceDataByTimeAnd(startTime string, total int, thresholdTime int, thresholdStayTime int, serverIp string, serverPort string, indexName string) (resData []map[string]interface{}, err error) {
var requestUrl = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
var requestBody = `{
"query": {
@@ -219,6 +238,7 @@
"includes": [
"baseInfo.targetId",
"targetInfo.picSmUrl",
+ "targetInfo.areaId",
"picDate"
]
}
@@ -232,12 +252,26 @@
if err != nil {
return nil, err
}
- source, err := Sourcelist(buf)
+ source, err := FaceSourceAggregations(buf, thresholdTime, thresholdStayTime)
if err != nil {
return nil, err
}
- fmt.Println(source)
- return resData, nil
+ fmt.Println(len(source))
+ faceSource := make([]map[string]interface{}, 0)
+ for index, info := range source {
+ if int(info["stayTime"].(float64)) > thresholdStayTime {
+ faceSource = append(faceSource, source[index])
+ }
+ }
+ //fmt.Println(len(source))
+ if len(faceSource) > total {
+ mapsSort := MapsSort{}
+ mapsSort.Key = "endTime"
+ mapsSort.MapList = faceSource
+ sort.Sort(&mapsSort)
+ return mapsSort.MapList[:total], nil
+ }
+ return faceSource, nil
}
func GetFaceIdDeduplication(startTime string, endTime string, serverIp string, serverPort string, indexName string) (ids []string, err error) {
@@ -280,14 +314,17 @@
}
}
}`
+ //fmt.Println(requestUrl)
+ //fmt.Println(requestBody)
buf, err := EsReq("POST", requestUrl, []byte(requestBody))
if err != nil {
return nil, err
}
- fmt.Println(buf)
- //ids, err := SourceDeduplication(buf)
-
- return ids,nil
+ ids, err1 := SourceDeduplication(buf)
+ if err1 != nil {
+ return nil, err1
+ }
+ return ids, nil
}
/**************************************customer analysis util end**************************************/
@@ -580,9 +617,8 @@
return err
}
picMaxUrls := tRes[0].PicMaxUrl
- sourceStr := `
- "lang":"painless",
- "inline": "ctx._source.picMaxUrl.add('` + picUrl + `');ctx._source.updateTime='` + updateTime + `'"
+ sourceStr := `
+ "source": "ctx._source.picMaxUrl.add('` + picUrl + `');ctx._source.updateTime='` + updateTime + `'"
`
if len(picMaxUrls) >= 2 {
sourceStr = `"source": "ctx._source.picMaxUrl[1]='` + picUrl + `';ctx._source.updateTime='` + updateTime + `'"`
diff --git a/EsClient.go b/EsClient.go
index 8e08e76..a24b0e2 100644
--- a/EsClient.go
+++ b/EsClient.go
@@ -476,8 +476,7 @@
return tmpinfos
}
-func FaceSourceAggregations(buf [] byte, thresholdTime float64, queryUseTime float64) (sources map[string]interface{}, err error) {
- s := make(map[string]interface{})
+func FaceSourceAggregations(buf []byte, thresholdTime int,thresholdStayTime int) (sources []map[string]interface{}, err error) {
loc, err := time.LoadLocation("Asia/Shanghai")
if err != nil {
return nil, errors.New("鏃跺尯璁剧疆閿欒")
@@ -509,12 +508,12 @@
for _, in := range finalHits {
point = point+1
tmpHitSource := make(map[string]interface{})
- tmpbuf, ok := in.(map[string]interface{})
+ tmpBuf, ok := in.(map[string]interface{})
if !ok {
fmt.Println("change to source error!")
continue
}
- source, ok := tmpbuf["_source"].(map[string]interface{})
+ source, ok := tmpBuf["_source"].(map[string]interface{})
if !ok {
fmt.Println("change _source error!")
continue
@@ -535,7 +534,7 @@
passTime := math.Abs(mTime.Sub(sinTime).Seconds())
hitsSources[len(hitsSources)-1]["stayTime"] = stayTime
//fmt.Println("passTime: ", passTime)
- if passTime <= thresholdTime || point == indexLength{
+ if int(passTime) <= thresholdTime || point == indexLength{
startTime = tmpTime
hitsSources[len(hitsSources)-1]["endTime"] = tmpTime
if point == indexLength{
@@ -543,6 +542,7 @@
realStartTime, _ := time.ParseInLocation("2006-01-02 15:04:05", hitStartTime, loc)
stayTime = math.Abs(mTime.Sub(realStartTime).Seconds())
hitsSources[len(hitsSources)-1]["stayTime"] = stayTime
+ startTime = ""
}
continue
} else {
@@ -565,7 +565,11 @@
}
//fmt.Println("========================================================")
startTime = tmpTime
- tmpHitSource["personId"] = baseInfo["targetId"].(string)
+ tmpHitSource["faceId"] = baseInfo["targetId"].(string)
+ if targetInfo["areaId"] == nil {
+ continue
+ }
+ tmpHitSource["areaId"] = targetInfo["areaId"].(string)
tmpHitSource["startTime"] = sTime
tmpHitSource["startFacePicUrl"] = targetInfo["picSmUrl"].(string)
tmpHitSource["endTime"] = eTime
@@ -574,14 +578,30 @@
}
allSource = append(allSource, hitsSources...)
}
- count := len(allSource)
- //fmt.Println(count)
- s["count"] = count
- s["allSource"] = allSource
- s["queryUseTime"] = queryUseTime
- return s, nil
+ return allSource, nil
}
-
+func SourceDeduplication(buf [] byte) ([]string,error) {
+ var info interface{}
+ json.Unmarshal(buf, &info)
+ out, ok := info.(map[string]interface{})
+ if !ok {
+ return nil, errors.New("http response interface can not change map[string]interface{}")
+ }
+ middle, ok := out["aggregations"].(map[string]interface{})
+ if !ok {
+ return nil, errors.New("first hits change error!")
+ }
+ bucketsAggs := middle["buckets_aggs"].(map[string]interface{})
+ buckets := bucketsAggs["buckets"].([]interface{})
+ if len(buckets) == 0 {
+ return nil, nil
+ }
+ faceId := make([]string,0)
+ for _, in := range buckets {
+ faceId = append(faceId, in.(map[string]interface{})["key"].(map[string]interface{})["faceId"].(string))
+ }
+ return faceId,nil
+}
func SourceAggregations(buf [] byte, thresholdTime float64, queryUseTime float64) (sources map[string]interface{}, err error) {
s := make(map[string]interface{})
loc, err := time.LoadLocation("Asia/Shanghai")
--
Gitblit v1.8.0