sunty
2019-10-22 fc21cb1a45c5ca831a2221752da9891cf0291395
aggs task info
1个文件已修改
70 ■■■■■ 已修改文件
EsApi.go 70 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
EsApi.go
@@ -688,3 +688,73 @@
    //fmt.Println("tmpSource",sources)
    return sources,nil
}
//聚合任务列表,taskId+taskName
func AggregateTaskList(serverIp string, serverPort string, indexName string) (sources []map[string]interface{},err error){
    url := "http://" + serverIp + ":" + serverPort +
        "/"+indexName+"/_search"
    DSLJson := `{
    "size": 0,
    "aggs": {
        "task_status": {
            "composite": {
                "sources": [
                    {
                        "taskId": {
                            "terms": {
                                "field": "taskId"
                            }
                        }
                    },
                    {
                        "taskName": {
                            "terms": {
                                "field": "taskName.raw"
                            }
                        }
                    }
                ],
                "size":"1000"
            }
        }
    }
}`
    buf, err := EsReq("POST",url,[]byte(DSLJson))
    if err != nil {
        return nil, err
    }
    var info interface{}
    json.Unmarshal(buf, &info)
    out, ok := info.(map[string]interface{})
    if !ok {
        return nil, errors.New("http response interface can not change map[string]interface{}")
    }
    middle, ok := out["aggregations"].(map[string]interface{})
    if !ok {
        return nil, errors.New("first hits change error!")
    }
    task_status, ok := middle["task_status"].(map[string]interface{})
    if !ok {
        return nil, errors.New("first hits change error!")
    }
    for _, in := range task_status["buckets"].([]interface{}){
        var source = make(map[string]interface{},0)
        tmpbuf, ok := in.(map[string]interface{})
        if !ok {
            fmt.Println("change to source error!")
            continue
        }
        task := tmpbuf["key"].(map[string]interface{})
        count := int(tmpbuf["doc_count"].(float64))
        taskName := task["taskName"].(string)
        taskId := task["taskId"].(string)
        source["taskName"] = taskName
        source["taskId"] = taskId
        source["count"] = count
        sources = append(sources, source)
    }
    //fmt.Println("tmpSource",sources)
    return sources,nil
}