sunty
2019-11-21 c9296a4e0869aa9b4a98df7d0433ef0d6b9a0b10
EsApi.go
@@ -1,110 +1,20 @@
package esutil
import (
    "encoding/json"
    "errors"
    "fmt"
    "strconv"
    "strings"
   "encoding/json"
   "errors"
   "fmt"
   "strconv"
   "strings"
   "sync"
   "time"
   "basic.com/pubsub/protomsg.git"
        )
// 查询底库人员信息
func Personinfos( queryIndex int, queryNums int, indexName string, serverIp string, serverPort string, analyServerId string) ([]*protomsg.Esinfo, error){
            var dbinfos []*protomsg.Esinfo
            point   := strconv.Itoa(queryIndex)
            number   := strconv.Itoa(queryNums)
            JsonDSL  := ""
            if indexName == "videopersons" {
                 JsonDSL =  `  {
                    "from": ` + point +  `,
                    "query": {
                        "bool": {
                            "filter": [
                                {
                                    "term": {
                                        "analyServerId": "` + analyServerId + `"
                                    }
                                }
                            ]
                        }
                    },
                    "size":`+ number +`,
                    "_source": [
                        "id",
                        "faceFeature"
                    ]
                }`
            }else {
                 JsonDSL =  `  {
                    "from": ` + point +  `,
                    "query": {
                        "match_all": {}
                    },
                     "size":`+ number +`,
                     "_source": [
                       "id",
                       "tableId",
                       "faceFeature"
                       ]
                    }`
            }
            //fmt.Println("url: "+"http://"+serverIp+":"+serverPort+"/"+indexName+"/_search","body: ",JsonDSL)
    buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search",[]byte(JsonDSL))
    if err != nil {
        return dbinfos ,errors.New("http request dbtablename info is err!")
    }
      // 返回 _source 数组
      sources, err := Sourcelist(buf)
      if err != nil {
            return dbinfos,err
      }
      // 返回所有查询的数据
     dbpersoninfos :=  Parsesources(sources)
      return dbpersoninfos, nil
}
// 根据底库id查询底库信息
func Dbtablefosbyid (id []string, indexName string, serverIp string, serverPort string) ([]protomsg.Dbtable, error) {
    var dbinfo []protomsg.Dbtable
    dbtableId := strings.Replace(strings.Trim(fmt.Sprint(id), "[]"), " ", "\",\"", -1)
        var dbinfoRequest = `
            {
               "query": {
                  "bool": {
                     "filter": [{
                        "terms": {
                           "id": [
                              "`+ dbtableId +`"
                           ]
                        }
                     }]
                  }
               },
                "size":1000000
            }
        `
    buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search",[]byte(dbinfoRequest))
    if err != nil {
        return dbinfo,  err
    }
    sources, err := Sourcelist(buf)
    if err != nil {
        return dbinfo , err
    }
    dbtable := Dbtablebyid(sources)
    return dbtable, nil
}
)
// 根据抓拍人员id查询抓拍人员信息
func Videopersonsinfosbyid (id []string, indexName string, serverIp string, serverPort string) ([]protomsg.Videopersons, error) {
   var videopersonsInfo []protomsg.Videopersons
func AIOceaninfosbyid(id []string, indexName string, serverIp string, serverPort string) ([]protomsg.AIOcean, error) {
   var aIOceanInfo []protomsg.AIOcean
   videopersonsPersonId := strings.Replace(strings.Trim(fmt.Sprint(id), "[]"), " ", "\",\"", -1)
   var dbinfoRequest = `
            {
@@ -113,7 +23,7 @@
                     "filter": [{
                        "terms": {
                           "id": [
                              "`+ videopersonsPersonId +`"
                              "` + videopersonsPersonId + `"
                           ]
                        }
                     }]
@@ -122,177 +32,114 @@
                "size":1000000
            }
        `
   buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search",[]byte(dbinfoRequest))
   buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search", []byte(dbinfoRequest))
   if err != nil {
      return videopersonsInfo,  err
      return aIOceanInfo, err
   }
   sources, err := Sourcelist(buf)
   if err != nil {
      return videopersonsInfo , err
      return aIOceanInfo, err
   }
   videoperson := Videopersonsbyid(sources)
   println(videoperson)
   return videoperson,nil
   aIOcean := AIOceanAnalysis(sources)
   println(aIOcean)
   return aIOcean, nil
}
// 根据底库人员id查询底库人员信息
func Dbpersoninfosbyid (id []string, indexName string, serverIp string, serverPort string) ([]protomsg.Dbperson, error) {
    var dbinfo []protomsg.Dbperson
    dbtablePersonId := strings.Replace(strings.Trim(fmt.Sprint(id), "[]"), " ", "\",\"", -1)
    var dbinfoRequest = `
            {
               "query": {
                  "bool": {
                     "filter": [{
                        "terms": {
                           "id": [
                              "`+ dbtablePersonId +`"
                           ]
                        }
                     }]
                  }
               },
                "size":1000000
            }
        `
    buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search",[]byte(dbinfoRequest))
    if err != nil {
        return dbinfo,  err
    }
    sources, err := Sourcelist(buf)
    if err != nil {
        return dbinfo , err
    }
    dbperson := Dbpersonbyid(sources)
    println(dbperson)
    return dbperson,nil
}
//根据抓拍库人员id查询特征值
func GetVideoPersonFaceFeatureById (id string, indexName string, serverIp string, serverPort string) (string, error) {
    var jsonDSL = `
func GetVideoPersonFaceFeatureById(id string, indexName string, serverIp string, serverPort string) (string, error) {
   var jsonDSL = `
            {
               "query": {
                  "bool": {
                     "filter": [{
                        "term": {
                           "id":"`+ id +`"
                           "id":"` + id + `"
                        }
                     }]
                  }
               },
                "_source":["faceFeature"]
                "_source":["targetInfo.feature"]
            }
        `
    buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search",[]byte(jsonDSL))
    if err != nil {
        return "",  err
    }
   buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search", []byte(jsonDSL))
   if err != nil {
      return "", err
   }
    sources, err := Sourcelist(buf)
    if err != nil {
        return "" , err
    }
    faceFeature := sources[0]["faceFeature"].(string)
    return faceFeature,nil
   sources, err := Sourcelist(buf)
   if err != nil {
      return "", err
   }
   feature := sources[0]["targetInfo"].([]interface{})[0].(map[string]interface{})["feature"].(string)
   return feature, nil
}
// 根据tableid 查询tablename
func Dbtablename(tableid string, indexName string, serverIp string, serverPort string) (tablename string, err error) {
    var dbinfotable =` {
        "query": {
            "bool": {
                "filter": [
                {
                    "term": {
                        "id":"`+tableid+`"
                    }
                }
                ]
            }
        },
        "_source": [
            "tableName"
        ],
        "size":1000000
    }
    `
    buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search",[]byte(dbinfotable))
    if err != nil {
        return "" ,errors.New("http request dbtablename info is err!")
    }
    sources, err := Sourcelist(buf)
    if err != nil {
          return "",err
    }
    for _, source := range sources {
        if name, ok := source["tableName"].(string); ok {
            tablename = name
            break
        }
    }
    return tablename, nil
}
//根据抓拍人员id更新(videourl)摄像机地址
func UpdateVideourlById(id string, videoUrl string, indexName string, serverIp string, serverPort string)(statu int){
    var info interface{}
    var videoUrlInfo = `
func UpdateVideourlById(id string, videoUrl string, indexName string, serverIp string, serverPort string, command int) (statu int) {
   var info interface{}
   url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query?refresh=true"
   sourceStr := "ctx._source.videoUrl='" + videoUrl + "'"
   if command >= 0 {
      sourceStr = "ctx._source.linkTagInfo[" + strconv.Itoa(command) + "].videoUrl='" + videoUrl + "'"
   }
   var videoUrlInfo = `
        {
          "script": {
            "source": "ctx._source.videoUrl='` + videoUrl + `'"
            "source": "` + sourceStr + `"
          },
          "query": {
            "term": {
              "id": "` +id+ `"
              "id": "` + id + `"
            }
          },
            "size":1000000
          }
        }
        `
    buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_update_by_query",[]byte(videoUrlInfo))
    if err != nil {
        fmt.Println("http request videoUrlInfo info is err!")
        statu = 500
        return
    }
    json.Unmarshal(buf, &info)
    out, ok := info.(map[string]interface{})
    if !ok {
        fmt.Println("http response interface can not change map[string]interface{}")
        statu = 500
        return
    }
    middle, ok := out["updated"].(float64)
    if !ok {
        fmt.Println("first updated change error!")
        statu = 500
        return
    }
    if middle == 1{
        statu = 200
        return
    }
    if middle == 0{
        statu = 201
        return
    }
    return statu
   //fmt.Println("url: ", url, videoUrlInfo)
   buf, err := EsReq("POST", url, []byte(videoUrlInfo))
   if err != nil {
      fmt.Println("http request videoUrlInfo info is err!")
      statu = 500
      return
   }
   json.Unmarshal(buf, &info)
   //fmt.Println(info)
   out, ok := info.(map[string]interface{})
   if !ok {
      fmt.Println("http response interface can not change map[string]interface{}")
      statu = 500
      return
   }
   middle, ok := out["updated"].(float64)
   if !ok {
      fmt.Println("first updated change error!")
      statu = 500
      return
   }
   if middle == 1 {
      statu = 200
      return
   }
   if middle == 0 {
      statu = 201
      return
   }
   return statu
}
//获取当前节点抓拍库所有人员ID
func GetAllLocalVideopersonsId(compareArgs protomsg.CompareArgs, indexName string, serverIp string, serverPort string) (capturetable []string) {
func GetAllLocalVideopersonsId(compareArgs protomsg.CompareArgs, indexName string, serverIp string, serverPort string, alarmLevelTypes string) (capturetable []string) {
   queryStr := ""
   queryBody := compareArgs.InputValue
   //检索框
   if queryBody != "" {
      queryStr = "\"must\":[{\"multi_match\":{\"query\":\"" + queryBody + "\",\"fields\":[\"alarmRules.alarmLevel^1.5\",\"ageDescription^1.5\",\"taskName^1.5\",\"baseInfo.tableName^1.5\",\"sex^2.0\",\"race^2.0\",\"content^1.0\",\"baseInfo.idCard^1.8\",\"cameraAddr^1.0\"]," +
      queryStr = "\"must\":[{\"multi_match\":{\"query\":\"" + queryBody + "\",\"fields\":[\"cameraAddr^1.5\",\"taskName^1.5\",\"sdkName^1.5\",\"showLabels^3.0\",\"baseInfo.tableName^1.5\",\"baseInfo.targetName^1.5\",\"baseInfo.labels^1.5\",\"alarmRules.alarmLevel^1.5\",\"linkTag^1.5\"]," +
         "\"type\":\"cross_fields\",\"operator\":\"OR\",\"slop\":0,\"prefix_length\":0,\"max_expansions\":50,\"zero_terms_query\":\"NONE\",\"auto_generate_synonyms_phrase_query\":true,\"fuzzy_transpositions\":true,\"boost\":1}}],"
   }
   if compareArgs.SearchTime == nil || len(compareArgs.SearchTime) != 2 {
      return nil
   }
   gteDate := compareArgs.SearchTime[0]
   lteDate := compareArgs.SearchTime[1]
@@ -325,15 +172,21 @@
      isCollectStr = "{\"term\":{\"isCollect\":\"" + isCollect + "\"}},"
   }
   //判断布防等级
   alarmLevelStr := ""
   if alarmLevelTypes != "" {
      alarmLevelStr = "{\"terms\":{\"alarmRules.alarmLevel.raw\":[\"" + alarmLevelTypes + "\"]}},"
   }
   //使用es底层机制处理分页
   analyServerFilterStr := ""
   analyServerId := compareArgs.AnalyServerId
   if analyServerId == "" {
      fmt.Println("no analyServerId")
      return
   if analyServerId != "" {
      analyServerFilterStr = "{\"term\":{\"analyServerId\":\"" + analyServerId + "\"}},"
   }
   analyServerFilterStr := "{\"term\":{\"analyServerId\":\"" + analyServerId + "\"}},"
   ts := time.Now()
   //首次请求头
   url := "http://" + serverIp + ":" + serverPort +
      "/" + indexName + "/_search?search_type=dfs_query_then_fetch;scroll=1m"
@@ -341,14 +194,16 @@
   var lock sync.RWMutex
   var wg sync.WaitGroup
   for i := 0; i < 32; i++ {
   for i := 0; i < 48; i++ {
      //请求体
      prama := "{" +
         "\"slice\":{\"id\":" + strconv.Itoa(i) + ",\"max\":48}," +
         "\"size\":\"1000\"," +
         "\"query\":{\"bool\":{" + queryStr +
         "\"filter\":[" +
         "{\"term\":{\"targetInfo.targetType.raw\":\"face\"}}," +
         cameraIdStr +
         alarmLevelStr +
         taskIdStr +
         isCollectStr +
         esTableIdStr +
@@ -357,12 +212,12 @@
         "\"_source\":[\"id\"]" +
         "}"
      wg.Add(1)
      go func() {
      go func(reqParam string) {
         defer wg.Done()
         //fmt.Println(url)
         //fmt.Println(prama)
         buf, err := EsReq("POST", url, []byte(prama))
         buf, err := EsReq("POST", url, []byte(reqParam))
         if err != nil {
            fmt.Println("http request videoUrlInfo info is err!")
@@ -377,7 +232,9 @@
            return
         }
         for _, source := range sources["sourcelist"].([]map[string]interface{}) {
            lock.Lock()
            capturetable = append(capturetable, source["id"].(string))
            lock.Unlock()
         }
         scroll_id := sources["scroll_id"].(string)
@@ -427,128 +284,69 @@
            next_scroll_id = nextSources["scroll_id"].(string)
         }
         fmt.Println(len(capturetable))
      }()
      }(prama)
   }
   wg.Wait()
   //fmt.Println("lenth_all: ", len(capturetable))
   //fmt.Println("耗时:", time.Since(ts))
   fmt.Println("lenth_all: ", len(capturetable))
   fmt.Println("耗时:", time.Since(ts))
   return capturetable
}
//获取底库人员ID
func GetDbpersonsId(compareArgs  protomsg.CompareArgs,indexName string, serverIp string, serverPort string) (source map[string][]string) {
    queryStr := ""
    queryBody := compareArgs.InputValue
    //检索框
    if queryBody != "" {
        queryStr = "\"must\":[{\"multi_match\":{\"query\":\"" + queryBody + "\",\"fields\":[\"personName^1.5\",\"age^1.5\",\"idCard^1.5\",\"phoneNum^1.5\",\"sex^2.0\",\"reserved^2.0\"]," +
            "\"type\":\"cross_fields\",\"operator\":\"OR\",\"slop\":0,\"prefix_length\":0,\"max_expansions\":50,\"zero_terms_query\":\"NONE\",\"auto_generate_synonyms_phrase_query\":true,\"fuzzy_transpositions\":true,\"boost\":1}}],"
    }
    //判断库表ID
    tableId := compareArgs.Tabs
    esTableId := ""
    esTableIdStr := ""
    if tableId != nil && len(tableId) > 0 {
        esTableId = strings.Replace(strings.Trim(fmt.Sprint(tableId), "[]"), " ", "\",\"", -1)
        esTableIdStr = "{\"terms\":{\"tableId\":[\"" + esTableId + "\"]}}"
    }
    prama := "{" +
        "\"size\":\"100000000\"," +
        "\"query\":{\"bool\":{" + queryStr +
        "\"filter\":[" +
        esTableIdStr +
        "]}}," +
        "\"_source\":[\"id\",\"tableId\"]" +
        "}"
    url := "http://" + serverIp + ":" + serverPort +
        "/" + indexName + "/_search?search_type=dfs_query_then_fetch"
    fmt.Println(url)
    fmt.Println(prama)
    buf, err := EsReq("POST", url,[]byte(prama))
    if err != nil {
        fmt.Println("http request videoUrlInfo info is err!")
        return
    }
    sources, err := Sourcelist(buf)
    if err != nil {
        return
    }
    tabsource := make(map[string][]string)
    for _, source := range  sources{
        tableId := source["tableId"].(string)
        id := source["id"].(string)
        tabsource[tableId] = append(tabsource[tableId], id)
    }
    return tabsource
}
//初始化实时抓拍
func InitRealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm bool   ,quantity int) ([]protomsg.Videopersons, error){
   var videopersonsInfo []protomsg.Videopersons
func InitRealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm bool, category string, quantity int) ([]protomsg.AIOcean, error) {
   var aIOceanInfo []protomsg.AIOcean
   url := "http://" + serverIp + ":" + serverPort +
      "/" + indexName + "/_search"
   queryStr := ""
   if isAlarm == true {
      queryStr = `"query":{
      "match_all":{}
   },`
   } else {
      queryStr = `"query":{
   categoryStr := ""
   if category != "" {
      categoryStr = `            {
               "term":{
                  "targetInfo.targetType":"` + category + `"
               }
            },`
   }
   queryStr := `"query":{
      "bool":{
         "filter":[
            ` + categoryStr + `
            {
               "term":{
                  "isAlarm":1
                  "isAlarm":` + strconv.FormatBool(isAlarm) + `
               }
            }
         ]
      }
   },`
   }
   DSLJson := `{
   "size":`+strconv.Itoa(quantity)+`,
   `+queryStr+`
   "size":` + strconv.Itoa(quantity) + `,
   ` + queryStr + `
   "sort":[{"picDate":{"order":"desc"}}],
   "_source": ["baseInfo", "alarmRules", "sex", "analyServerName", "sdkName", "ageDescription", "content", "id", "cameraAddr", "picMaxUrl", "picDate", "race", "videoUrl", "picSmUrl", "taskName", "personIsHub", "isAlarm", "analyServerIp", "cameraId"]
   "_source": {"includes":[],"excludes":["*.feature"]}
   }`
   fmt.Println(DSLJson)
   buf, err := EsReq("POST", url, []byte(DSLJson))
   if err != nil {
      return videopersonsInfo, err
      return aIOceanInfo, err
   }
   sources, err := Sourcelist(buf)
   if err != nil {
      return videopersonsInfo, err
      return aIOceanInfo, err
   }
   videoperson := Videopersonsbyid(sources)
   aIOcean := AIOceanAnalysis(sources)
   //fmt.Println(len(videoperson))
   return videoperson, nil
   return aIOcean, nil
}
//实时抓拍
func RealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm bool   ) ([]protomsg.Videopersons, error){
   var videopersonsInfo []protomsg.Videopersons
func RealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm bool) ([]protomsg.AIOcean, error) {
   var aIOceanInfo []protomsg.AIOcean
   url := "http://" + serverIp + ":" + serverPort +
      "/" + indexName + "/_search"
   queryStr := ""
   if isAlarm == true {
      fmt.Println("continue")
   } else {
      queryStr = `
            {
               "term":{
                  "isAlarm":1
               }
            }
               `
   }
   DSLJson := `{
   "size":20,
   "query":{
@@ -562,30 +360,34 @@
                  }      
               }
            },
         `+queryStr+`
            {
               "term":{
                  "isAlarm":` + strconv.FormatBool(isAlarm) + `
               }
            }
         ]
      }
   },
   "_source": ["baseInfo", "alarmRules", "sex", "analyServerName", "sdkName", "ageDescription", "content", "id", "cameraAddr", "picMaxUrl", "picDate", "race", "videoUrl", "picSmUrl", "taskName", "personIsHub", "isAlarm", "analyServerIp", "cameraId", "isAckAlarm"]
   "_source": {"includes":[],"excludes":["*.feature"]}
   }`
   buf, err := EsReq("POST", url, []byte(DSLJson))
   if err != nil {
      return videopersonsInfo, err
      return aIOceanInfo, err
   }
   sources, err := Sourcelist(buf)
   if err != nil {
      return videopersonsInfo, err
      return aIOceanInfo, err
   }
   videoperson := Videopersonsbyid(sources)
   fmt.Println(len(videoperson))
   return videoperson, nil
   aIOcean := AIOceanAnalysis(sources)
   fmt.Println(len(aIOcean))
   return aIOcean, nil
}
//综合统计
func StatisticsComprehensive(serverIp string, serverPort string, indexName string) (total int, err error){
func StatisticsComprehensive(serverIp string, serverPort string, indexName string, isAlarm bool) (total int, err error) {
   url := "http://" + serverIp + ":" + serverPort +
      "/" + indexName + "/_search"
   DSLJson := `{
@@ -595,14 +397,21 @@
         "filter":[{
            "range":{
               "picDate":{
                  "gte":"now+8H/d"
                  "gte":"now+8h/d"
                  }
               }
            }]
            },
            {
               "term":{
                  "isAlarm":` + strconv.FormatBool(isAlarm) + `
               }
            }
            ]
         }
      }
   }`
   buf, err := EsReq("POST",url,[]byte(DSLJson))
   //fmt.Println(DSLJson)
   buf, err := EsReq("POST", url, []byte(DSLJson))
   if err != nil {
      return total, err
   }
@@ -618,12 +427,13 @@
   }
   total = int(middle["total"].(float64))
   //fmt.Println(total)
   return total,nil
   return total, nil
}
//实时报警任务比率
func RealTimeAlarmTaskRate(serverIp string, serverPort string, indexName string) (sources []map[string]interface{},err error){
func RealTimeAlarmTaskRate(serverIp string, serverPort string, indexName string) (sources []map[string]interface{}, err error) {
   url := "http://" + serverIp + ":" + serverPort +
      "/"+indexName+"/_search"
      "/" + indexName + "/_search"
   DSLJson := `{
   "size":0,
   "query":{
@@ -638,14 +448,14 @@
      }
   },
   "aggs":{
      "sdkName_status":{
      "sdkId_status":{
         "terms":{
            "field":"taskName.raw"
            "field":"taskId"
         }
      }
   }
}`
   buf, err := EsReq("POST",url,[]byte(DSLJson))
   buf, err := EsReq("POST", url, []byte(DSLJson))
   if err != nil {
      return nil, err
   }
@@ -659,24 +469,243 @@
   if !ok {
      return nil, errors.New("first hits change error!")
   }
   sdkName_status, ok := middle["sdkName_status"].(map[string]interface{})
   sdkName_status, ok := middle["sdkId_status"].(map[string]interface{})
   if !ok {
      return nil, errors.New("first hits change error!")
   }
   for _, in := range sdkName_status["buckets"].([]interface{}){
      var source = make(map[string]interface{},0)
   for _, in := range sdkName_status["buckets"].([]interface{}) {
      var source = make(map[string]interface{}, 0)
      tmpbuf, ok := in.(map[string]interface{})
      if !ok {
         fmt.Println("change to source error!")
         continue
      }
      sdkName := tmpbuf["key"].(string)
      sdkId := tmpbuf["key"].(string)
      count := int(tmpbuf["doc_count"].(float64))
      source["name"] = sdkName
      source["id"] = sdkId
      source["value"] = count
      sources = append(sources, source)
   }
   //fmt.Println("tmpSource",sources)
   return sources,nil
   return sources, nil
}
//聚合任务列表,taskId+taskName
func AggregateTaskList(serverIp string, serverPort string, indexName string, analyServerId string) (sources []map[string]interface{}, err error) {
   url := "http://" + serverIp + ":" + serverPort +
      "/" + indexName + "/_search"
   serverFilterStr := ""
   if analyServerId != "" {
      serverFilterStr = `,
         "query": {
         "bool": {
            "filter": [
            {
            "term": {
            "analyServerId": "` + analyServerId + `"
            }
            }
         ]
         }
      }`
   }
   DSLJson := `{
    "size": 0,
    "aggs": {
        "task_status": {
            "composite": {
                "sources": [
                    {
                        "taskId": {
                            "terms": {
                                "field": "taskId"
                            }
                        }
                    },
                    {
                        "taskName": {
                            "terms": {
                                "field": "taskName.raw"
                            }
                        }
                    }
                ],
                "size":"1000000"
            }
        }
    }
   ` + serverFilterStr + `
}`
   buf, err := EsReq("POST", url, []byte(DSLJson))
   if err != nil {
      return nil, err
   }
   var info interface{}
   json.Unmarshal(buf, &info)
   out, ok := info.(map[string]interface{})
   if !ok {
      return nil, errors.New("http response interface can not change map[string]interface{}")
   }
   middle, ok := out["aggregations"].(map[string]interface{})
   if !ok {
      return nil, errors.New("first hits change error!")
   }
   task_status, ok := middle["task_status"].(map[string]interface{})
   if !ok {
      return nil, errors.New("first hits change error!")
   }
   for _, in := range task_status["buckets"].([]interface{}) {
      var source = make(map[string]interface{}, 0)
      tmpbuf, ok := in.(map[string]interface{})
      if !ok {
         fmt.Println("change to source error!")
         continue
      }
      task := tmpbuf["key"].(map[string]interface{})
      count := int(tmpbuf["doc_count"].(float64))
      taskName := task["taskName"].(string)
      taskId := task["taskId"].(string)
      source["taskName"] = taskName
      source["taskId"] = taskId
      source["count"] = count
      sources = append(sources, source)
   }
   //fmt.Println("tmpSource",sources)
   return sources, nil
}
/****************************************以下为sdkCompare比对缓存使用方法*********************************************/
//获取查询总数
func GetTotal(serverIp string, serverPort string, indexName string, shards string) (total int) {
   JsonDSL := `{
   "size": 0,
   "query": {
      "bool": {
         "filter": [{
            "term": {
               "targetInfo.targetType.raw": "face"
            }
         }]
      }
   }
}`
   url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
   buf, err := EsReq("POST", url, []byte(JsonDSL))
   if err != nil {
      return
   }
   var info interface{}
   json.Unmarshal(buf, &info)
   out, ok := info.(map[string]interface{})
   if !ok {
      return
   }
   middle, ok := out["hits"].(map[string]interface{})
   if !ok {
      return
   }
   total = int(middle["total"].(float64))
   return total
}
//查询时间段数据
func GetPeriodInfos(serverIp string, serverPort string, startTime string, endTime string, indexName string, shards string) ([]*protomsg.Esinfo, error) {
   var capdbinfo []*protomsg.Esinfo
   url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local"
   JsonDSL := `
            {
                "query": {
                    "bool": {
                        "filter": [
                            {
                                "term": {
                                    "targetInfo.targetType.raw": "face"
                                }
                            },
                            {
                                "range": {
                                    "picDate": {
                                        "gte": "` + startTime + `",
                                        "lt": "` + endTime + `"
                                    }
                                }
                            }
                        ]
                    }
                },
                "size": 1000000,
                "_source": [
                    "id",
                    "targetInfo.feature",
                    "analyServerId"
                ]
            }
    `
   //logger.Debug(url)
   //logger.Debug(JsonDSL)
   buf, err := EsReq("POST", url, []byte(JsonDSL))
   if err != nil {
      return capdbinfo, errors.New("http request dbtablename info is err!")
   }
   // 返回 _source 数组
   sources, err := Sourcelist(buf)
   if err != nil {
      return capdbinfo, err
   }
   // 返回所有查询的数据
   capdbinfos := Parsesources(sources)
   return capdbinfos, nil
}
// 查询底库人员信息
func GetOceanFeatures(serverIp string, serverPort string, queryIndexNum int, queryNums int, indexName string, shards string) ([]*protomsg.Esinfo, error) {
   var dbinfos []*protomsg.Esinfo
   point := strconv.Itoa(queryIndexNum)
   number := strconv.Itoa(queryNums)
   JsonDSL := ""
   url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local"
   JsonDSL = `  {
                    "from": ` + point + `,
                    "query": {
                  "bool": {
                     "filter": [
                        {
                           "term": {
                              "targetInfo.targetType.raw": "face"
                        }
                           }
                     ]
                  }
               },
                     "size":` + number + `,
                     "_source": [
                     "id",
                     "targetInfo.feature",
                     "analyServerId"
                       ]
                    }`
   buf, err := EsReq("POST", url, []byte(JsonDSL))
   if err != nil {
      return dbinfos, errors.New("http request dbtablename info is err!")
   }
   // 返回 _source 数组
   sources, err := Sourcelist(buf)
   if err != nil {
      return dbinfos, err
   }
   // 返回所有查询的数据
   dbpersoninfos := Parsesources(sources)
   return dbpersoninfos, nil
}