| | |
| | | } |
| | | |
| | | // GetOceanFeatures queries the base-library person information (cache) through the _search endpoint of indexName on serverIp:serverPort and returns the parsed records. The loop below issues 48 sliced scroll requests, one goroutine each, and appends every parsed batch to dbinfos under lock before the function waits on wg and returns dbinfos. |
| | | func GetOceanFeatures(serverIp string, serverPort string, queryIndexNum int, queryNums int, indexName string, shards string, targetType string) ([]*protomsg.MultiFeaCache, error) { |
| | | var dbinfos []*protomsg.MultiFeaCache |
| | | point := strconv.Itoa(queryIndexNum) |
| | | number := strconv.Itoa(queryNums) |
| | | func GetOceanFeatures(serverIp string, serverPort string, queryNums int, indexName string, shards string, targetType string) ([]*protomsg.MultiFeaCache, error) { |
| | | // queryIndexNum int -- NOTE(review): this parameter appears in the first signature line above but not in the second; the two signatures look like the old and new versions of the function interleaved by a diff rendering -- confirm which one is current. |
| | | //var dbinfos []*protomsg.MultiFeaCache |
| | | dbinfos := make([]*protomsg.MultiFeaCache,0) |
| | | //dbinfosss := make([]*protomsg.MultiFeaCache,0) |
| | | //dbinfoss = append(dbinfoss, dbinfosss...) |
| | | |
| | | JsonDSL := "" |
| | | var source []string |
| | | switch targetType { |
| | |
| | | case "track": |
| | | source = []string{"id", "targetInfo.feature", "analyServerId", "targetInfo.attachTarget.feature", "targetInfo.targetLocation", "linkTagInfo.targetInfo.feature", "linkTagInfo.targetInfo.attachTarget.feature", "linkTagInfo.targetInfo.targetLocation"} |
| | | } |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local" |
| | | JsonDSL = ` { |
| | | "from": ` + point + `, |
| | | |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local;scroll=1m" |
| | | |
| | | var lock sync.RWMutex |
| | | var wg sync.WaitGroup |
| | | |
| | | for i := 0; i < 48; i++ { |
| | | //请求体 |
| | | JsonDSL = ` { |
| | | "slice": { |
| | | "id": "` + strconv.Itoa(i) + `", |
| | | "max": 48 |
| | | }, |
| | | "size":` + strconv.Itoa(queryNums) + `, |
| | | "query": { |
| | | "bool": { |
| | | "filter": [ |
| | |
| | | ] |
| | | } |
| | | }, |
| | | "size":` + number + `, |
| | | "_source": ["` + strings.Replace(strings.Trim(fmt.Sprint(source), "[]"), " ", "\",\"", -1) + `"] |
| | | }` |
| | | |
| | | logPrint("url: ",url) |
| | | logPrint("url: ",JsonDSL) |
| | | buf, err := EsReq("POST", url, []byte(JsonDSL)) |
| | | if err != nil { |
| | | return dbinfos, err |
| | | } |
| | | wg.Add(1) |
| | | go func(reqJsonDSL string) { |
| | | defer wg.Done() |
| | | |
| | | // returns the _source array |
| | | sources, err := Sourcelist(buf) |
| | | if err != nil { |
| | | return dbinfos, err |
| | | } |
| | | //fmt.Println(url) |
| | | //fmt.Println(prama) |
| | | //logPrint("url: ",url) |
| | | //logPrint("url: ",reqJsonDSL) |
| | | buf, err := EsReq("POST", url, []byte(reqJsonDSL)) |
| | | if err != nil { |
| | | logPrint("EsReq: ",err) |
| | | return |
| | | } |
| | | |
| | | // returns all queried data |
| | | dbpersoninfos := Parsesources(sources) |
| | | return dbpersoninfos, nil |
| | | // returns the _source array together with the scroll id (read below via the sourcelist and scroll_id keys) |
| | | sources, err := Sourcelistforscroll(buf) |
| | | if err != nil { |
| | | logPrint("EsReq: ",err) |
| | | return |
| | | } |
| | | // returns all queried data; the parsed batch is appended to the shared dbinfos slice while holding lock |
| | | ftmpDatas := Parsesources(sources["sourcelist"].([]map[string]interface{})) |
| | | lock.Lock() |
| | | dbinfos = append(dbinfos,ftmpDatas...) |
| | | //logPrint("prsLen: ", len(Parsesources(sources["sourcelist"].([]map[string]interface{})))) |
| | | //logPrint("dbinfosLen: ", len(dbinfos)) |
| | | lock.Unlock() |
| | | |
| | | scroll_id := sources["scroll_id"].(string) |
| | | |
| | | // scroll request header (the URL of the scroll endpoint). NOTE(review): next_scroll_id is re-declared as the empty string at the top of every pass of the loop below, so the value assigned at the end of a pass is never copied into scroll_id; the error from the second Sourcelistforscroll call is also not checked, and the early returns in this goroutine do not report errors to the caller. |
| | | scroll_url := "http://" + serverIp + ":" + serverPort + "/_search/scroll" |
| | | for { |
| | | next_scroll_id := "" |
| | | if next_scroll_id != "" { |
| | | scroll_id = next_scroll_id |
| | | } |
| | | jsonDSL := `{ |
| | | "scroll": "1m", |
| | | "scroll_id" : "` + scroll_id + `" |
| | | }` |
| | | //fmt.Println(scroll_url) |
| | | //fmt.Println(jsonDSL) |
| | | buf, err := EsReq("POST", scroll_url, []byte(jsonDSL)) |
| | | |
| | | if err != nil { |
| | | //fmt.Println("lenth1: ", len(dbinfos)) |
| | | return |
| | | } |
| | | nextSources, err := Sourcelistforscroll(buf) |
| | | |
| | | if nextSources == nil { |
| | | return |
| | | } |
| | | |
| | | nextM := nextSources["sourcelist"].([]map[string]interface{}) |
| | | //fmt.Println("id",nextSources) |
| | | if nextM == nil || len(nextM) == 0 { |
| | | //fmt.Println("lenth: ", len(capturetable)) |
| | | return |
| | | } |
| | | tmpDatas := Parsesources(nextM) |
| | | lock.Lock() |
| | | dbinfos = append(dbinfos, tmpDatas...) |
| | | //logPrint("tmpDatasLen: ", len(tmpDatas)) |
| | | //logPrint("AdbinfosLen: ", len(dbinfos)) |
| | | lock.Unlock() |
| | | |
| | | next_scroll_id = nextSources["scroll_id"].(string) |
| | | } |
| | | |
| | | }(JsonDSL) |
| | | } |
| | | wg.Wait() |
| | | |
| | | fmt.Println("lenth_all: ", len(dbinfos)) |
| | | |
| | | return dbinfos, nil |
| | | } |
| | | |
| | | //************************CRON TASK******************************* |