sunty
2020-05-28 01d848906751a17acaa33d7adbdabe41469bc347
upgrade getOceanFeature
2个文件已修改
108 ■■■■ 已修改文件
EsApi.go 106 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
EsClient.go 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
EsApi.go
@@ -767,10 +767,13 @@
}
// 查询底库人员信息*缓存*
func GetOceanFeatures(serverIp string, serverPort string, queryIndexNum int, queryNums int, indexName string, shards string, targetType string) ([]*protomsg.MultiFeaCache, error) {
    var dbinfos []*protomsg.MultiFeaCache
    point := strconv.Itoa(queryIndexNum)
    number := strconv.Itoa(queryNums)
func GetOceanFeatures(serverIp string, serverPort string, queryNums int, indexName string, shards string, targetType string) ([]*protomsg.MultiFeaCache, error) {
    //queryIndexNum int
    //var dbinfos []*protomsg.MultiFeaCache
    dbinfos := make([]*protomsg.MultiFeaCache,0)
    //dbinfosss := make([]*protomsg.MultiFeaCache,0)
    //dbinfoss = append(dbinfoss, dbinfosss...)
    JsonDSL := ""
    var source []string
    switch targetType {
@@ -779,9 +782,20 @@
    case "track":
        source = []string{"id", "targetInfo.feature", "analyServerId", "targetInfo.attachTarget.feature", "targetInfo.targetLocation", "linkTagInfo.targetInfo.feature", "linkTagInfo.targetInfo.attachTarget.feature", "linkTagInfo.targetInfo.targetLocation"}
    }
    url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local"
    url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local;scroll=1m"
    var lock sync.RWMutex
    var wg sync.WaitGroup
    for i := 0; i < 48; i++ {
        //请求体
    JsonDSL = `  {
                    "from": ` + point + `,
                    "slice": {
                        "id": "` + strconv.Itoa(i) + `",
                        "max": 48
                    },
                    "size":` + strconv.Itoa(queryNums) + `,
                    "query": {
                        "bool": {
                            "filter": [
@@ -793,26 +807,86 @@
                            ]
                        }    
                    },
                     "size":` + number + `,
                     "_source": ["` + strings.Replace(strings.Trim(fmt.Sprint(source), "[]"), " ", "\",\"", -1) + `"]
                    }`
        wg.Add(1)
        go func(reqJsonDSL string) {
            defer wg.Done()
    
    logPrint("url: ",url)
    logPrint("url: ",JsonDSL)
    buf, err := EsReq("POST", url, []byte(JsonDSL))
            //fmt.Println(url)
            //fmt.Println(prama)
            //logPrint("url: ",url)
            //logPrint("url: ",reqJsonDSL)
            buf, err := EsReq("POST", url, []byte(reqJsonDSL))
    if err != nil {
        return dbinfos, err
                logPrint("EsReq: ",err)
                return
    }
    // 返回 _source 数组
    sources, err := Sourcelist(buf)
            sources, err := Sourcelistforscroll(buf)
    if err != nil {
        return dbinfos, err
                logPrint("EsReq: ",err)
                return
            }
            // 返回所有查询的数据
            ftmpDatas :=  Parsesources(sources["sourcelist"].([]map[string]interface{}))
            lock.Lock()
            dbinfos = append(dbinfos,ftmpDatas...)
            //logPrint("prsLen: ", len(Parsesources(sources["sourcelist"].([]map[string]interface{}))))
            //logPrint("dbinfosLen: ", len(dbinfos))
            lock.Unlock()
            scroll_id := sources["scroll_id"].(string)
            //scroll请求头
            scroll_url := "http://" + serverIp + ":" + serverPort + "/_search/scroll"
            for {
                next_scroll_id := ""
                if next_scroll_id != "" {
                    scroll_id = next_scroll_id
                }
                jsonDSL := `{
            "scroll": "1m",
            "scroll_id" : "` + scroll_id + `"
        }`
                //fmt.Println(scroll_url)
                //fmt.Println(jsonDSL)
                buf, err := EsReq("POST", scroll_url, []byte(jsonDSL))
                if err != nil {
                    //fmt.Println("lenth1: ", len(dbinfos))
                    return
                }
                nextSources, err := Sourcelistforscroll(buf)
                if nextSources == nil {
                    return
    }
    // 返回所有查询的数据
    dbpersoninfos := Parsesources(sources)
    return dbpersoninfos, nil
                nextM := nextSources["sourcelist"].([]map[string]interface{})
                //fmt.Println("id",nextSources)
                if nextM == nil || len(nextM) == 0 {
                    //fmt.Println("lenth: ", len(capturetable))
                    return
                }
                tmpDatas := Parsesources(nextM)
                lock.Lock()
                dbinfos = append(dbinfos, tmpDatas...)
                //logPrint("tmpDatasLen: ", len(tmpDatas))
                //logPrint("AdbinfosLen: ", len(dbinfos))
                lock.Unlock()
                next_scroll_id = nextSources["scroll_id"].(string)
            }
        }(JsonDSL)
    }
    wg.Wait()
    fmt.Println("lenth_all: ", len(dbinfos))
    return dbinfos, nil
}
//************************CRON TASK*******************************
EsClient.go
@@ -611,7 +611,7 @@
func EsReq(method string, url string, parama []byte) (buf []byte, err error) {
    //defer elapsed("page")()
    timeout := time.Duration(10 * time.Second)
    timeout := time.Duration(100 * time.Second)
    client := http.Client{
        Timeout: timeout,
    }