From adbcd8f2d5407cc5351a9fee39f13596f5b6c2c0 Mon Sep 17 00:00:00 2001 From: sunty <1172534965@qq.com> Date: 星期二, 01 九月 2020 14:09:41 +0800 Subject: [PATCH] add endTime GetFaceDataByTimeAndTotal --- EsApi.go | 686 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 files changed, 674 insertions(+), 12 deletions(-) diff --git a/EsApi.go b/EsApi.go index cc24dcc..73ee338 100644 --- a/EsApi.go +++ b/EsApi.go @@ -1,15 +1,15 @@ package esutil import ( + "basic.com/pubsub/protomsg.git" "encoding/json" "errors" "fmt" + "sort" "strconv" "strings" "sync" "time" - - "basic.com/pubsub/protomsg.git" ) var logPrint = func(i ...interface{}) { @@ -159,6 +159,661 @@ } +/**************************************customer analysis util start**************************************/ +/*******************sort []map util*******************/ +type MapsSort struct { + Key string + MapList []map[string]interface{} +} + +func (m *MapsSort) Len() int { + return len(m.MapList) +} + +func (m *MapsSort) Less(i, j int) bool { + return m.MapList[i][m.Key].(string) > m.MapList[j][m.Key].(string) +} + +func (m *MapsSort) Swap(i, j int) { + m.MapList[i], m.MapList[j] = m.MapList[j], m.MapList[i] +} + +/*******************sort []map util*******************/ +//鏍规嵁鏃堕棿鑼冨洿鑱氬悎鎵�鏈夊尯鍩熶汉淇℃伅锛岃繑鍥炲浐瀹氭潯鏁� +func GetFaceDataByTimeAndTotal(startTime string, endTime string, total int, thresholdTime int, thresholdStayTime int, serverIp string, serverPort string, indexName string) (resData []map[string]interface{}, err error) { + var requestUrl = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" + var requestBody = `{ + "query": { + "bool": { + "filter": [ + { + "range": { + "picDate": { + "gte": "` + startTime + `", + "lte": "` + endTime + `" + } + } + }, + { + "term":{ + "targetInfo.targetType.raw": "FaceDetect" + } + } + ] + } + }, + "size": 0, + "aggs": { + "buckets_aggs": { + "composite": { + "sources": [ + { + "faceId": { + "terms": { + "field": "baseInfo.targetId" + } + } + }, + { + "areaId": { + "terms": { + "field": "targetInfo.areaId" + } + } + } + ], + "size": 10000000 + }, + "aggs": { + "top_attention_hits": { + "top_hits": { + "size": 1000000, + "sort": [ + { + "picDate": { + "order": "asc" + } + } + ], + "_source": { + "includes": [ + "baseInfo.targetId", + "targetInfo.picSmUrl", + "targetInfo.areaId", + "picDate" + ] + } + } + } + } + } + } +}` + buf, err := EsReq("POST", requestUrl, []byte(requestBody)) + if err != nil { + return nil, err + } + source, err := FaceSourceAggregations(buf, thresholdTime, thresholdStayTime) + if err != nil { + return nil, err + } + faceSource := make([]map[string]interface{}, 0) + for index, info := range source { + if int(info["stayTime"].(float64)) > thresholdStayTime { + faceSource = append(faceSource, source[index]) + } + } + if len(faceSource) > total { + mapsSort := MapsSort{} + mapsSort.Key = "endTime" + mapsSort.MapList = faceSource + sort.Sort(&mapsSort) + return mapsSort.MapList[:total], nil + } + return faceSource, nil +} + +func GetFaceDataByTimeAndId(startTime string, endTime string, id string, thresholdTime int, thresholdStayTime int, serverIp string, serverPort string, indexName string) (resData []map[string]interface{}, err error) { + var requestUrl = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" + var requestBody = `{ + "query": { + "bool": { + "filter": [ + { + "range": { + "picDate": { + "gte": "` + startTime + `", + "lte": "` + endTime + `" + } + } + }, + { + "term":{ + "targetInfo.targetType.raw": "FaceDetect" + } + }, + { + "term":{ + "baseInfo.targetId": "` + id + `" + } + } + ] + } + }, + "size": 0, + "aggs": { + "buckets_aggs": { + "composite": { + "sources": [ + { + "faceId": { + "terms": { + "field": "baseInfo.targetId" + } + } + }, + { + "areaId": { + "terms": { + "field": "targetInfo.areaId" + } + } + } + ], + "size": 10000000 + }, + "aggs": { + "top_attention_hits": { + "top_hits": { + "size": 1000000, + "sort": [ + { + "picDate": { + "order": "asc" + } + } + ], + "_source": { + "includes": [ + "baseInfo.targetId", + "targetInfo.picSmUrl", + "targetInfo.areaId", + "picDate" + ] + } + } + } + } + } + } +}` + buf, err := EsReq("POST", requestUrl, []byte(requestBody)) + if err != nil { + return nil, err + } + source, err := FaceSourceAggregations(buf, thresholdTime, thresholdStayTime) + if err != nil { + return nil, err + } + faceSource := make([]map[string]interface{}, 0) + for index, info := range source { + if int(info["stayTime"].(float64)) > thresholdStayTime { + faceSource = append(faceSource, source[index]) + } + } + return faceSource, nil +} + +func GetFaceIdDeduplication(startTime string, endTime string, serverIp string, serverPort string, indexName string) (ids []string, err error) { + var requestUrl = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" + var requestBody = `{ + "query": { + "bool": { + "filter": [ + { + "range": { + "picDate": { + "gte": "` + startTime + `", + "lte": "` + endTime + `" + } + } + }, + { + "term": { + "targetInfo.targetType.raw": "FaceDetect" + } + } + ] + } + }, + "size": 0, + "aggs": { + "buckets_aggs": { + "composite": { + "sources": [ + { + "faceId": { + "terms": { + "field": "baseInfo.targetId" + } + } + } + ], + "size": 10000000 + } + } + } +}` + //fmt.Println(requestUrl) + //fmt.Println(requestBody) + buf, err := EsReq("POST", requestUrl, []byte(requestBody)) + if err != nil { + return nil, err + } + ids, err1 := SourceDeduplication(buf) + if err1 != nil { + return nil, err1 + } + return ids, nil +} + +//缁熻鍚勪釜鍖哄煙浜烘暟 +func StatisticsEveryAreaPersonsNumber(startTime string, endTime string, serverIp string, serverPort string, indexName string) ([]map[string]interface{}, error) { + var requestUrl = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" + var requestBody = `{ + "query": { + "bool": { + "filter": [ + { + "range": { + "picDate": { + "gte": "` + startTime + `", + "lte": "` + endTime + `" + } + } + }, + { + "term": { + "targetInfo.targetType.raw": "Yolo" + } + } + ] + } + }, + "size": 0, + "aggs": { + "buckets_aggs": { + "composite": { + "sources": [ + { + "areaId": { + "terms": { + "field": "targetInfo.areaId" + } + } + } + ], + "size": 10000000 + } + } + } +}` + buf, err := EsReq("POST", requestUrl, []byte(requestBody)) + if err != nil { + return nil, err + } + result, err := SourceStatistics(buf) + if err != nil { + return nil, err + } + return result, nil +} + +/**************************************customer analysis util end**************************************/ +//鏍规嵁鎽勫儚鏈哄垪琛ㄥ拰鏃堕棿鏌ヨ浜哄憳娴忚杞ㄨ抗 +func GetPersonDataByCameraIdAndTime(cameraId []string, startTime string, endTime string, serverIp string, serverPort string, indexName string) (map[string]interface{}, error) { + + var filterArr []string + if cameraId != nil && len(cameraId) > 0 { + esCameraId := strings.Replace(strings.Trim(fmt.Sprint(cameraId), "[]"), " ", "\",\"", -1) + filterArr = append(filterArr, `{ + "terms": { + "cameraId": ["`+esCameraId+`"] + } + }`) + } + filterArr = append(filterArr, `{ + "range": { + "picDate": { + "gte": "`+startTime+`", + "lte": "`+endTime+`" + } + } + }`) + filterArr = append(filterArr, ` { + "term": { + "targetInfo.targetType.raw": "Yolo" + } + }`) + queryStr := strings.Join(filterArr, ",") + + personUrl := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" + personBody := `{ + "query": { + "bool": { + "filter": [ + ` + queryStr + ` + ] + } + }, + "size": 2147483647, + "_source": { + "includes": [ + "cameraId", + "cameraName", + "cameraAddr", + "targetInfo.targetScore", + "picDate", + "updateTime", + "picMaxUrl", + "targetInfo.belongsTargetId", + "targetInfo.targetLocation", + "picWH" + ] + } +}` + //fmt.Println(personUrl) + //fmt.Println(personBody) + source := make(map[string]interface{}) + queryStartTime := time.Now() + buf, err := EsReq("POST", personUrl, []byte(personBody)) + if err != nil { + return nil, err + } + queryUseTime := time.Now().Sub(queryStartTime).Seconds() * 1000 + sources, err := Sourcelist(buf) + if err != nil { + return nil, err + } + resData, err := PerSonAnalysis(sources) + source["result"] = resData + source["total"] = len(resData) + source["queryUseTime"] = queryUseTime + //println(sources) + return source, nil + +} + +//鏍规嵁鏃堕棿鑼冨洿锛屾憚鍍忔満鍒楄〃锛屽垎缁勮仛鍚堜汉鑴稿垪琛�,杩斿洖鍒嗙粍鏁版嵁 +func GetFaceDataBucketsByCameraIdAndTimeReturnByGrouped(cameraId []string, personId []string, startTime string, endTime string, thresholdTime float64, serverIp string, ServerPort string, indexName string) (buckersDate map[string]interface{}, err error) { + var filterArr []string + if cameraId != nil && len(cameraId) > 0 { + esCameraId := strings.Replace(strings.Trim(fmt.Sprint(cameraId), "[]"), " ", "\",\"", -1) + filterArr = append(filterArr, `{ + "terms": { + "cameraId": ["`+esCameraId+`"] + } + }`) + } + if personId != nil && len(personId) > 0 { + esPersonId := strings.Replace(strings.Trim(fmt.Sprint(personId), "[]"), " ", "\",\"", -1) + filterArr = append(filterArr, `{ + "terms": { + "baseInfo.targetId": ["`+esPersonId+`"] + } + }`) + } + filterArr = append(filterArr, `{ + "range": { + "picDate": { + "gte": "`+startTime+`", + "lte": "`+endTime+`" + } + } + }`) + filterArr = append(filterArr, ` { + "term": { + "targetInfo.targetType.raw": "FaceDetect" + } + }`) + queryStr := strings.Join(filterArr, ",") + + var buckersUrl = "http://" + serverIp + ":" + ServerPort + "/" + indexName + "/_search" + var buckersBody = `{ + "query": { + "bool": { + "filter": [ + ` + queryStr + ` + ] + } + }, + "size": 0, + "aggs": { + "buckets_aggs": { + "composite": { + "sources": [ + { + "baseInfo.targetId": { + "terms": { + "field": "baseInfo.targetId" + } + } + }, + { + "cameraId": { + "terms": { + "field": "cameraId" + } + } + } + ], + "size": 10000000 + }, + "aggs":{ + "top_attention_hits":{ + "top_hits":{ + "size": 1000000, + "sort": [ + { + "picDate": { + "order": "asc" + } + } + ], + "_source":{ + "includes":["baseInfo.targetId","cameraId","cameraName","cameraAddr","targetInfo.targetScore","targetInfo.picSmUrl","showLabels","baseInfo.tableId","baseInfo.tableName","baseInfo.bwType","baseInfo.targetName","baseInfo.compareScore","picDate","picMaxUrl","picWH"] + } + } + } + } + } + } +}` + //fmt.Println(buckersUrl) + //fmt.Println(buckersBody) + sources := make(map[string]interface{}) + queryStartTime := time.Now() + buf, err := EsReq("POST", buckersUrl, []byte(buckersBody)) + if err != nil { + return nil, err + } + queryUseTime := time.Now().Sub(queryStartTime).Seconds() * 1000 + //fmt.Println(queryUseTime) + tmpSources, err := SourceAggregationsReturnByGrouped(buf, thresholdTime) + if err != nil { + return nil, err + } + sources["result"] = tmpSources + sources["total"] = len(tmpSources) + sources["queryUseTime"] = queryUseTime + //println(sources) + return sources, nil +} + +//鏍规嵁鏃堕棿鑼冨洿锛屾憚鍍忔満鍒楄〃锛屽垎缁勮仛鍚堜汉鑴稿垪琛� +func GetFaceDataBucketsByCameraIdAndTime(cameraId []string, personId []string, startTime string, endTime string, thresholdTime float64, serverIp string, ServerPort string, indexName string) (buckersDate map[string]interface{}, err error) { + var filterArr []string + if cameraId != nil && len(cameraId) > 0 { + esCameraId := strings.Replace(strings.Trim(fmt.Sprint(cameraId), "[]"), " ", "\",\"", -1) + filterArr = append(filterArr, `{ + "terms": { + "cameraId": ["`+esCameraId+`"] + } + }`) + } + if personId != nil && len(personId) > 0 { + esPersonId := strings.Replace(strings.Trim(fmt.Sprint(personId), "[]"), " ", "\",\"", -1) + filterArr = append(filterArr, `{ + "terms": { + "baseInfo.targetId": ["`+esPersonId+`"] + } + }`) + } + filterArr = append(filterArr, `{ + "range": { + "picDate": { + "gte": "`+startTime+`", + "lte": "`+endTime+`" + } + } + }`) + filterArr = append(filterArr, ` { + "term": { + "targetInfo.targetType.raw": "FaceDetect" + } + }`) + queryStr := strings.Join(filterArr, ",") + + var buckersUrl = "http://" + serverIp + ":" + ServerPort + "/" + indexName + "/_search" + var buckersBody = `{ + "query": { + "bool": { + "filter": [ + ` + queryStr + ` + ] + } + }, + "size": 0, + "aggs": { + "buckets_aggs": { + "composite": { + "sources": [ + { + "baseInfo.targetId": { + "terms": { + "field": "baseInfo.targetId" + } + } + }, + { + "cameraId": { + "terms": { + "field": "cameraId" + } + } + } + ], + "size": 10000000 + }, + "aggs":{ + "top_attention_hits":{ + "top_hits":{ + "size": 1000000, + "sort": [ + { + "picDate": { + "order": "asc" + } + } + ], + "_source":{ + "includes":["baseInfo.targetId","cameraId","cameraName","cameraAddr","targetInfo.targetScore","targetInfo.picSmUrl","showLabels","baseInfo.tableId","baseInfo.tableName","baseInfo.bwType","baseInfo.targetName","baseInfo.compareScore","picDate","picMaxUrl","picWH"] + } + } + } + } + } + } +}` + //fmt.Println(buckersUrl) + //fmt.Println(buckersBody) + queryStartTime := time.Now() + buf, err := EsReq("POST", buckersUrl, []byte(buckersBody)) + if err != nil { + return nil, err + } + queryUseTime := time.Now().Sub(queryStartTime).Seconds() * 1000 + + sources, err := SourceAggregations(buf, thresholdTime, queryUseTime) + if err != nil { + return nil, err + } + return sources, nil +} + +//鏍规嵁鎶撴媿浜哄憳id鏇存柊锛坧icurl锛夊浘鐗囧湴鍧� +func UpdatePicUrlById(id string, picUrl string, indexName string, serverIp string, serverPort string) (err error) { + updateTime := time.Now().Format("2006-01-02 15:04:05") + tRes, err := AIOceaninfosbyid([]string{id}, indexName, serverIp, serverPort) + if err != nil || len(tRes) == 0 { + return err + } + picMaxUrls := tRes[0].PicMaxUrl + sourceStr := ` + "source": "ctx._source.picMaxUrl.add('` + picUrl + `');ctx._source.updateTime='` + updateTime + `'" +` + if len(picMaxUrls) >= 2 { + sourceStr = `"source": "ctx._source.picMaxUrl[1]='` + picUrl + `';ctx._source.updateTime='` + updateTime + `'"` + } + var info interface{} + url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query?refresh=true" + + var picUrlInfo = ` + { + "script": { + ` + sourceStr + ` + }, + "query": { + "bool": { + "filter": [ + { + "term": { + "id": "` + id + `" + } + } + ] + } + } + } + ` + //logPrint("url: ", url, videoUrlInfo) + //fmt.Println(url, picUrlInfo) + buf, err := EsReq("POST", url, []byte(picUrlInfo)) + if err != nil { + logPrint("http request videoUrlInfo info is err!") + return err + } + json.Unmarshal(buf, &info) + //logPrint(info) + out, ok := info.(map[string]interface{}) + if !ok { + logPrint("http response interface can not change map[string]interface{}") + return errors.New("http response interface can not change map[string]interface{}") + } + middle, ok := out["updated"].(float64) + if !ok { + logPrint("first updated change error!", out) + return errors.New("first updated change error!") + } + if middle == 1 { + return nil + } + if middle == 0 { + return errors.New("宸茬粡淇敼") + } + return nil +} + //鏍规嵁鎶撴媿浜哄憳id鏇存柊锛坴ideourl锛夋憚鍍忔満鍦板潃 func UpdateVideourlById(id string, videoUrl string, indexName string, serverIp string, serverPort string, command int) (statu int, err error) { @@ -284,7 +939,7 @@ "\"size\":\"1000\"," + "\"query\":{\"bool\":{" + queryStr + "\"filter\":[" + - "{\"term\":{\"targetInfo.targetType.raw\":\"face\"}}," + + "{\"term\":{\"targetInfo.targetType.raw\":\"FaceDetect\"}}," + cameraIdStr + alarmLevelStr + taskIdStr + @@ -770,7 +1425,7 @@ func GetOceanFeatures(serverIp string, serverPort string, queryNums int, indexName string, shards string, targetType string) ([]*protomsg.MultiFeaCache, error) { //queryIndexNum int //var dbinfos []*protomsg.MultiFeaCache - dbinfos := make([]*protomsg.MultiFeaCache,0) + dbinfos := make([]*protomsg.MultiFeaCache, 0) //dbinfosss := make([]*protomsg.MultiFeaCache,0) //dbinfoss = append(dbinfoss, dbinfosss...) @@ -819,20 +1474,20 @@ //logPrint("url: ",reqJsonDSL) buf, err := EsReq("POST", url, []byte(reqJsonDSL)) if err != nil { - logPrint("EsReq: ",err) + logPrint("EsReq: ", err) return } // 杩斿洖 _source 鏁扮粍 sources, err := Sourcelistforscroll(buf) if err != nil { - logPrint("EsReq: ",err) + logPrint("EsReq: ", err) return } // 杩斿洖鎵�鏈夋煡璇㈢殑鏁版嵁 - ftmpDatas := Parsesources(sources["sourcelist"].([]map[string]interface{})) + ftmpDatas := Parsesources(sources["sourcelist"].([]map[string]interface{})) lock.Lock() - dbinfos = append(dbinfos,ftmpDatas...) + dbinfos = append(dbinfos, ftmpDatas...) //logPrint("prsLen: ", len(Parsesources(sources["sourcelist"].([]map[string]interface{})))) //logPrint("dbinfosLen: ", len(dbinfos)) lock.Unlock() @@ -884,7 +1539,7 @@ } wg.Wait() - fmt.Println("lenth_all: ", len(dbinfos)) + //fmt.Println("lenth_all: ", len(dbinfos)) return dbinfos, nil } @@ -921,14 +1576,13 @@ if err != nil { return false, errors.New("瑙g爜澶辫触") } - if resTotal == -1 || resTotal == 0{ + if resTotal == -1 || resTotal == 0 { result = false } else { result = true } return result, nil } - //鎸夋棩鏈熻寖鍥达紝鏈嶅姟鍣↖d鍒犻櫎鏁版嵁 func DeleteAnalyServerData(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) { @@ -985,7 +1639,15 @@ } }, "query": { - "match_all": {} + "bool": { + "filter": [ + { + "term": { + "application": "loopCoverage" + } + } + ] + } } }` buf, err := EsReq("POST", url, []byte(addJson)) -- Gitblit v1.8.0