| | |
| | | //fmt.Println("tmpSource",sources) |
| | | return sources,nil |
| | | } |
| | | |
| | | //聚合任务列表,taskId+taskName |
| | | func AggregateTaskList(serverIp string, serverPort string, indexName string) (sources []map[string]interface{},err error){ |
| | | url := "http://" + serverIp + ":" + serverPort + |
| | | "/"+indexName+"/_search" |
| | | DSLJson := `{ |
| | | "size": 0, |
| | | "aggs": { |
| | | "task_status": { |
| | | "composite": { |
| | | "sources": [ |
| | | { |
| | | "taskId": { |
| | | "terms": { |
| | | "field": "taskId" |
| | | } |
| | | } |
| | | }, |
| | | { |
| | | "taskName": { |
| | | "terms": { |
| | | "field": "taskName.raw" |
| | | } |
| | | } |
| | | } |
| | | ], |
| | | "size":"1000" |
| | | } |
| | | } |
| | | } |
| | | }` |
| | | buf, err := EsReq("POST",url,[]byte(DSLJson)) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | var info interface{} |
| | | json.Unmarshal(buf, &info) |
| | | out, ok := info.(map[string]interface{}) |
| | | if !ok { |
| | | return nil, errors.New("http response interface can not change map[string]interface{}") |
| | | } |
| | | middle, ok := out["aggregations"].(map[string]interface{}) |
| | | if !ok { |
| | | return nil, errors.New("first hits change error!") |
| | | } |
| | | task_status, ok := middle["task_status"].(map[string]interface{}) |
| | | if !ok { |
| | | return nil, errors.New("first hits change error!") |
| | | } |
| | | |
| | | for _, in := range task_status["buckets"].([]interface{}){ |
| | | var source = make(map[string]interface{},0) |
| | | tmpbuf, ok := in.(map[string]interface{}) |
| | | if !ok { |
| | | fmt.Println("change to source error!") |
| | | continue |
| | | } |
| | | task := tmpbuf["key"].(map[string]interface{}) |
| | | count := int(tmpbuf["doc_count"].(float64)) |
| | | taskName := task["taskName"].(string) |
| | | taskId := task["taskId"].(string) |
| | | source["taskName"] = taskName |
| | | source["taskId"] = taskId |
| | | source["count"] = count |
| | | sources = append(sources, source) |
| | | } |
| | | //fmt.Println("tmpSource",sources) |
| | | return sources,nil |
| | | |
| | | } |