From 9d4aea534b369527a0470095ddaff6929c2cdf52 Mon Sep 17 00:00:00 2001 From: sunty <1172534965@qq.com> Date: 星期三, 20 十一月 2019 14:53:51 +0800 Subject: [PATCH] update to new feature caching --- EsApi.go | 467 +++++++++++++++------------------------------------------- 1 files changed, 123 insertions(+), 344 deletions(-) diff --git a/EsApi.go b/EsApi.go index ca1480d..7777b80 100644 --- a/EsApi.go +++ b/EsApi.go @@ -1,110 +1,19 @@ package esutil import ( - "encoding/json" - "errors" - "fmt" - "strconv" - "strings" + "encoding/json" + "errors" + "fmt" + "strconv" + "strings" "sync" "time" "basic.com/pubsub/protomsg.git" - ) - -// 鏌ヨ搴曞簱浜哄憳淇℃伅 -func Personinfos( queryIndex int, queryNums int, indexName string, serverIp string, serverPort string, analyServerId string) ([]*protomsg.Esinfo, error){ - var dbinfos []*protomsg.Esinfo - point := strconv.Itoa(queryIndex) - number := strconv.Itoa(queryNums) - JsonDSL := "" - if indexName == "videopersons" { - JsonDSL = ` { - "from": ` + point + `, - "query": { - "bool": { - "filter": [ - { - "term": { - "analyServerId": "` + analyServerId + `" - } - } - ] - } - }, - "size":`+ number +`, - "_source": [ - "id", - "faceFeature" - ] - }` - }else { - JsonDSL = ` { - "from": ` + point + `, - "query": { - "match_all": {} - }, - "size":`+ number +`, - "_source": [ - "id", - "tableId", - "faceFeature" - ] - }` - } - //fmt.Println("url: "+"http://"+serverIp+":"+serverPort+"/"+indexName+"/_search","body: ",JsonDSL) - buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search",[]byte(JsonDSL)) - if err != nil { - return dbinfos ,errors.New("http request dbtablename info is err!") - } - - // 杩斿洖 _source 鏁扮粍 - sources, err := Sourcelist(buf) - if err != nil { - return dbinfos,err - } - - // 杩斿洖鎵�鏈夋煡璇㈢殑鏁版嵁 - dbpersoninfos := Parsesources(sources) - return dbpersoninfos, nil -} - -// 鏍规嵁搴曞簱id鏌ヨ搴曞簱淇℃伅 -func Dbtablefosbyid (id []string, indexName string, serverIp string, serverPort string) ([]protomsg.Dbtable, error) { - var dbinfo []protomsg.Dbtable - dbtableId := strings.Replace(strings.Trim(fmt.Sprint(id), "[]"), " ", "\",\"", -1) - var dbinfoRequest = ` - { - "query": { - "bool": { - "filter": [{ - "terms": { - "id": [ - "`+ dbtableId +`" - ] - } - }] - } - }, - "size":1000000 - } - ` - buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search",[]byte(dbinfoRequest)) - if err != nil { - return dbinfo, err - } - - sources, err := Sourcelist(buf) - if err != nil { - return dbinfo , err - } - - dbtable := Dbtablebyid(sources) - return dbtable, nil -} +) // 鏍规嵁鎶撴媿浜哄憳id鏌ヨ鎶撴媿浜哄憳淇℃伅 -func AIOceaninfosbyid (id []string, indexName string, serverIp string, serverPort string) ([]protomsg.AIOcean, error) { +func AIOceaninfosbyid(id []string, indexName string, serverIp string, serverPort string) ([]protomsg.AIOcean, error) { var aIOceanInfo []protomsg.AIOcean videopersonsPersonId := strings.Replace(strings.Trim(fmt.Sprint(id), "[]"), " ", "\",\"", -1) var dbinfoRequest = ` @@ -114,7 +23,7 @@ "filter": [{ "terms": { "id": [ - "`+ videopersonsPersonId +`" + "` + videopersonsPersonId + `" ] } }] @@ -123,64 +32,30 @@ "size":1000000 } ` - buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search",[]byte(dbinfoRequest)) + buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search", []byte(dbinfoRequest)) if err != nil { - return aIOceanInfo, err + return aIOceanInfo, err } sources, err := Sourcelist(buf) if err != nil { - return aIOceanInfo , err + return aIOceanInfo, err } aIOcean := AIOceanAnalysis(sources) println(aIOcean) - return aIOcean,nil + return aIOcean, nil } -// 鏍规嵁搴曞簱浜哄憳id鏌ヨ搴曞簱浜哄憳淇℃伅 -func Dbpersoninfosbyid (id []string, indexName string, serverIp string, serverPort string) ([]protomsg.Dbperson, error) { - var dbinfo []protomsg.Dbperson - dbtablePersonId := strings.Replace(strings.Trim(fmt.Sprint(id), "[]"), " ", "\",\"", -1) - var dbinfoRequest = ` - { - "query": { - "bool": { - "filter": [{ - "terms": { - "id": [ - "`+ dbtablePersonId +`" - ] - } - }] - } - }, - "size":1000000 - } - ` - buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search",[]byte(dbinfoRequest)) - if err != nil { - return dbinfo, err - } - - sources, err := Sourcelist(buf) - if err != nil { - return dbinfo , err - } - - dbperson := Dbpersonbyid(sources) - println(dbperson) - return dbperson,nil -} //鏍规嵁鎶撴媿搴撲汉鍛榠d鏌ヨ鐗瑰緛鍊� -func GetVideoPersonFaceFeatureById (id string, indexName string, serverIp string, serverPort string) (string, error) { - var jsonDSL = ` +func GetVideoPersonFaceFeatureById(id string, indexName string, serverIp string, serverPort string) (string, error) { + var jsonDSL = ` { "query": { "bool": { "filter": [{ "term": { - "id":"`+ id +`" + "id":"` + id + `" } }] } @@ -188,70 +63,32 @@ "_source":["faceFeature"] } ` - buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search",[]byte(jsonDSL)) - if err != nil { - return "", err - } + buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search", []byte(jsonDSL)) + if err != nil { + return "", err + } - sources, err := Sourcelist(buf) - if err != nil { - return "" , err - } - faceFeature := sources[0]["faceFeature"].(string) - return faceFeature,nil + sources, err := Sourcelist(buf) + if err != nil { + return "", err + } + faceFeature := sources[0]["faceFeature"].(string) + return faceFeature, nil } - -// 鏍规嵁tableid 鏌ヨtablename -func Dbtablename(tableid string, indexName string, serverIp string, serverPort string) (tablename string, err error) { - var dbinfotable =` { - "query": { - "bool": { - "filter": [ - { - "term": { - "id":"`+tableid+`" - } - } - ] - } - }, - "_source": [ - "tableName" - ], - "size":1000000 - } - ` - buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search",[]byte(dbinfotable)) - if err != nil { - return "" ,errors.New("http request dbtablename info is err!") - } - sources, err := Sourcelist(buf) - if err != nil { - return "",err - } - for _, source := range sources { - if name, ok := source["tableName"].(string); ok { - tablename = name - break - } - } - return tablename, nil -} - //鏍规嵁鎶撴媿浜哄憳id鏇存柊锛坴ideourl锛夋憚鍍忔満鍦板潃 -func UpdateVideourlById(id string, videoUrl string, indexName string, serverIp string, serverPort string,command int) (statu int) { +func UpdateVideourlById(id string, videoUrl string, indexName string, serverIp string, serverPort string, command int) (statu int) { var info interface{} - url := "http://"+serverIp+":"+serverPort+"/"+indexName+"/_update_by_query?refresh=true" - sourceStr := "ctx._source.videoUrl='" + videoUrl + "'" - if command>=0 { - sourceStr = "ctx._source.linkTagInfo["+strconv.Itoa(command)+"].videoUrl='" + videoUrl + "'" + url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query?refresh=true" + sourceStr := "ctx._source.videoUrl='" + videoUrl + "'" + if command >= 0 { + sourceStr = "ctx._source.linkTagInfo[" + strconv.Itoa(command) + "].videoUrl='" + videoUrl + "'" } var videoUrlInfo = ` { "script": { - "source": "`+ sourceStr + `" + "source": "` + sourceStr + `" }, "query": { "term": { @@ -298,10 +135,10 @@ queryBody := compareArgs.InputValue //妫�绱㈡ if queryBody != "" { - queryStr = "\"must\":[{\"multi_match\":{\"query\":\"" + queryBody + "\",\"fields\":[\"alarmRules.alarmLevel^1.5\",\"ageDescription^1.5\",\"taskName^1.5\",\"baseInfo.tableName^1.5\",\"sex^2.0\",\"race^2.0\",\"content^1.0\",\"baseInfo.idCard^1.8\",\"cameraAddr^1.0\"]," + + queryStr = "\"must\":[{\"multi_match\":{\"query\":\"" + queryBody + "\",\"fields\":[\"cameraAddr^1.5\",\"taskName^1.5\",\"sdkName^1.5\",\"showLabels^3.0\",\"baseInfo.tableName^1.5\",\"baseInfo.targetName^1.5\",\"baseInfo.labels^1.5\",\"alarmRules.alarmLevel^1.5\",\"linkTag^1.5\"]," + "\"type\":\"cross_fields\",\"operator\":\"OR\",\"slop\":0,\"prefix_length\":0,\"max_expansions\":50,\"zero_terms_query\":\"NONE\",\"auto_generate_synonyms_phrase_query\":true,\"fuzzy_transpositions\":true,\"boost\":1}}]," } - if compareArgs.SearchTime == nil || len(compareArgs.SearchTime)!=2 { + if compareArgs.SearchTime == nil || len(compareArgs.SearchTime) != 2 { return nil } gteDate := compareArgs.SearchTime[0] @@ -337,7 +174,7 @@ //鍒ゆ柇甯冮槻绛夌骇 alarmLevelStr := "" - if alarmLevelTypes !="" { + if alarmLevelTypes != "" { alarmLevelStr = "{\"terms\":{\"alarmRules.alarmLevel\":[\"" + alarmLevelTypes + "\"]}}," } @@ -364,6 +201,7 @@ "\"size\":\"1000\"," + "\"query\":{\"bool\":{" + queryStr + "\"filter\":[" + + "{\"term\":{\"targetInfo.targetType.raw\":\"face\"}}," + cameraIdStr + alarmLevelStr + taskIdStr + @@ -455,57 +293,8 @@ return capturetable } -//鑾峰彇搴曞簱浜哄憳ID -func GetDbpersonsId(compareArgs protomsg.CompareArgs,indexName string, serverIp string, serverPort string) (source map[string][]string) { - queryStr := "" - queryBody := compareArgs.InputValue - //妫�绱㈡ - if queryBody != "" { - queryStr = "\"must\":[{\"multi_match\":{\"query\":\"" + queryBody + "\",\"fields\":[\"personName^1.5\",\"age^1.5\",\"idCard^1.5\",\"phoneNum^1.5\",\"sex^2.0\",\"reserved^2.0\"]," + - "\"type\":\"cross_fields\",\"operator\":\"OR\",\"slop\":0,\"prefix_length\":0,\"max_expansions\":50,\"zero_terms_query\":\"NONE\",\"auto_generate_synonyms_phrase_query\":true,\"fuzzy_transpositions\":true,\"boost\":1}}]," - } - //鍒ゆ柇搴撹〃ID - tableId := compareArgs.Tabs - esTableId := "" - esTableIdStr := "" - if tableId != nil && len(tableId) > 0 { - esTableId = strings.Replace(strings.Trim(fmt.Sprint(tableId), "[]"), " ", "\",\"", -1) - esTableIdStr = "{\"terms\":{\"tableId\":[\"" + esTableId + "\"]}}" - } - - prama := "{" + - "\"size\":\"100000000\"," + - "\"query\":{\"bool\":{" + queryStr + - "\"filter\":[" + - esTableIdStr + - "]}}," + - "\"_source\":[\"id\",\"tableId\"]" + - "}" - - url := "http://" + serverIp + ":" + serverPort + - "/" + indexName + "/_search?search_type=dfs_query_then_fetch" - fmt.Println(url) - fmt.Println(prama) - buf, err := EsReq("POST", url,[]byte(prama)) - if err != nil { - fmt.Println("http request videoUrlInfo info is err!") - return - } - sources, err := Sourcelist(buf) - if err != nil { - return - } - tabsource := make(map[string][]string) - for _, source := range sources{ - tableId := source["tableId"].(string) - id := source["id"].(string) - tabsource[tableId] = append(tabsource[tableId], id) - } - return tabsource -} - //鍒濆鍖栧疄鏃舵姄鎷� -func InitRealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm bool ,quantity int) ([]protomsg.AIOcean, error){ +func InitRealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm bool, quantity int) ([]protomsg.AIOcean, error) { var aIOceanInfo []protomsg.AIOcean url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" @@ -528,8 +317,8 @@ },` } DSLJson := `{ - "size":`+strconv.Itoa(quantity)+`, - `+queryStr+` + "size":` + strconv.Itoa(quantity) + `, + ` + queryStr + ` "sort":[{"picDate":{"order":"desc"}}], "_source": {"includes":[],"excludes":["*.feature"]} }` @@ -549,7 +338,7 @@ } //瀹炴椂鎶撴媿 -func RealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm bool ) ([]protomsg.AIOcean, error){ +func RealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm bool) ([]protomsg.AIOcean, error) { var aIOceanInfo []protomsg.AIOcean url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" @@ -578,7 +367,7 @@ } } }, - `+queryStr+` + ` + queryStr + ` ] } }, @@ -601,7 +390,7 @@ } //缁煎悎缁熻 -func StatisticsComprehensive(serverIp string, serverPort string, indexName string, isAlarm bool) (total int, err error){ +func StatisticsComprehensive(serverIp string, serverPort string, indexName string, isAlarm bool) (total int, err error) { isAlarmStr := "" if isAlarm == true { isAlarmStr = `,{"term":{"isAlarm":true}}` @@ -619,13 +408,13 @@ } } } - `+isAlarmStr+` + ` + isAlarmStr + ` ] } } }` //fmt.Println(DSLJson) - buf, err := EsReq("POST",url,[]byte(DSLJson)) + buf, err := EsReq("POST", url, []byte(DSLJson)) if err != nil { return total, err } @@ -641,13 +430,13 @@ } total = int(middle["total"].(float64)) //fmt.Println(total) - return total,nil + return total, nil } //瀹炴椂鎶ヨ浠诲姟姣旂巼 -func RealTimeAlarmTaskRate(serverIp string, serverPort string, indexName string) (sources []map[string]interface{},err error){ +func RealTimeAlarmTaskRate(serverIp string, serverPort string, indexName string) (sources []map[string]interface{}, err error) { url := "http://" + serverIp + ":" + serverPort + - "/"+indexName+"/_search" + "/" + indexName + "/_search" DSLJson := `{ "size":0, "query":{ @@ -669,7 +458,7 @@ } } }` - buf, err := EsReq("POST",url,[]byte(DSLJson)) + buf, err := EsReq("POST", url, []byte(DSLJson)) if err != nil { return nil, err } @@ -688,8 +477,8 @@ return nil, errors.New("first hits change error!") } - for _, in := range sdkName_status["buckets"].([]interface{}){ - var source = make(map[string]interface{},0) + for _, in := range sdkName_status["buckets"].([]interface{}) { + var source = make(map[string]interface{}, 0) tmpbuf, ok := in.(map[string]interface{}) if !ok { fmt.Println("change to source error!") @@ -702,14 +491,13 @@ sources = append(sources, source) } //fmt.Println("tmpSource",sources) - return sources,nil + return sources, nil } - //鑱氬悎浠诲姟鍒楄〃锛宼askId+taskName -func AggregateTaskList(serverIp string, serverPort string, indexName string, analyServerId string) (sources []map[string]interface{},err error){ +func AggregateTaskList(serverIp string, serverPort string, indexName string, analyServerId string) (sources []map[string]interface{}, err error) { url := "http://" + serverIp + ":" + serverPort + - "/"+indexName+"/_search" + "/" + indexName + "/_search" serverFilterStr := "" if analyServerId != "" { serverFilterStr = `, @@ -718,7 +506,7 @@ "filter": [ { "term": { - "analyServerId": "`+analyServerId+`" + "analyServerId": "` + analyServerId + `" } } ] @@ -750,9 +538,9 @@ } } } - `+serverFilterStr+` + ` + serverFilterStr + ` }` - buf, err := EsReq("POST",url,[]byte(DSLJson)) + buf, err := EsReq("POST", url, []byte(DSLJson)) if err != nil { return nil, err } @@ -771,8 +559,8 @@ return nil, errors.New("first hits change error!") } - for _, in := range task_status["buckets"].([]interface{}){ - var source = make(map[string]interface{},0) + for _, in := range task_status["buckets"].([]interface{}) { + var source = make(map[string]interface{}, 0) tmpbuf, ok := in.(map[string]interface{}) if !ok { fmt.Println("change to source error!") @@ -788,20 +576,27 @@ sources = append(sources, source) } //fmt.Println("tmpSource",sources) - return sources,nil + return sources, nil } - /****************************************浠ヤ笅涓簊dkCompare姣斿缂撳瓨浣跨敤鏂规硶*********************************************/ //鑾峰彇鏌ヨ鎬绘暟 -func GetTotal(serverIp string, serverPort string,indexName string,shards string) (total int) { - JsonDSL := `{"query":{"bool":{"must":[{"match_all":{}}]}},"size":0}` - - url := "http://" + serverIp + ":" + serverPort+ "/" + indexName + "/_search" - if indexName == "videopersons" { - url = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local" +func GetTotal(serverIp string, serverPort string, indexName string, shards string) (total int) { + JsonDSL := `{ + "size": 0, + "query": { + "bool": { + "filter": [{ + "term": { + "targetInfo.targetType.raw": "face" + } + }] + } } +}` + + url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" buf, err := EsReq("POST", url, []byte(JsonDSL)) if err != nil { return @@ -822,101 +617,85 @@ } //鏌ヨ鏃堕棿娈垫暟鎹� -func GetPeriodInfos(serverIp string, serverPort string,startTime string, endTime string, indexName string,shards string) ([]*protomsg.Esinfo, error) { - var dbinfos []*protomsg.Esinfo - url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" - if indexName == "videopersons" { - url = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local" - } +func GetPeriodInfos(serverIp string, serverPort string, startTime string, endTime string, indexName string, shards string) ([]*protomsg.Esinfo, error) { + var capdbinfo []*protomsg.Esinfo + url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local" + JsonDSL := ` - { - "query": { - "bool": { - "filter": [{ - "range": { - "picDate": { - "gte": "` + startTime + `", - "lt": "` + endTime + `" - } - } - }] - } - }, - "size": 1000000, - "_source": [ - "id", - "tableId", - "faceFeature", - "analyServerId" - ] - - } + { + "query": { + "bool": { + "filter": [ + { + "term": { + "targetInfo.targetType.raw": "face" + } + }, + { + "range": { + "picDate": { + "gte": "` + startTime + `", + "lt": "` + endTime + `" + } + } + } + ] + } + }, + "size": 1000000, + "_source": [ + "id", + "targetInfo.feature", + "analyServerId" + ] + } ` //logger.Debug(url) //logger.Debug(JsonDSL) buf, err := EsReq("POST", url, []byte(JsonDSL)) if err != nil { - return dbinfos, errors.New("http request dbtablename info is err!") + return capdbinfo, errors.New("http request dbtablename info is err!") } // 杩斿洖 _source 鏁扮粍 sources, err := Sourcelist(buf) if err != nil { - return dbinfos, err + return capdbinfo, err } // 杩斿洖鎵�鏈夋煡璇㈢殑鏁版嵁 - dbpersoninfos := Parsesources(sources) - return dbpersoninfos, nil + capdbinfos := Parsesources(sources) + return capdbinfos, nil } // 鏌ヨ搴曞簱浜哄憳淇℃伅 -func GetOceanFeatures(serverIp string, serverPort string,queryIndexNum int, queryNums int, indexName string, shards string) ([]*protomsg.Esinfo, error) { +func GetOceanFeatures(serverIp string, serverPort string, queryIndexNum int, queryNums int, indexName string, shards string) ([]*protomsg.Esinfo, error) { var dbinfos []*protomsg.Esinfo point := strconv.Itoa(queryIndexNum) number := strconv.Itoa(queryNums) JsonDSL := "" - url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" - if indexName == "videopersons" { - url = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local" - JsonDSL = ` { + url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local" + JsonDSL = ` { "from": ` + point + `, "query": { - "match_all": {} - }, + "bool": { + "filter": [ + { + "term": { + "targetInfo.targetType.raw": "face" + } + } + ] + } + }, "size":` + number + `, "_source": [ - "id", - "tableId", - "faceFeature", - "analyServerId" + "id", + "targetInfo.feature", + "analyServerId" ] }` - } else { //鏌ュ簳搴撴湁鏁堜汉鍛� - JsonDSL = ` { - "from": ` + point + `, - "query": { - "bool": { - "filter":[{ - "term":{ - "enable":1 - } - },{ - "term":{ - "isDelete":0 - } - }] - } - }, - "size":` + number + `, - "_source": [ - "id", - "tableId", - "faceFeature" - ] - }` - } buf, err := EsReq("POST", url, []byte(JsonDSL)) if err != nil { -- Gitblit v1.8.0