sunty
2020-01-14 ad9de83627775d4275c96a1ceb8518bccd36695c
EsApi.go
@@ -72,12 +72,86 @@
   if err != nil {
      return "", err
   }
   feature := sources[0]["targetInfo"].([]interface{})[0].(map[string]interface{})["feature"].(string)
   return feature, nil
}
//根据目标id查询已追加条数
func GetLinkTagInfoSize(id string, indexName string, serverIp string, serverPort string) (size int, err error) {
   url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
   queryDSL := `{
           "query": {
             "term":{
               "id":"` + id + `"
         }
      }
   }`
   buf, err := EsReq("POST", url, []byte(queryDSL))
   if err != nil {
      return -1, err
   }
   source, err := Sourcelist(buf)
   if err != nil {
      return -1, err
   }
   if source[0]["linkTagInfo"] != nil {
      size = len(source[0]["linkTagInfo"].([]interface{}))
   } else {
      return -1, errors.New("该数组不存在")
   }
   return size, nil
}
//根据目标id追加跟踪信息
func AppendTargetInfo(id string, targetInfo string, indexName string, serverIp string, serverPort string, updateTime string) (string, error) {
   if targetInfo == "" {
      return "", errors.New("append data is nil")
   }
   var info interface{}
   url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query?refresh=true"
   jsonDSL := `{
  "query": {
    "term":{
      "id":"` + id + `"
    }
  },
  "script": {
    "lang": "painless",
    "inline": "ctx._source.linkTagInfo.add(params.newparam);ctx._source.updateTime='` + updateTime + `'",
    "params": {
      "newparam": ` + targetInfo + `
    }
  }
}`
   fmt.Println(jsonDSL)
   buf, err := EsReq("POST", url, []byte(jsonDSL))
   if err != nil {
      return "", err
   }
   json.Unmarshal(buf, &info)
   out, ok := info.(map[string]interface{})
   fmt.Println(out)
   if !ok {
      return "", errors.New("http response interface can not change map[string]interface{}")
   }
   middle, ok := out["updated"].(float64)
   if !ok {
      return "", errors.New("first updated change error!")
   }
   mes := ""
   if middle == 1 {
      mes = "追加成功"
   }
   if middle == 0 {
      mes = "已经追加"
   }
   return mes, nil
}
//根据抓拍人员id更新(videourl)摄像机地址
func UpdateVideourlById(id string, videoUrl string, indexName string, serverIp string, serverPort string, command int) (statu int) {
func UpdateVideourlById(id string, videoUrl string, indexName string, serverIp string, serverPort string, command int) (statu int, err error) {
   var info interface{}
   url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query?refresh=true"
@@ -102,7 +176,7 @@
   if err != nil {
      fmt.Println("http request videoUrlInfo info is err!")
      statu = 500
      return
      return statu, err
   }
   json.Unmarshal(buf, &info)
   //fmt.Println(info)
@@ -110,26 +184,26 @@
   if !ok {
      fmt.Println("http response interface can not change map[string]interface{}")
      statu = 500
      return
      return statu, errors.New("http response interface can not change map[string]interface{}")
   }
   middle, ok := out["updated"].(float64)
   if !ok {
      fmt.Println("first updated change error!")
      statu = 500
      return
      return statu, errors.New("first updated change error!")
   }
   if middle == 1 {
      statu = 200
      return
      return statu, nil
   }
   if middle == 0 {
      statu = 201
      return
      return statu, errors.New("已经修改")
   }
   return statu
   return statu, nil
}
//获取当前节点抓拍库所有人员ID
//获取当前节点抓拍库所有人员ID*缓存*
func GetAllLocalVideopersonsId(compareArgs protomsg.CompareArgs, indexName string, serverIp string, serverPort string, alarmLevelTypes string) (capturetable []string) {
   queryStr := ""
   queryBody := compareArgs.InputValue
@@ -294,23 +368,32 @@
}
//初始化实时抓拍
func InitRealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm bool, category string, quantity int) ([]protomsg.AIOcean, error) {
func InitRealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm string, category string, quantity int) ([]protomsg.AIOcean, error) {
   var aIOceanInfo []protomsg.AIOcean
   url := "http://" + serverIp + ":" + serverPort +
      "/" + indexName + "/_search"
      queryStr := `"query":{
      "bool":{
         "filter":[
            {
   var filterArr []string
   if isAlarm != "all" {
      filterArr = append(filterArr, `            {
               "term":{
                  "isAlarm":`+strconv.FormatBool(isAlarm)+`
                  "isAlarm":"`+isAlarm+`"
               }
            },
            {
            }`)
   }
   if category != "all" {
      filterArr = append(filterArr, `            {
               "term":{
                  "targetInfo.targetType":"`+category+`"
               }
            }
            }`)
   }
   queryStr := `"query":{
      "bool":{
         "filter":[
            ` + strings.Join(filterArr, ",") + `
         ]
      }
   },`
@@ -357,7 +440,7 @@
            },
            {
               "term":{
                  "isAlarm":`+strconv.FormatBool(isAlarm)+`
                  "isAlarm":` + strconv.FormatBool(isAlarm) + `
               }
            }
         ]
@@ -382,23 +465,29 @@
}
//综合统计
func StatisticsComprehensive(serverIp string, serverPort string, indexName string, isAlarm bool) (total int, err error) {
func StatisticsComprehensive(serverIp string, serverPort string, indexName string, isAlarm string) (total int, err error) {
   url := "http://" + serverIp + ":" + serverPort +
      "/" + indexName + "/_search"
   isAlarmStr := ""
   if isAlarm != "all" {
      isAlarmStr = `            {
               "term":{
                  "isAlarm":"` + isAlarm + `"
               }
            },`
   }
   DSLJson := `{
   "size":0,
   "query":{
      "bool":{
         "filter":[{
         "filter":[
            ` + isAlarmStr + `
            {
            "range":{
               "picDate":{
                  "gte":"now+8h/d"
                  }
               }
            },
            {
               "term":{
                  "isAlarm":`+strconv.FormatBool(isAlarm)+`
               }
            }
            ]
@@ -443,9 +532,9 @@
      }
   },
   "aggs":{
      "sdkId_status":{
      "sdkName_status":{
         "terms":{
            "field":"taskId"
            "field":"sdkName.raw"
         }
      }
   }
@@ -464,7 +553,7 @@
   if !ok {
      return nil, errors.New("first hits change error!")
   }
   sdkName_status, ok := middle["sdkId_status"].(map[string]interface{})
   sdkName_status, ok := middle["sdkName_status"].(map[string]interface{})
   if !ok {
      return nil, errors.New("first hits change error!")
   }
@@ -476,9 +565,9 @@
         fmt.Println("change to source error!")
         continue
      }
      sdkId := tmpbuf["key"].(string)
      sdkName := tmpbuf["key"].(string)
      count := int(tmpbuf["doc_count"].(float64))
      source["id"] = sdkId
      source["name"] = sdkName
      source["value"] = count
      sources = append(sources, source)
   }
@@ -572,23 +661,28 @@
}
//添加即将删除信号
func AddDeleteSignal() {
}
/****************************************以下为sdkCompare比对缓存使用方法*********************************************/
//获取查询总数
func GetTotal(serverIp string, serverPort string, indexName string, shards string) (total int) {
//获取查询总数 *缓存*
func GetTotal(serverIp string, serverPort string, indexName string, shards string, targetType string) (total int) {
   JsonDSL := `{
   "size": 0,
   "query": {
      "bool": {
         "filter": [{
            "term": {
               "targetInfo.targetType.raw": "face"
               "targetInfo.targetType.raw": "` + targetType + `"
            }
         }]
      }
   }
}`
   url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
   url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local"
   buf, err := EsReq("POST", url, []byte(JsonDSL))
   if err != nil {
      return
@@ -608,11 +702,17 @@
}
//查询时间段数据
func GetPeriodInfos(serverIp string, serverPort string, startTime string, endTime string, indexName string, shards string) ([]*protomsg.Esinfo, error) {
   var capdbinfo []*protomsg.Esinfo
//查询时间段数据 *缓存*
func GetPeriodInfos(serverIp string, serverPort string, startTime string, endTime string, indexName string, shards string, targetType string) ([]*protomsg.MultiFeaCache, error) {
   var capdbinfo []*protomsg.MultiFeaCache
   url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local"
   var source []string
   switch targetType {
   case "face":
      source = []string{"id", "targetInfo.feature", "analyServerId", "cameraId"}
   case "track":
      source = []string{"id", "targetInfo.feature", "analyServerId", "cameraId", "targetInfo.attachTarget.feature", "targetInfo.targetLocation", "linkTagInfo.targetInfo.feature", "linkTagInfo.targetInfo.attachTarget.feature", "linkTagInfo.cameraId", "linkTagInfo.targetInfo.targetLocation"}
   }
   JsonDSL := `
            {
                "query": {
@@ -620,7 +720,7 @@
                        "filter": [
                            {
                                "term": {
                                    "targetInfo.targetType.raw": "face"
                                    "targetInfo.targetType.raw": "` + targetType + `"
                                }
                            },
                            {
@@ -635,15 +735,12 @@
                    }
                },
                "size": 1000000,
                "_source": [
                    "id",
                    "targetInfo.feature",
                    "analyServerId"
                ]
                "_source": ["` + strings.Replace(strings.Trim(fmt.Sprint(source), "[]"), " ", "\",\"", -1) + `"]
            }
    `
   //logger.Debug(url)
   //logger.Debug(JsonDSL)
   //fmt.Println(JsonDSL)
   buf, err := EsReq("POST", url, []byte(JsonDSL))
   if err != nil {
      return capdbinfo, errors.New("http request dbtablename info is err!")
@@ -654,19 +751,26 @@
   if err != nil {
      return capdbinfo, err
   }
   //fmt.Println(sources)
   // 返回所有查询的数据
   capdbinfos := Parsesources(sources)
   return capdbinfos, nil
}
// 查询底库人员信息
func GetOceanFeatures(serverIp string, serverPort string, queryIndexNum int, queryNums int, indexName string, shards string) ([]*protomsg.Esinfo, error) {
   var dbinfos []*protomsg.Esinfo
// 查询底库人员信息*缓存*
func GetOceanFeatures(serverIp string, serverPort string, queryIndexNum int, queryNums int, indexName string, shards string, targetType string) ([]*protomsg.MultiFeaCache, error) {
   var dbinfos []*protomsg.MultiFeaCache
   point := strconv.Itoa(queryIndexNum)
   number := strconv.Itoa(queryNums)
   JsonDSL := ""
   var source []string
   switch targetType {
   case "face":
      source = []string{"id", "targetInfo.feature", "analyServerId"}
   case "track":
      source = []string{"id", "targetInfo.feature", "analyServerId", "targetInfo.attachTarget.feature", "targetInfo.targetLocation", "linkTagInfo.targetInfo.feature", "linkTagInfo.targetInfo.attachTarget.feature", "linkTagInfo.targetInfo.targetLocation"}
   }
   url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local"
   JsonDSL = `  {
                    "from": ` + point + `,        
@@ -675,18 +779,14 @@
                     "filter": [
                        {
                           "term": {
                              "targetInfo.targetType.raw": "face"
                              "targetInfo.targetType.raw": "` + targetType + `"
                        }
                           }
                     ]
                  }   
               },
                     "size":` + number + `,
                     "_source": [
                     "id",
                     "targetInfo.feature",
                     "analyServerId"
                       ]
                     "_source": ["` + strings.Replace(strings.Trim(fmt.Sprint(source), "[]"), " ", "\",\"", -1) + `"]
                    }`
   buf, err := EsReq("POST", url, []byte(JsonDSL))
@@ -704,3 +804,152 @@
   dbpersoninfos := Parsesources(sources)
   return dbpersoninfos, nil
}
//************************CORN TASK*******************************
//查询日期范围内是否还存在数据
func QueryAnalyServerData(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) {
   url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
   deleteJson := `{
   "query":{
      "bool":{
         "filter":[{
            "range":{
               "updateTime":{
                  "gte":"` + startTime + `",
                  "lte":"` + endTime + `"
               }
            }
         },
         {
            "term":{
               "analyServerId":"` + analyServerId + `"
            }
         }
         ]
      }
   }
}   `
   buf, err := EsReq("POST", url, []byte(deleteJson))
   if err != nil {
      return false, errors.New("请求失败")
   }
   resTotal, err := SourceTotal(buf)
   if err != nil {
      return false, errors.New("解码失败")
   }
   if resTotal == -1 || resTotal == 0{
      result = false
   } else {
      result = true
   }
   return result, nil
}
//按日期范围,服务器Id删除数据
func DeleteAnalyServerData(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) {
   url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_delete_by_query"
   deleteJson := `{
   "query":{
      "bool":{
         "filter":[{
            "range":{
               "updateTime":{
                  "gte":"` + startTime + `",
                  "lte":"` + endTime + `"
               }
            }
         },
         {
            "term":{
               "analyServerId":"` + analyServerId + `"
            }
         }
         ]
      }
   }
}   `
   buf, err := EsReq("POST", url, []byte(deleteJson))
   if err != nil {
      return false, errors.New("请求失败")
   }
   deleteRes, err := SourceDeleted(buf)
   if err != nil {
      return false, errors.New("解码失败")
   }
   if deleteRes == -1 {
      result = false
   } else {
      result = true
   }
   return result, nil
}
//给所有节点追加删除任务信息
func AddDelTask(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) {
   url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query"
   addJson := `{
    "script": {
       "lang":"painless",
        "inline": "ctx._source.instantTask.add(params.newtask)",
        "params": {
            "newtask": {
                "instantClearId": "` + analyServerId + `",
                "startTime": "` + startTime + `",
                "endTime": "` + endTime + `"
            }
        }
    },
    "query": {
        "match_all": {}
    }
}`
   buf, err := EsReq("POST", url, []byte(addJson))
   if err != nil {
      return false, errors.New("请求失败")
   }
   updateRes, err := SourceUpdated(buf)
   if err != nil {
      return false, errors.New("解码失败")
   }
   if updateRes == -1 {
      result = false
   } else {
      result = true
   }
   return result, nil
}
//移除已执行完的删除任务
func DeleteDelTask(serverIp string, serverPort string, indexName string, analyServerId string) (result bool, err error) {
   url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query"
   deleteJson := `{
    "script": {
       "lang":"painless",
        "inline": "ctx._source.instantTask.remove(0)"
    },
    "query": {
        "bool": {
           "filter":[{
              "term":{
                 "id":"` + analyServerId + `"
              }
           }]
        }
    }
}`
   buf, err := EsReq("POST", url, []byte(deleteJson))
   if err != nil {
      return false, errors.New("请求失败")
   }
   updateRes, err := SourceUpdated(buf)
   if err != nil {
      return false, errors.New("解码失败")
   }
   if updateRes == -1 {
      result = false
   } else {
      result = true
   }
   return result, nil
}