sunty
2020-05-28 01d848906751a17acaa33d7adbdabe41469bc347
EsApi.go
@@ -767,10 +767,13 @@
}
// 查询底库人员信息*缓存*
func GetOceanFeatures(serverIp string, serverPort string, queryIndexNum int, queryNums int, indexName string, shards string, targetType string) ([]*protomsg.MultiFeaCache, error) {
   var dbinfos []*protomsg.MultiFeaCache
   point := strconv.Itoa(queryIndexNum)
   number := strconv.Itoa(queryNums)
func GetOceanFeatures(serverIp string, serverPort string, queryNums int, indexName string, shards string, targetType string) ([]*protomsg.MultiFeaCache, error) {
   //queryIndexNum int
   //var dbinfos []*protomsg.MultiFeaCache
   dbinfos := make([]*protomsg.MultiFeaCache,0)
   //dbinfosss := make([]*protomsg.MultiFeaCache,0)
   //dbinfoss = append(dbinfoss, dbinfosss...)
   JsonDSL := ""
   var source []string
   switch targetType {
@@ -779,9 +782,20 @@
   case "track":
      source = []string{"id", "targetInfo.feature", "analyServerId", "targetInfo.attachTarget.feature", "targetInfo.targetLocation", "linkTagInfo.targetInfo.feature", "linkTagInfo.targetInfo.attachTarget.feature", "linkTagInfo.targetInfo.targetLocation"}
   }
   url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local"
   JsonDSL = `  {
                    "from": ` + point + `,
   url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local;scroll=1m"
   var lock sync.RWMutex
   var wg sync.WaitGroup
   for i := 0; i < 48; i++ {
      //请求体
      JsonDSL = `  {
                    "slice": {
                  "id": "` + strconv.Itoa(i) + `",
                  "max": 48
               },
               "size":` + strconv.Itoa(queryNums) + `,
                    "query": {
                  "bool": {
                     "filter": [
@@ -793,26 +807,86 @@
                     ]
                  }   
               },
                     "size":` + number + `,
                     "_source": ["` + strings.Replace(strings.Trim(fmt.Sprint(source), "[]"), " ", "\",\"", -1) + `"]
                    }`
   logPrint("url: ",url)
   logPrint("url: ",JsonDSL)
   buf, err := EsReq("POST", url, []byte(JsonDSL))
   if err != nil {
      return dbinfos, err
   }
      wg.Add(1)
      go func(reqJsonDSL string) {
         defer wg.Done()
   // 返回 _source 数组
   sources, err := Sourcelist(buf)
   if err != nil {
      return dbinfos, err
   }
         //fmt.Println(url)
         //fmt.Println(prama)
         //logPrint("url: ",url)
         //logPrint("url: ",reqJsonDSL)
         buf, err := EsReq("POST", url, []byte(reqJsonDSL))
         if err != nil {
            logPrint("EsReq: ",err)
            return
         }
   // 返回所有查询的数据
   dbpersoninfos := Parsesources(sources)
   return dbpersoninfos, nil
         // 返回 _source 数组
         sources, err := Sourcelistforscroll(buf)
         if err != nil {
            logPrint("EsReq: ",err)
            return
         }
         // 返回所有查询的数据
         ftmpDatas :=  Parsesources(sources["sourcelist"].([]map[string]interface{}))
         lock.Lock()
         dbinfos = append(dbinfos,ftmpDatas...)
         //logPrint("prsLen: ", len(Parsesources(sources["sourcelist"].([]map[string]interface{}))))
         //logPrint("dbinfosLen: ", len(dbinfos))
         lock.Unlock()
         scroll_id := sources["scroll_id"].(string)
         //scroll请求头
         scroll_url := "http://" + serverIp + ":" + serverPort + "/_search/scroll"
         for {
            next_scroll_id := ""
            if next_scroll_id != "" {
               scroll_id = next_scroll_id
            }
            jsonDSL := `{
            "scroll": "1m",
            "scroll_id" : "` + scroll_id + `"
        }`
            //fmt.Println(scroll_url)
            //fmt.Println(jsonDSL)
            buf, err := EsReq("POST", scroll_url, []byte(jsonDSL))
            if err != nil {
               //fmt.Println("lenth1: ", len(dbinfos))
               return
            }
            nextSources, err := Sourcelistforscroll(buf)
            if nextSources == nil {
               return
            }
            nextM := nextSources["sourcelist"].([]map[string]interface{})
            //fmt.Println("id",nextSources)
            if nextM == nil || len(nextM) == 0 {
               //fmt.Println("lenth: ", len(capturetable))
               return
            }
            tmpDatas := Parsesources(nextM)
            lock.Lock()
            dbinfos = append(dbinfos, tmpDatas...)
            //logPrint("tmpDatasLen: ", len(tmpDatas))
            //logPrint("AdbinfosLen: ", len(dbinfos))
            lock.Unlock()
            next_scroll_id = nextSources["scroll_id"].(string)
         }
      }(JsonDSL)
   }
   wg.Wait()
   fmt.Println("lenth_all: ", len(dbinfos))
   return dbinfos, nil
}
//************************CORN TASK*******************************