From 01d848906751a17acaa33d7adbdabe41469bc347 Mon Sep 17 00:00:00 2001
From: sunty <1172534965@qq.com>
Date: 星期四, 28 五月 2020 17:28:58 +0800
Subject: [PATCH] upgrade getOceanFeature

---
 EsApi.go |  120 ++++++++++++++++++++++++++++++++++++++++++++++++-----------
 1 files changed, 97 insertions(+), 23 deletions(-)

diff --git a/EsApi.go b/EsApi.go
index 8883fcc..f7218b7 100644
--- a/EsApi.go
+++ b/EsApi.go
@@ -767,10 +767,13 @@
 }
 
 // 鏌ヨ搴曞簱浜哄憳淇℃伅*缂撳瓨*
-func GetOceanFeatures(serverIp string, serverPort string, queryIndexNum int, queryNums int, indexName string, shards string, targetType string) ([]*protomsg.MultiFeaCache, error) {
-	var dbinfos []*protomsg.MultiFeaCache
-	point := strconv.Itoa(queryIndexNum)
-	number := strconv.Itoa(queryNums)
+func GetOceanFeatures(serverIp string, serverPort string, queryNums int, indexName string, shards string, targetType string) ([]*protomsg.MultiFeaCache, error) {
+	//queryIndexNum int
+	//var dbinfos []*protomsg.MultiFeaCache
+	dbinfos := make([]*protomsg.MultiFeaCache,0)
+	//dbinfosss := make([]*protomsg.MultiFeaCache,0)
+	//dbinfoss = append(dbinfoss, dbinfosss...)
+
 	JsonDSL := ""
 	var source []string
 	switch targetType {
@@ -779,9 +782,20 @@
 	case "track":
 		source = []string{"id", "targetInfo.feature", "analyServerId", "targetInfo.attachTarget.feature", "targetInfo.targetLocation", "linkTagInfo.targetInfo.feature", "linkTagInfo.targetInfo.attachTarget.feature", "linkTagInfo.targetInfo.targetLocation"}
 	}
-	url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local"
-	JsonDSL = `  {
-                    "from": ` + point + `,        
+
+	url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local;scroll=1m"
+
+	var lock sync.RWMutex
+	var wg sync.WaitGroup
+
+	for i := 0; i < 48; i++ {
+		//璇锋眰浣�
+		JsonDSL = `  {
+                    "slice": {
+						"id": "` + strconv.Itoa(i) + `",
+						"max": 48 
+					},
+					"size":` + strconv.Itoa(queryNums) + `,
                     "query": {
 						"bool": {
 							"filter": [
@@ -793,26 +807,86 @@
 							]
 						}	
 					},
-                     "size":` + number + `,
                      "_source": ["` + strings.Replace(strings.Trim(fmt.Sprint(source), "[]"), " ", "\",\"", -1) + `"]
                     }`
-	
-	logPrint("url: ",url)
-	logPrint("url: ",JsonDSL)
-	buf, err := EsReq("POST", url, []byte(JsonDSL))
-	if err != nil {
-		return dbinfos, err
-	}
+		wg.Add(1)
+		go func(reqJsonDSL string) {
+			defer wg.Done()
 
-	// 杩斿洖 _source 鏁扮粍
-	sources, err := Sourcelist(buf)
-	if err != nil {
-		return dbinfos, err
-	}
+			//fmt.Println(url)
+			//fmt.Println(prama)
+			//logPrint("url: ",url)
+			//logPrint("url: ",reqJsonDSL)
+			buf, err := EsReq("POST", url, []byte(reqJsonDSL))
+			if err != nil {
+				logPrint("EsReq: ",err)
+				return
+			}
 
-	// 杩斿洖鎵�鏈夋煡璇㈢殑鏁版嵁
-	dbpersoninfos := Parsesources(sources)
-	return dbpersoninfos, nil
+			// 杩斿洖 _source 鏁扮粍
+			sources, err := Sourcelistforscroll(buf)
+			if err != nil {
+				logPrint("EsReq: ",err)
+				return
+			}
+			// 杩斿洖鎵�鏈夋煡璇㈢殑鏁版嵁
+			ftmpDatas :=  Parsesources(sources["sourcelist"].([]map[string]interface{}))
+			lock.Lock()
+			dbinfos = append(dbinfos,ftmpDatas...)
+			//logPrint("prsLen: ", len(Parsesources(sources["sourcelist"].([]map[string]interface{}))))
+			//logPrint("dbinfosLen: ", len(dbinfos))
+			lock.Unlock()
+
+			scroll_id := sources["scroll_id"].(string)
+
+			//scroll璇锋眰澶�
+			scroll_url := "http://" + serverIp + ":" + serverPort + "/_search/scroll"
+			for {
+				next_scroll_id := ""
+				if next_scroll_id != "" {
+					scroll_id = next_scroll_id
+				}
+				jsonDSL := `{
+            "scroll": "1m", 
+            "scroll_id" : "` + scroll_id + `"
+        }`
+				//fmt.Println(scroll_url)
+				//fmt.Println(jsonDSL)
+				buf, err := EsReq("POST", scroll_url, []byte(jsonDSL))
+
+				if err != nil {
+					//fmt.Println("lenth1: ", len(dbinfos))
+					return
+				}
+				nextSources, err := Sourcelistforscroll(buf)
+
+				if nextSources == nil {
+					return
+				}
+
+				nextM := nextSources["sourcelist"].([]map[string]interface{})
+				//fmt.Println("id",nextSources)
+				if nextM == nil || len(nextM) == 0 {
+					//fmt.Println("lenth: ", len(capturetable))
+					return
+				}
+				tmpDatas := Parsesources(nextM)
+				lock.Lock()
+				dbinfos = append(dbinfos, tmpDatas...)
+				//logPrint("tmpDatasLen: ", len(tmpDatas))
+				//logPrint("AdbinfosLen: ", len(dbinfos))
+				lock.Unlock()
+
+				next_scroll_id = nextSources["scroll_id"].(string)
+			}
+
+		}(JsonDSL)
+	}
+	wg.Wait()
+
+	fmt.Println("lenth_all: ", len(dbinfos))
+
+	return dbinfos, nil
 }
 
 //************************CORN TASK*******************************

--
Gitblit v1.8.0