From c6ae506a28e9ce6463822bfea5369856acf980e0 Mon Sep 17 00:00:00 2001
From: sunty <1172534965@qq.com>
Date: 星期四, 14 一月 2021 18:19:37 +0800
Subject: [PATCH] add cameraIds to AggregateTaskList
---
EsClient.go | 252 ++++++++++++++++++++++++++++++++-----------------
1 files changed, 163 insertions(+), 89 deletions(-)
diff --git a/EsClient.go b/EsClient.go
index 8e08e76..d8c9db5 100644
--- a/EsClient.go
+++ b/EsClient.go
@@ -3,60 +3,18 @@
import (
"basic.com/pubsub/protomsg.git"
"bytes"
+ "encoding/base64"
"encoding/json"
"errors"
"fmt"
- "io"
+ "github.com/spf13/viper"
"io/ioutil"
+ "log"
"math"
"net/http"
- "strings"
"time"
)
-func GetEsDataReq(url string, parama string, isSource bool) map[string]interface{} {
- //fmt.Println("es 鏌ヨ璇锋眰璺緞" + url) // 閰嶇疆淇℃伅 鑾峰彇
- var dat map[string]interface{}
- req, err := http.NewRequest("POST", url, strings.NewReader(parama))
- req.Header.Add("Content-Type", "application/json")
- timeout := time.Duration(10 * time.Second) //瓒呮椂鏃堕棿50ms
- client := &http.Client{Timeout: timeout}
- resp, err := client.Do(req)
- if err != nil {
- fmt.Println(err)
- return dat
- }
- defer resp.Body.Close()
- dec := json.NewDecoder(resp.Body)
- if err := dec.Decode(&dat); err == io.EOF {
- fmt.Println(err.Error())
- return dat
- } else if err != nil {
- fmt.Println(err.Error())
- return dat
- }
- // 鏄惁闇�瑕� 瑙f瀽 es 杩斿洖鐨� source
- if isSource {
- dat = dat["hits"].(map[string]interface{})
- var data = make(map[string]interface{}, 2)
- data["total"] = dat["total"]
- sources := []interface{}{}
- for _, value := range dat["hits"].([]interface{}) {
- source := value.(map[string]interface{})["_source"].(map[string]interface{})
- //source["id"] = source["id"]
- /*sdkType := source["sdkType"]
- if sdkType != nil {
- sdk, _ := strconv.Atoi(sdkType.(string))
- source["sdkType"] = sdkTypeToValue(sdk)
- }*/
- sources = append(sources, source)
- }
- data["datalist"] = sources
- return data
- } else {
- return dat
- }
-}
func Parsesources(sources []map[string]interface{}) (multiInfos []*protomsg.MultiFeaCache) {
var ok bool
@@ -476,8 +434,7 @@
return tmpinfos
}
-func FaceSourceAggregations(buf [] byte, thresholdTime float64, queryUseTime float64) (sources map[string]interface{}, err error) {
- s := make(map[string]interface{})
+func FaceSourceAggregations(buf []byte, thresholdTime int, thresholdStayTime int) (sources []map[string]interface{}, err error) {
loc, err := time.LoadLocation("Asia/Shanghai")
if err != nil {
return nil, errors.New("鏃跺尯璁剧疆閿欒")
@@ -507,14 +464,14 @@
indexLength := len(finalHits)
point := 0
for _, in := range finalHits {
- point = point+1
+ point = point + 1
tmpHitSource := make(map[string]interface{})
- tmpbuf, ok := in.(map[string]interface{})
+ tmpBuf, ok := in.(map[string]interface{})
if !ok {
fmt.Println("change to source error!")
continue
}
- source, ok := tmpbuf["_source"].(map[string]interface{})
+ source, ok := tmpBuf["_source"].(map[string]interface{})
if !ok {
fmt.Println("change _source error!")
continue
@@ -522,27 +479,33 @@
baseInfo := source["baseInfo"].([]interface{})[0].(map[string]interface{})
targetInfo := source["targetInfo"].([]interface{})[0].(map[string]interface{})
tmpTime := source["picDate"].(string)
+ if len(tmpTime) > 19 {
+ tmpTime = tmpTime[:19]
+ }
mTime, err := time.ParseInLocation("2006-01-02 15:04:05", tmpTime, loc)
if err != nil {
return nil, errors.New("鏃堕棿瑙f瀽閿欒")
}
sTime := tmpTime
- eTime := mTime.Add(time.Second*1).Format("2006-01-02 15:04:05")
+ eTime := mTime.Add(time.Second * 1).Format("2006-01-02 15:04:05")
stayTime := 1.0
- if startTime != "" && point < indexLength{
+ if startTime != "" && point <= indexLength {
sinTime, _ := time.ParseInLocation("2006-01-02 15:04:05", startTime, loc)
passTime := math.Abs(mTime.Sub(sinTime).Seconds())
hitsSources[len(hitsSources)-1]["stayTime"] = stayTime
//fmt.Println("passTime: ", passTime)
- if passTime <= thresholdTime || point == indexLength{
- startTime = tmpTime
- hitsSources[len(hitsSources)-1]["endTime"] = tmpTime
- if point == indexLength{
+ if int(passTime) <= thresholdTime {
+ if point == indexLength {
hitStartTime := hitsSources[len(hitsSources)-1]["startTime"].(string)
realStartTime, _ := time.ParseInLocation("2006-01-02 15:04:05", hitStartTime, loc)
stayTime = math.Abs(mTime.Sub(realStartTime).Seconds())
hitsSources[len(hitsSources)-1]["stayTime"] = stayTime
+ hitsSources[len(hitsSources)-1]["endTime"] = tmpTime
+ startTime = ""
+ } else {
+ startTime = tmpTime
+ hitsSources[len(hitsSources)-1]["endTime"] = tmpTime
}
continue
} else {
@@ -555,31 +518,90 @@
sinTime.Add(time.Second * 1)
sinTime.Format("2006-01-02 15:04:05")
hitsSources[len(hitsSources)-1]["endTime"] = tmpTime
- stayTime = 1
+ stayTime = 1
+ } else if stayTime == 0 {
+ stayTime = 1
+ hitsSources[len(hitsSources)-1]["endTime"] = realEndTime.Add(time.Second * 1).Format("2006-01-02 15:04:05")
}
hitsSources[len(hitsSources)-1]["stayTime"] = stayTime
+ if point == indexLength {
+ stayTime = 1
+ }
startTime = ""
- continue
- //fmt.Println(hitsSources[len(hitsSources)-1])
}
}
//fmt.Println("========================================================")
startTime = tmpTime
- tmpHitSource["personId"] = baseInfo["targetId"].(string)
+ tmpHitSource["faceId"] = baseInfo["targetId"].(string)
+ if targetInfo["areaId"] == nil {
+ continue
+ }
+ tmpHitSource["areaId"] = targetInfo["areaId"].(string)
tmpHitSource["startTime"] = sTime
- tmpHitSource["startFacePicUrl"] = targetInfo["picSmUrl"].(string)
+ tmpHitSource["faceImg"] = targetInfo["picSmUrl"].(string)
tmpHitSource["endTime"] = eTime
tmpHitSource["stayTime"] = stayTime
hitsSources = append(hitsSources, tmpHitSource)
}
allSource = append(allSource, hitsSources...)
}
- count := len(allSource)
- //fmt.Println(count)
- s["count"] = count
- s["allSource"] = allSource
- s["queryUseTime"] = queryUseTime
- return s, nil
+ return allSource, nil
+}
+
+func SourceDeduplication(buf [] byte) ([]map[string]interface{}, error) {
+ var info interface{}
+ json.Unmarshal(buf, &info)
+ out, ok := info.(map[string]interface{})
+ if !ok {
+ return nil, errors.New("http response interface can not change map[string]interface{}")
+ }
+ middle, ok := out["aggregations"].(map[string]interface{})
+ if !ok {
+ return nil, errors.New("first hits change error!")
+ }
+ bucketsAggs := middle["buckets_aggs"].(map[string]interface{})
+ buckets := bucketsAggs["buckets"].([]interface{})
+ if len(buckets) == 0 {
+ return nil, nil
+ }
+ faceId := make([]map[string]interface{}, 0)
+ for _, in := range buckets {
+ tmpInfo := make(map[string]interface{})
+ topAttentionHits := in.(map[string]interface{})["top_attention_hits"].(map[string]interface{})
+ middleHits := topAttentionHits["hits"].(map[string]interface{})
+ finalHits := middleHits["hits"].([]interface{})
+ tmpInfo["faceId"] = in.(map[string]interface{})["key"].(map[string]interface{})["faceId"].(string)
+ tmpInfo["lastTime"] = finalHits[0].(map[string]interface{})["_source"].(map[string]interface{})["picDate"].(string)
+ faceId = append(faceId, tmpInfo)
+ }
+ return faceId, nil
+}
+
+//瑙f瀽鑱氬悎璁℃暟缁撴瀯
+func SourceStatistics(buf [] byte) ([]map[string]interface{}, error) {
+ var info interface{}
+ json.Unmarshal(buf, &info)
+ out, ok := info.(map[string]interface{})
+ if !ok {
+ return nil, errors.New("http response interface can not change map[string]interface{}")
+ }
+ middle, ok := out["aggregations"].(map[string]interface{})
+ if !ok {
+ return nil, errors.New("first hits change error!")
+ }
+ bucketsAggs := middle["buckets_aggs"].(map[string]interface{})
+ buckets := bucketsAggs["buckets"].([]interface{})
+ if len(buckets) == 0 {
+ return nil, nil
+ }
+ resultData := make([]map[string]interface{}, 0)
+ for _, pick := range buckets {
+ data := make(map[string]interface{}, 0)
+ data["areaId"] = pick.(map[string]interface{})["key"].(map[string]interface{})["areaId"].(string)
+ data["peopleNum"] = int(pick.(map[string]interface{})["doc_count"].(float64))
+ resultData = append(resultData, data)
+ }
+ return resultData, nil
}
func SourceAggregations(buf [] byte, thresholdTime float64, queryUseTime float64) (sources map[string]interface{}, err error) {
@@ -616,7 +638,7 @@
indexLength := len(finalHits)
point := 0
for _, in := range finalHits {
- point = point+1
+ point = point + 1
tmpHitSource := make(map[string]interface{})
tmpbuf, ok := in.(map[string]interface{})
if !ok {
@@ -631,27 +653,33 @@
baseInfo := source["baseInfo"].([]interface{})[0].(map[string]interface{})
targetInfo := source["targetInfo"].([]interface{})[0].(map[string]interface{})
tmpTime := source["picDate"].(string)
+ if len(tmpTime) > 19 {
+ tmpTime = tmpTime[:19]
+ }
mTime, err := time.ParseInLocation("2006-01-02 15:04:05", tmpTime, loc)
if err != nil {
return nil, errors.New("鏃堕棿瑙f瀽閿欒")
}
sTime := tmpTime
- eTime := mTime.Add(time.Second*1).Format("2006-01-02 15:04:05")
+ eTime := mTime.Add(time.Second * 1).Format("2006-01-02 15:04:05")
stayTime := 1.0
- if startTime != "" && point < indexLength{
+ if startTime != "" && point <= indexLength {
sinTime, _ := time.ParseInLocation("2006-01-02 15:04:05", startTime, loc)
passTime := math.Abs(mTime.Sub(sinTime).Seconds())
hitsSources[len(hitsSources)-1]["stayTime"] = stayTime
//fmt.Println("passTime: ", passTime)
- if passTime <= thresholdTime || point == indexLength{
- startTime = tmpTime
- hitsSources[len(hitsSources)-1]["endTime"] = tmpTime
- if point == indexLength{
+ if passTime <= thresholdTime {
+ if point == indexLength {
hitStartTime := hitsSources[len(hitsSources)-1]["startTime"].(string)
realStartTime, _ := time.ParseInLocation("2006-01-02 15:04:05", hitStartTime, loc)
stayTime = math.Abs(mTime.Sub(realStartTime).Seconds())
hitsSources[len(hitsSources)-1]["stayTime"] = stayTime
+ hitsSources[len(hitsSources)-1]["endTime"] = tmpTime
+ startTime = ""
+ } else {
+ startTime = tmpTime
+ hitsSources[len(hitsSources)-1]["endTime"] = tmpTime
}
continue
} else {
@@ -664,12 +692,16 @@
sinTime.Add(time.Second * 1)
sinTime.Format("2006-01-02 15:04:05")
hitsSources[len(hitsSources)-1]["endTime"] = tmpTime
- stayTime = 1
+ stayTime = 1
+ } else if stayTime == 0 {
+ stayTime = 1
+ hitsSources[len(hitsSources)-1]["endTime"] = realEndTime.Add(time.Second * 1).Format("2006-01-02 15:04:05")
}
hitsSources[len(hitsSources)-1]["stayTime"] = stayTime
+ if point == indexLength {
+ stayTime = 1
+ }
startTime = ""
- continue
- //fmt.Println(hitsSources[len(hitsSources)-1])
}
}
//fmt.Println("========================================================")
@@ -739,7 +771,7 @@
startTime := ""
//fmt.Println("finalHits: ",finalHits)
for _, in := range finalHits {
- point = point+1
+ point = point + 1
tmpHitSource := make(map[string]interface{})
tmpbuf, ok := in.(map[string]interface{})
if !ok {
@@ -754,27 +786,33 @@
baseInfo := source["baseInfo"].([]interface{})[0].(map[string]interface{})
targetInfo := source["targetInfo"].([]interface{})[0].(map[string]interface{})
tmpTime := source["picDate"].(string)
+ if len(tmpTime) > 19 {
+ tmpTime = tmpTime[:19]
+ }
mTime, err := time.ParseInLocation("2006-01-02 15:04:05", tmpTime, loc)
if err != nil {
return nil, errors.New("鏃堕棿瑙f瀽閿欒")
}
sTime := tmpTime
- eTime := mTime.Add(time.Second*1).Format("2006-01-02 15:04:05")
+ eTime := mTime.Add(time.Second * 1).Format("2006-01-02 15:04:05")
stayTime := 1.0
- if startTime != "" && point < indexLength{
+ if startTime != "" && point <= indexLength {
sinTime, _ := time.ParseInLocation("2006-01-02 15:04:05", startTime, loc)
passTime := math.Abs(mTime.Sub(sinTime).Seconds())
hitsSources[len(hitsSources)-1]["stayTime"] = stayTime
//fmt.Println("passTime: ", passTime)
- if passTime <= thresholdTime || point == indexLength{
- startTime = tmpTime
- hitsSources[len(hitsSources)-1]["endTime"] = tmpTime
- if point == indexLength{
+ if passTime <= thresholdTime {
+ if point == indexLength {
hitStartTime := hitsSources[len(hitsSources)-1]["startTime"].(string)
realStartTime, _ := time.ParseInLocation("2006-01-02 15:04:05", hitStartTime, loc)
stayTime = math.Abs(mTime.Sub(realStartTime).Seconds())
hitsSources[len(hitsSources)-1]["stayTime"] = stayTime
+ hitsSources[len(hitsSources)-1]["endTime"] = tmpTime
+ startTime = ""
+ } else {
+ startTime = tmpTime
+ hitsSources[len(hitsSources)-1]["endTime"] = tmpTime
}
continue
} else {
@@ -787,12 +825,16 @@
sinTime.Add(time.Second * 1)
sinTime.Format("2006-01-02 15:04:05")
hitsSources[len(hitsSources)-1]["endTime"] = tmpTime
- stayTime = 1
+ stayTime = 1
+ } else if stayTime == 0 {
+ stayTime = 1
+ hitsSources[len(hitsSources)-1]["endTime"] = realEndTime.Add(time.Second * 1).Format("2006-01-02 15:04:05")
}
hitsSources[len(hitsSources)-1]["stayTime"] = stayTime
+ if point == indexLength {
+ stayTime = 1
+ }
startTime = ""
- continue
- //fmt.Println(hitsSources[len(hitsSources)-1])
}
}
startTime = tmpTime
@@ -991,8 +1033,15 @@
if !ok {
return -1, errors.New("first total change error!")
}
- tmp := middle["total"].(float64)
- total = int(tmp)
+
+ tmp,b := middle["total"].(map[string]interface{})
+ if b != true {
+ v := middle["total"].(float64)
+ t := int(v)
+ return t,nil
+ }
+ value := tmp["value"].(float64)
+ total = int(value)
return total, nil
}
@@ -1004,6 +1053,7 @@
}
request, err := http.NewRequest(method, url, bytes.NewBuffer(parama))
request.Header.Set("Content-type", "application/json")
+ request.Header.Set("Authorization",Token)
if err != nil {
fmt.Println("build request fail !")
@@ -1039,3 +1089,27 @@
// fmt.Println(key, "is nil can not asign")
// }
//}
+
+type account struct {
+ Username string `mapstructure: "username"`
+ Userpassword string `mapstructure: "userpassword"`
+}
+
+
+var Account = &account{}
+
+var Token string
+
+func init() {
+ v := viper.New()
+ v.SetConfigType("yaml")
+ v.SetConfigName("pro")
+ v.AddConfigPath("/opt/vasystem/config/")
+ err := v.ReadInConfig()
+ if err != nil {
+ log.Fatal("err on parsing configuration file!",err)
+ }
+ v.UnmarshalKey("es.account",Account)
+
+ Token = "Basic "+base64.StdEncoding.EncodeToString([]byte(Account.Username+":"+Account.Userpassword))
+}
--
Gitblit v1.8.0