From a88cd5f3c40d32fb48e80873667455f1424ae1cf Mon Sep 17 00:00:00 2001 From: sunty <1172534965@qq.com> Date: 星期日, 23 八月 2020 16:28:40 +0800 Subject: [PATCH] add w h --- EsApi.go | 361 ++++++++++++++++++++++++++++++++++++++++++++++---- 1 files changed, 328 insertions(+), 33 deletions(-) diff --git a/EsApi.go b/EsApi.go index 8883fcc..b812bf5 100644 --- a/EsApi.go +++ b/EsApi.go @@ -159,6 +159,220 @@ } +//鏍规嵁鎽勫儚鏈哄垪琛ㄥ拰鏃堕棿鏌ヨ浜哄憳娴忚杞ㄨ抗 +func GetPersonDataByCameraIdAndTime(cameraId []string, startTime string, endTime string, serverIp string, ServerPort string, indexName string) ([]map[string]interface{}, error) { + esCameraId := strings.Replace(strings.Trim(fmt.Sprint(cameraId), "[]"), " ", "\",\"", -1) + personUrl := "http://" + serverIp + ":" + ServerPort + "/" + indexName + "/_search" + personBody := `{ + "query": { + "bool": { + "filter": [ + { + "range": { + "picDate": { + "gte": "` + startTime + `", + "lte": "` + endTime + `" + } + } + }, + { + "term": { + "targetInfo.targetType.raw": "Yolo" + } + }, + { + "terms": { + "cameraId": [ + "` + esCameraId + `" + ] + } + } + ] + } + }, + "size": 2147483647, + "_source": { + "includes": [ + "cameraId", + "cameraName", + "cameraAddr", + "targetInfo.targetScore", + "picDate", + "updateTime", + "picMaxUrl", + "targetInfo.belongsTargetId", + "targetInfo.targetLocation", + "picWH" + ] + } +}` + //fmt.Println(personUrl) + //fmt.Println(personBody) + buf, err := EsReq("POST", personUrl, []byte(personBody)) + if err != nil { + return nil, err + } + + sources, err := Sourcelist(buf) + if err != nil { + return nil, err + } + resData,err := PerSonAnalysis(sources) + //println(sources) + return resData, nil + +} + +//鏍规嵁鏃堕棿鑼冨洿锛屾憚鍍忔満鍒楄〃锛屽垎缁勮仛鍚堜汉鑴稿垪琛� +func GetFaceDataBucketsByCameraIdAndTime(cameraId []string, startTime string, endTime string, thresholdTime float64, serverIp string, ServerPort string, indexName string) (buckersDate map[string]interface{}, err error) { + esCameraId := strings.Replace(strings.Trim(fmt.Sprint(cameraId), "[]"), " ", "\",\"", -1) + var buckersUrl = "http://" + serverIp + ":" + ServerPort + "/" + indexName + "/_search" + var buckersBody = `{ + "query": { + "bool": { + "filter": [ + { + "range": { + "picDate": { + "gte": "` + startTime + `", + "lte": "` + endTime + `" + } + } + }, + { + "term": { + "targetInfo.targetType.raw": "FaceDetect" + } + }, + { + "terms": { + "cameraId": ["` + esCameraId + `"] + } + } + ] + } + }, + "size": 0, + "aggs": { + "buckets_aggs": { + "composite": { + "sources": [ + { + "baseInfo.targetId": { + "terms": { + "field": "baseInfo.targetId" + } + } + }, + { + "cameraId": { + "terms": { + "field": "cameraId" + } + } + } + ], + "size": 100000000 + }, + "aggs":{ + "top_attention_hits":{ + "top_hits":{ + "size": 1000000, + "sort": [ + { + "picDate": { + "order": "asc" + } + } + ], + "_source":{ + "includes":["baseInfo.targetId","cameraId","cameraName","cameraAddr","targetInfo.targetScore","targetInfo.picSmUrl","showLabels","baseInfo.tableId","baseInfo.tableName","baseInfo.bwType","baseInfo.targetName","baseInfo.compareScore","picDate","picMaxUrl","picWH"] + } + } + } + } + } + } +}` + //fmt.Println(buckersUrl) + //fmt.Println(buckersBody) + buf, err := EsReq("POST", buckersUrl, []byte(buckersBody)) + if err != nil { + return nil, err + } + + sources, err := SourceAggregations(buf, thresholdTime) + if err != nil { + return nil, err + } + //println(sources) + return sources, nil +} + +//鏍规嵁鎶撴媿浜哄憳id鏇存柊锛坧icurl锛夊浘鐗囧湴鍧� +func UpdatePicUrlById(id string, picUrl string, indexName string, serverIp string, serverPort string) (err error) { + updateTime := time.Now().Format("2006-01-02 15:04:05") + tRes, err := AIOceaninfosbyid([]string{id}, indexName, serverIp, serverPort) + if err != nil || len(tRes) == 0 { + return err + } + picMaxUrls := tRes[0].PicMaxUrl + sourceStr := ` + "lang":"painless", + "inline": "ctx._source.picMaxUrl.add('` + picUrl + `');ctx._source.updateTime='`+updateTime+`'" +` + if len(picMaxUrls) >= 2 { + sourceStr = `"source": "ctx._source.picMaxUrl[1]='` + picUrl + `';ctx._source.updateTime='`+updateTime+`'"` + } + var info interface{} + url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query?refresh=true" + + var picUrlInfo = ` + { + "script": { + ` + sourceStr + ` + }, + "query": { + "bool": { + "filter": [ + { + "term": { + "id": "` + id + `" + } + } + ] + } + } + } + ` + //logPrint("url: ", url, videoUrlInfo) + fmt.Println(url, picUrlInfo) + buf, err := EsReq("POST", url, []byte(picUrlInfo)) + if err != nil { + logPrint("http request videoUrlInfo info is err!") + return err + } + json.Unmarshal(buf, &info) + //logPrint(info) + out, ok := info.(map[string]interface{}) + if !ok { + logPrint("http response interface can not change map[string]interface{}") + return errors.New("http response interface can not change map[string]interface{}") + } + middle, ok := out["updated"].(float64) + if !ok { + logPrint("first updated change error!") + return errors.New("first updated change error!") + } + if middle == 1 { + return nil + } + if middle == 0 { + return errors.New("宸茬粡淇敼") + } + return nil +} + //鏍规嵁鎶撴媿浜哄憳id鏇存柊锛坴ideourl锛夋憚鍍忔満鍦板潃 func UpdateVideourlById(id string, videoUrl string, indexName string, serverIp string, serverPort string, command int) (statu int, err error) { @@ -284,7 +498,7 @@ "\"size\":\"1000\"," + "\"query\":{\"bool\":{" + queryStr + "\"filter\":[" + - "{\"term\":{\"targetInfo.targetType.raw\":\"face\"}}," + + "{\"term\":{\"targetInfo.targetType.raw\":\"FaceDetect\"}}," + cameraIdStr + alarmLevelStr + taskIdStr + @@ -298,8 +512,8 @@ go func(reqParam string) { defer wg.Done() - //logPrint(url) - //logPrint(prama) + logPrint(url) + logPrint(prama) buf, err := EsReq("POST", url, []byte(reqParam)) if err != nil { @@ -335,8 +549,8 @@ "scroll": "1m", "scroll_id" : "` + scroll_id + `" }` - //logPrint(scroll_url) - //logPrint(jsonDSL) + logPrint(scroll_url) + logPrint(jsonDSL) buf, err := EsReq("POST", scroll_url, []byte(jsonDSL)) if err != nil { @@ -717,7 +931,7 @@ url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local" var source []string switch targetType { - case "face": + case "face", "FaceDetect": source = []string{"id", "targetInfo.feature", "analyServerId", "cameraId"} case "track": source = []string{"id", "targetInfo.feature", "analyServerId", "cameraId", "targetInfo.attachTarget.feature", "targetInfo.targetLocation", "linkTagInfo.targetInfo.feature", "linkTagInfo.targetInfo.attachTarget.feature", "linkTagInfo.cameraId", "linkTagInfo.targetInfo.targetLocation"} @@ -767,21 +981,35 @@ } // 鏌ヨ搴曞簱浜哄憳淇℃伅*缂撳瓨* -func GetOceanFeatures(serverIp string, serverPort string, queryIndexNum int, queryNums int, indexName string, shards string, targetType string) ([]*protomsg.MultiFeaCache, error) { - var dbinfos []*protomsg.MultiFeaCache - point := strconv.Itoa(queryIndexNum) - number := strconv.Itoa(queryNums) +func GetOceanFeatures(serverIp string, serverPort string, queryNums int, indexName string, shards string, targetType string) ([]*protomsg.MultiFeaCache, error) { + //queryIndexNum int + //var dbinfos []*protomsg.MultiFeaCache + dbinfos := make([]*protomsg.MultiFeaCache, 0) + //dbinfosss := make([]*protomsg.MultiFeaCache,0) + //dbinfoss = append(dbinfoss, dbinfosss...) + JsonDSL := "" var source []string switch targetType { - case "face": + case "face", "FaceDetect": source = []string{"id", "targetInfo.feature", "analyServerId"} case "track": source = []string{"id", "targetInfo.feature", "analyServerId", "targetInfo.attachTarget.feature", "targetInfo.targetLocation", "linkTagInfo.targetInfo.feature", "linkTagInfo.targetInfo.attachTarget.feature", "linkTagInfo.targetInfo.targetLocation"} } - url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local" - JsonDSL = ` { - "from": ` + point + `, + + url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local;scroll=1m" + + var lock sync.RWMutex + var wg sync.WaitGroup + + for i := 0; i < 48; i++ { + //璇锋眰浣� + JsonDSL = ` { + "slice": { + "id": "` + strconv.Itoa(i) + `", + "max": 48 + }, + "size":` + strconv.Itoa(queryNums) + `, "query": { "bool": { "filter": [ @@ -793,26 +1021,86 @@ ] } }, - "size":` + number + `, "_source": ["` + strings.Replace(strings.Trim(fmt.Sprint(source), "[]"), " ", "\",\"", -1) + `"] }` - - logPrint("url: ",url) - logPrint("url: ",JsonDSL) - buf, err := EsReq("POST", url, []byte(JsonDSL)) - if err != nil { - return dbinfos, err - } + wg.Add(1) + go func(reqJsonDSL string) { + defer wg.Done() - // 杩斿洖 _source 鏁扮粍 - sources, err := Sourcelist(buf) - if err != nil { - return dbinfos, err - } + //fmt.Println(url) + //fmt.Println(prama) + //logPrint("url: ",url) + //logPrint("url: ",reqJsonDSL) + buf, err := EsReq("POST", url, []byte(reqJsonDSL)) + if err != nil { + logPrint("EsReq: ", err) + return + } - // 杩斿洖鎵�鏈夋煡璇㈢殑鏁版嵁 - dbpersoninfos := Parsesources(sources) - return dbpersoninfos, nil + // 杩斿洖 _source 鏁扮粍 + sources, err := Sourcelistforscroll(buf) + if err != nil { + logPrint("EsReq: ", err) + return + } + // 杩斿洖鎵�鏈夋煡璇㈢殑鏁版嵁 + ftmpDatas := Parsesources(sources["sourcelist"].([]map[string]interface{})) + lock.Lock() + dbinfos = append(dbinfos, ftmpDatas...) + //logPrint("prsLen: ", len(Parsesources(sources["sourcelist"].([]map[string]interface{})))) + //logPrint("dbinfosLen: ", len(dbinfos)) + lock.Unlock() + + scroll_id := sources["scroll_id"].(string) + + //scroll璇锋眰澶� + scroll_url := "http://" + serverIp + ":" + serverPort + "/_search/scroll" + for { + next_scroll_id := "" + if next_scroll_id != "" { + scroll_id = next_scroll_id + } + jsonDSL := `{ + "scroll": "1m", + "scroll_id" : "` + scroll_id + `" + }` + //fmt.Println(scroll_url) + //fmt.Println(jsonDSL) + buf, err := EsReq("POST", scroll_url, []byte(jsonDSL)) + + if err != nil { + //fmt.Println("lenth1: ", len(dbinfos)) + return + } + nextSources, err := Sourcelistforscroll(buf) + + if nextSources == nil { + return + } + + nextM := nextSources["sourcelist"].([]map[string]interface{}) + //fmt.Println("id",nextSources) + if nextM == nil || len(nextM) == 0 { + //fmt.Println("lenth: ", len(capturetable)) + return + } + tmpDatas := Parsesources(nextM) + lock.Lock() + dbinfos = append(dbinfos, tmpDatas...) + //logPrint("tmpDatasLen: ", len(tmpDatas)) + //logPrint("AdbinfosLen: ", len(dbinfos)) + lock.Unlock() + + next_scroll_id = nextSources["scroll_id"].(string) + } + + }(JsonDSL) + } + wg.Wait() + + fmt.Println("lenth_all: ", len(dbinfos)) + + return dbinfos, nil } //************************CORN TASK******************************* @@ -847,14 +1135,13 @@ if err != nil { return false, errors.New("瑙g爜澶辫触") } - if resTotal == -1 || resTotal == 0{ + if resTotal == -1 || resTotal == 0 { result = false } else { result = true } return result, nil } - //鎸夋棩鏈熻寖鍥达紝鏈嶅姟鍣↖d鍒犻櫎鏁版嵁 func DeleteAnalyServerData(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) { @@ -911,7 +1198,15 @@ } }, "query": { - "match_all": {} + "bool": { + "filter": [ + { + "term": { + "application": "loopCoverage" + } + } + ] + } } }` buf, err := EsReq("POST", url, []byte(addJson)) -- Gitblit v1.8.0