From 01d848906751a17acaa33d7adbdabe41469bc347 Mon Sep 17 00:00:00 2001
From: sunty <1172534965@qq.com>
Date: 星期四, 28 五月 2020 17:28:58 +0800
Subject: [PATCH] upgrade getOceanFeature
---
EsClient.go | 2
EsApi.go | 120 ++++++++++++++++++++++++++++++++-------
2 files changed, 98 insertions(+), 24 deletions(-)
diff --git a/EsApi.go b/EsApi.go
index 8883fcc..f7218b7 100644
--- a/EsApi.go
+++ b/EsApi.go
@@ -767,10 +767,13 @@
}
// 鏌ヨ搴曞簱浜哄憳淇℃伅*缂撳瓨*
-func GetOceanFeatures(serverIp string, serverPort string, queryIndexNum int, queryNums int, indexName string, shards string, targetType string) ([]*protomsg.MultiFeaCache, error) {
- var dbinfos []*protomsg.MultiFeaCache
- point := strconv.Itoa(queryIndexNum)
- number := strconv.Itoa(queryNums)
+func GetOceanFeatures(serverIp string, serverPort string, queryNums int, indexName string, shards string, targetType string) ([]*protomsg.MultiFeaCache, error) {
+ //queryIndexNum int
+ //var dbinfos []*protomsg.MultiFeaCache
+ dbinfos := make([]*protomsg.MultiFeaCache,0)
+ //dbinfosss := make([]*protomsg.MultiFeaCache,0)
+ //dbinfoss = append(dbinfoss, dbinfosss...)
+
JsonDSL := ""
var source []string
switch targetType {
@@ -779,9 +782,20 @@
case "track":
source = []string{"id", "targetInfo.feature", "analyServerId", "targetInfo.attachTarget.feature", "targetInfo.targetLocation", "linkTagInfo.targetInfo.feature", "linkTagInfo.targetInfo.attachTarget.feature", "linkTagInfo.targetInfo.targetLocation"}
}
- url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local"
- JsonDSL = ` {
- "from": ` + point + `,
+
+ url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local;scroll=1m"
+
+ var lock sync.RWMutex
+ var wg sync.WaitGroup
+
+ for i := 0; i < 48; i++ {
+ //璇锋眰浣�
+ JsonDSL = ` {
+ "slice": {
+ "id": "` + strconv.Itoa(i) + `",
+ "max": 48
+ },
+ "size":` + strconv.Itoa(queryNums) + `,
"query": {
"bool": {
"filter": [
@@ -793,26 +807,86 @@
]
}
},
- "size":` + number + `,
"_source": ["` + strings.Replace(strings.Trim(fmt.Sprint(source), "[]"), " ", "\",\"", -1) + `"]
}`
-
- logPrint("url: ",url)
- logPrint("url: ",JsonDSL)
- buf, err := EsReq("POST", url, []byte(JsonDSL))
- if err != nil {
- return dbinfos, err
- }
+ wg.Add(1)
+ go func(reqJsonDSL string) {
+ defer wg.Done()
- // 杩斿洖 _source 鏁扮粍
- sources, err := Sourcelist(buf)
- if err != nil {
- return dbinfos, err
- }
+ //fmt.Println(url)
+ //fmt.Println(prama)
+ //logPrint("url: ",url)
+ //logPrint("url: ",reqJsonDSL)
+ buf, err := EsReq("POST", url, []byte(reqJsonDSL))
+ if err != nil {
+ logPrint("EsReq: ",err)
+ return
+ }
- // 杩斿洖鎵�鏈夋煡璇㈢殑鏁版嵁
- dbpersoninfos := Parsesources(sources)
- return dbpersoninfos, nil
+ // 杩斿洖 _source 鏁扮粍
+ sources, err := Sourcelistforscroll(buf)
+ if err != nil {
+ logPrint("EsReq: ",err)
+ return
+ }
+ // 杩斿洖鎵�鏈夋煡璇㈢殑鏁版嵁
+ ftmpDatas := Parsesources(sources["sourcelist"].([]map[string]interface{}))
+ lock.Lock()
+ dbinfos = append(dbinfos,ftmpDatas...)
+ //logPrint("prsLen: ", len(Parsesources(sources["sourcelist"].([]map[string]interface{}))))
+ //logPrint("dbinfosLen: ", len(dbinfos))
+ lock.Unlock()
+
+ scroll_id := sources["scroll_id"].(string)
+
+ //scroll璇锋眰澶�
+ scroll_url := "http://" + serverIp + ":" + serverPort + "/_search/scroll"
+ for {
+ next_scroll_id := ""
+ if next_scroll_id != "" {
+ scroll_id = next_scroll_id
+ }
+ jsonDSL := `{
+ "scroll": "1m",
+ "scroll_id" : "` + scroll_id + `"
+ }`
+ //fmt.Println(scroll_url)
+ //fmt.Println(jsonDSL)
+ buf, err := EsReq("POST", scroll_url, []byte(jsonDSL))
+
+ if err != nil {
+ //fmt.Println("lenth1: ", len(dbinfos))
+ return
+ }
+ nextSources, err := Sourcelistforscroll(buf)
+
+ if nextSources == nil {
+ return
+ }
+
+ nextM := nextSources["sourcelist"].([]map[string]interface{})
+ //fmt.Println("id",nextSources)
+ if nextM == nil || len(nextM) == 0 {
+ //fmt.Println("lenth: ", len(capturetable))
+ return
+ }
+ tmpDatas := Parsesources(nextM)
+ lock.Lock()
+ dbinfos = append(dbinfos, tmpDatas...)
+ //logPrint("tmpDatasLen: ", len(tmpDatas))
+ //logPrint("AdbinfosLen: ", len(dbinfos))
+ lock.Unlock()
+
+ next_scroll_id = nextSources["scroll_id"].(string)
+ }
+
+ }(JsonDSL)
+ }
+ wg.Wait()
+
+ fmt.Println("lenth_all: ", len(dbinfos))
+
+ return dbinfos, nil
}
//************************CORN TASK*******************************
diff --git a/EsClient.go b/EsClient.go
index bbdb4bb..39754cf 100644
--- a/EsClient.go
+++ b/EsClient.go
@@ -611,7 +611,7 @@
func EsReq(method string, url string, parama []byte) (buf []byte, err error) {
//defer elapsed("page")()
- timeout := time.Duration(10 * time.Second)
+ timeout := time.Duration(100 * time.Second)
client := http.Client{
Timeout: timeout,
}
--
Gitblit v1.8.0