liuxiaolong
2019-10-26 86655db5ef2cca9d7cf4eedae879c3a7c4464b9f
EsApi.go
@@ -327,12 +327,12 @@
   //使用es底层机制处理分页
   analyServerFilterStr := ""
   analyServerId := compareArgs.AnalyServerId
   if analyServerId == "" {
      fmt.Println("no analyServerId")
      return
   if analyServerId != "" {
      analyServerFilterStr = "{\"term\":{\"analyServerId\":\"" + analyServerId + "\"}},"
   }
   analyServerFilterStr := "{\"term\":{\"analyServerId\":\"" + analyServerId + "\"}},"
   //首次请求头
   url := "http://" + serverIp + ":" + serverPort +
@@ -688,3 +688,73 @@
   //fmt.Println("tmpSource",sources)
   return sources,nil
}
//聚合任务列表,taskId+taskName
func AggregateTaskList(serverIp string, serverPort string, indexName string) (sources []map[string]interface{},err error){
   url := "http://" + serverIp + ":" + serverPort +
      "/"+indexName+"/_search"
   DSLJson := `{
    "size": 0,
    "aggs": {
        "task_status": {
            "composite": {
                "sources": [
                    {
                        "taskId": {
                            "terms": {
                                "field": "taskId"
                            }
                        }
                    },
                    {
                        "taskName": {
                            "terms": {
                                "field": "taskName.raw"
                            }
                        }
                    }
                ],
                "size":"1000"
            }
        }
    }
}`
   buf, err := EsReq("POST",url,[]byte(DSLJson))
   if err != nil {
      return nil, err
   }
   var info interface{}
   json.Unmarshal(buf, &info)
   out, ok := info.(map[string]interface{})
   if !ok {
      return nil, errors.New("http response interface can not change map[string]interface{}")
   }
   middle, ok := out["aggregations"].(map[string]interface{})
   if !ok {
      return nil, errors.New("first hits change error!")
   }
   task_status, ok := middle["task_status"].(map[string]interface{})
   if !ok {
      return nil, errors.New("first hits change error!")
   }
   for _, in := range task_status["buckets"].([]interface{}){
      var source = make(map[string]interface{},0)
      tmpbuf, ok := in.(map[string]interface{})
      if !ok {
         fmt.Println("change to source error!")
         continue
      }
      task := tmpbuf["key"].(map[string]interface{})
      count := int(tmpbuf["doc_count"].(float64))
      taskName := task["taskName"].(string)
      taskId := task["taskId"].(string)
      source["taskName"] = taskName
      source["taskId"] = taskId
      source["count"] = count
      sources = append(sources, source)
   }
   //fmt.Println("tmpSource",sources)
   return sources,nil
}