From 01d848906751a17acaa33d7adbdabe41469bc347 Mon Sep 17 00:00:00 2001 From: sunty <1172534965@qq.com> Date: 星期四, 28 五月 2020 17:28:58 +0800 Subject: [PATCH] upgrade getOceanFeature --- EsApi.go | 120 ++++++++++++++++++++++++++++++++++++++++++++++++----------- 1 files changed, 97 insertions(+), 23 deletions(-) diff --git a/EsApi.go b/EsApi.go index 8883fcc..f7218b7 100644 --- a/EsApi.go +++ b/EsApi.go @@ -767,10 +767,13 @@ } // 鏌ヨ搴曞簱浜哄憳淇℃伅*缂撳瓨* -func GetOceanFeatures(serverIp string, serverPort string, queryIndexNum int, queryNums int, indexName string, shards string, targetType string) ([]*protomsg.MultiFeaCache, error) { - var dbinfos []*protomsg.MultiFeaCache - point := strconv.Itoa(queryIndexNum) - number := strconv.Itoa(queryNums) +func GetOceanFeatures(serverIp string, serverPort string, queryNums int, indexName string, shards string, targetType string) ([]*protomsg.MultiFeaCache, error) { + //queryIndexNum int + //var dbinfos []*protomsg.MultiFeaCache + dbinfos := make([]*protomsg.MultiFeaCache,0) + //dbinfosss := make([]*protomsg.MultiFeaCache,0) + //dbinfoss = append(dbinfoss, dbinfosss...) + JsonDSL := "" var source []string switch targetType { @@ -779,9 +782,20 @@ case "track": source = []string{"id", "targetInfo.feature", "analyServerId", "targetInfo.attachTarget.feature", "targetInfo.targetLocation", "linkTagInfo.targetInfo.feature", "linkTagInfo.targetInfo.attachTarget.feature", "linkTagInfo.targetInfo.targetLocation"} } - url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local" - JsonDSL = ` { - "from": ` + point + `, + + url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local;scroll=1m" + + var lock sync.RWMutex + var wg sync.WaitGroup + + for i := 0; i < 48; i++ { + //璇锋眰浣� + JsonDSL = ` { + "slice": { + "id": "` + strconv.Itoa(i) + `", + "max": 48 + }, + "size":` + strconv.Itoa(queryNums) + `, "query": { "bool": { "filter": [ @@ -793,26 +807,86 @@ ] } }, - "size":` + number + `, "_source": ["` + strings.Replace(strings.Trim(fmt.Sprint(source), "[]"), " ", "\",\"", -1) + `"] }` - - logPrint("url: ",url) - logPrint("url: ",JsonDSL) - buf, err := EsReq("POST", url, []byte(JsonDSL)) - if err != nil { - return dbinfos, err - } + wg.Add(1) + go func(reqJsonDSL string) { + defer wg.Done() - // 杩斿洖 _source 鏁扮粍 - sources, err := Sourcelist(buf) - if err != nil { - return dbinfos, err - } + //fmt.Println(url) + //fmt.Println(prama) + //logPrint("url: ",url) + //logPrint("url: ",reqJsonDSL) + buf, err := EsReq("POST", url, []byte(reqJsonDSL)) + if err != nil { + logPrint("EsReq: ",err) + return + } - // 杩斿洖鎵�鏈夋煡璇㈢殑鏁版嵁 - dbpersoninfos := Parsesources(sources) - return dbpersoninfos, nil + // 杩斿洖 _source 鏁扮粍 + sources, err := Sourcelistforscroll(buf) + if err != nil { + logPrint("EsReq: ",err) + return + } + // 杩斿洖鎵�鏈夋煡璇㈢殑鏁版嵁 + ftmpDatas := Parsesources(sources["sourcelist"].([]map[string]interface{})) + lock.Lock() + dbinfos = append(dbinfos,ftmpDatas...) + //logPrint("prsLen: ", len(Parsesources(sources["sourcelist"].([]map[string]interface{})))) + //logPrint("dbinfosLen: ", len(dbinfos)) + lock.Unlock() + + scroll_id := sources["scroll_id"].(string) + + //scroll璇锋眰澶� + scroll_url := "http://" + serverIp + ":" + serverPort + "/_search/scroll" + for { + next_scroll_id := "" + if next_scroll_id != "" { + scroll_id = next_scroll_id + } + jsonDSL := `{ + "scroll": "1m", + "scroll_id" : "` + scroll_id + `" + }` + //fmt.Println(scroll_url) + //fmt.Println(jsonDSL) + buf, err := EsReq("POST", scroll_url, []byte(jsonDSL)) + + if err != nil { + //fmt.Println("lenth1: ", len(dbinfos)) + return + } + nextSources, err := Sourcelistforscroll(buf) + + if nextSources == nil { + return + } + + nextM := nextSources["sourcelist"].([]map[string]interface{}) + //fmt.Println("id",nextSources) + if nextM == nil || len(nextM) == 0 { + //fmt.Println("lenth: ", len(capturetable)) + return + } + tmpDatas := Parsesources(nextM) + lock.Lock() + dbinfos = append(dbinfos, tmpDatas...) + //logPrint("tmpDatasLen: ", len(tmpDatas)) + //logPrint("AdbinfosLen: ", len(dbinfos)) + lock.Unlock() + + next_scroll_id = nextSources["scroll_id"].(string) + } + + }(JsonDSL) + } + wg.Wait() + + fmt.Println("lenth_all: ", len(dbinfos)) + + return dbinfos, nil } //************************CORN TASK******************************* -- Gitblit v1.8.0