liuxiaolong
2019-10-26 86655db5ef2cca9d7cf4eedae879c3a7c4464b9f
EsApi.go
@@ -7,7 +7,6 @@
    "strconv"
    "strings"
   "sync"
   "time"
   "basic.com/pubsub/protomsg.git"
        )
@@ -288,7 +287,6 @@
//获取当前节点抓拍库所有人员ID
func GetAllLocalVideopersonsId(compareArgs protomsg.CompareArgs, indexName string, serverIp string, serverPort string) (capturetable []string) {
   ts := time.Now()
   queryStr := ""
   queryBody := compareArgs.InputValue
   //检索框
@@ -329,12 +327,12 @@
   //使用es底层机制处理分页
   analyServerFilterStr := ""
   analyServerId := compareArgs.AnalyServerId
   if analyServerId == "" {
      fmt.Println("no analyServerId")
      return
   if analyServerId != "" {
      analyServerFilterStr = "{\"term\":{\"analyServerId\":\"" + analyServerId + "\"}},"
   }
   analyServerFilterStr := "{\"term\":{\"analyServerId\":\"" + analyServerId + "\"}},"
   //首次请求头
   url := "http://" + serverIp + ":" + serverPort +
@@ -491,7 +489,7 @@
}
//初始化实时抓拍
func InitRealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm bool   ) ([]protomsg.Videopersons, error){
func InitRealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm bool   ,quantity int) ([]protomsg.Videopersons, error){
   var videopersonsInfo []protomsg.Videopersons
   url := "http://" + serverIp + ":" + serverPort +
      "/" + indexName + "/_search"
@@ -514,12 +512,11 @@
   },`
   }
   DSLJson := `{
   "size":20,
   "size":`+strconv.Itoa(quantity)+`,
   `+queryStr+`
   "sort":[{"picDate":{"order":"desc"}}],
   "_source": ["baseInfo", "alarmRules", "sex", "analyServerName", "sdkName", "ageDescription", "content", "id", "cameraAddr", "picMaxUrl", "picDate", "race", "videoUrl", "picSmUrl", "taskName", "personIsHub", "isAlarm", "analyServerIp", "cameraId"]
   }`
   buf, err := EsReq("POST", url, []byte(DSLJson))
   if err != nil {
      return videopersonsInfo, err
@@ -588,7 +585,11 @@
}
//综合统计
func StatisticsComprehensive(serverIp string, serverPort string, indexName string) (total int, err error){
func StatisticsComprehensive(serverIp string, serverPort string, indexName string, isAlarm bool) (total int, err error){
   isAlarmStr := ""
   if isAlarm == true {
      isAlarmStr = `,{"term":{"isAlarm":1}}`
   }
   url := "http://" + serverIp + ":" + serverPort +
      "/" + indexName + "/_search"
   DSLJson := `{
@@ -598,13 +599,16 @@
         "filter":[{
            "range":{
               "picDate":{
                  "gte":"now+8H/d"
                  "gte":"now+8h/d"
                  }
               }
            }]
            }
            `+isAlarmStr+`
            ]
         }
      }
   }`
   //fmt.Println(DSLJson)
   buf, err := EsReq("POST",url,[]byte(DSLJson))
   if err != nil {
      return total, err
@@ -623,8 +627,9 @@
   //fmt.Println(total)
   return total,nil
}
//实时报警任务比率
func RealTimeAlarmTaskRate(serverIp string, serverPort string,indexName string) (sources map[string]int,err error){
func RealTimeAlarmTaskRate(serverIp string, serverPort string, indexName string) (sources []map[string]interface{},err error){
   url := "http://" + serverIp + ":" + serverPort +
      "/"+indexName+"/_search"
   DSLJson := `{
@@ -649,7 +654,7 @@
   }
}`
   buf, err := EsReq("POST",url,[]byte(DSLJson))
   if err != nil {
   if err != nil {
      return nil, err
   }
   var info interface{}
@@ -666,8 +671,9 @@
   if !ok {
      return nil, errors.New("first hits change error!")
   }
   var source = make(map[string]int,0)
   for _, in := range sdkName_status["buckets"].([]interface{}){
      var source = make(map[string]interface{},0)
      tmpbuf, ok := in.(map[string]interface{})
      if !ok {
         fmt.Println("change to source error!")
@@ -675,10 +681,80 @@
      }
      sdkName := tmpbuf["key"].(string)
      count := int(tmpbuf["doc_count"].(float64))
      source[sdkName] = count
      //fmt.Println("in",in)
      //sources[in["key"].(string)] = int(in["doc_count"].(float64))
      source["name"] = sdkName
      source["value"] = count
      sources = append(sources, source)
   }
   //fmt.Println("sources",source)
   return source,nil
   //fmt.Println("tmpSource",sources)
   return sources,nil
}
//聚合任务列表,taskId+taskName
func AggregateTaskList(serverIp string, serverPort string, indexName string) (sources []map[string]interface{},err error){
   url := "http://" + serverIp + ":" + serverPort +
      "/"+indexName+"/_search"
   DSLJson := `{
    "size": 0,
    "aggs": {
        "task_status": {
            "composite": {
                "sources": [
                    {
                        "taskId": {
                            "terms": {
                                "field": "taskId"
                            }
                        }
                    },
                    {
                        "taskName": {
                            "terms": {
                                "field": "taskName.raw"
                            }
                        }
                    }
                ],
                "size":"1000"
            }
        }
    }
}`
   buf, err := EsReq("POST",url,[]byte(DSLJson))
   if err != nil {
      return nil, err
   }
   var info interface{}
   json.Unmarshal(buf, &info)
   out, ok := info.(map[string]interface{})
   if !ok {
      return nil, errors.New("http response interface can not change map[string]interface{}")
   }
   middle, ok := out["aggregations"].(map[string]interface{})
   if !ok {
      return nil, errors.New("first hits change error!")
   }
   task_status, ok := middle["task_status"].(map[string]interface{})
   if !ok {
      return nil, errors.New("first hits change error!")
   }
   for _, in := range task_status["buckets"].([]interface{}){
      var source = make(map[string]interface{},0)
      tmpbuf, ok := in.(map[string]interface{})
      if !ok {
         fmt.Println("change to source error!")
         continue
      }
      task := tmpbuf["key"].(map[string]interface{})
      count := int(tmpbuf["doc_count"].(float64))
      taskName := task["taskName"].(string)
      taskId := task["taskId"].(string)
      source["taskName"] = taskName
      source["taskId"] = taskId
      source["count"] = count
      sources = append(sources, source)
   }
   //fmt.Println("tmpSource",sources)
   return sources,nil
}