sunty
2020-01-14 6f023031f3f6f08ac441189be8b16838145edd3e
EsApi.go
@@ -78,7 +78,7 @@
}
//根据目标id查询已追加条数
func GetLinkTagInfoSize(id string, indexName string, serverIp string, serverPort string) (size int,err error){
func GetLinkTagInfoSize(id string, indexName string, serverIp string, serverPort string) (size int, err error) {
   url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
   queryDSL := `{
           "query": {
@@ -87,20 +87,20 @@
         }
      }
   }`
   buf, err := EsReq("POST",url,[]byte(queryDSL))
   buf, err := EsReq("POST", url, []byte(queryDSL))
   if err != nil {
      return -1,err
      return -1, err
   }
   source, err := Sourcelist(buf)
   if err != nil {
      return -1,err
      return -1, err
   }
   if source[0]["linkTagInfo"] != nil {
      size = len(source[0]["linkTagInfo"].([]interface{}))
   } else {
      return -1,errors.New("该数组不存在")
      return -1, errors.New("该数组不存在")
   }
   return size,nil
   return size, nil
}
//根据目标id追加跟踪信息
@@ -118,7 +118,7 @@
  },
  "script": {
    "lang": "painless",
    "inline": "ctx._source.linkTagInfo.add(params.newparam);ctx._source.updateTime='`+updateTime+`'",
    "inline": "ctx._source.linkTagInfo.add(params.newparam);ctx._source.updateTime='` + updateTime + `'",
    "params": {
      "newparam": ` + targetInfo + `
    }
@@ -151,7 +151,7 @@
}
//根据抓拍人员id更新(videourl)摄像机地址
func UpdateVideourlById(id string, videoUrl string, indexName string, serverIp string, serverPort string, command int) (statu int,err error) {
func UpdateVideourlById(id string, videoUrl string, indexName string, serverIp string, serverPort string, command int) (statu int, err error) {
   var info interface{}
   url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query?refresh=true"
@@ -176,7 +176,7 @@
   if err != nil {
      fmt.Println("http request videoUrlInfo info is err!")
      statu = 500
      return statu,err
      return statu, err
   }
   json.Unmarshal(buf, &info)
   //fmt.Println(info)
@@ -184,23 +184,23 @@
   if !ok {
      fmt.Println("http response interface can not change map[string]interface{}")
      statu = 500
      return statu,errors.New("http response interface can not change map[string]interface{}")
      return statu, errors.New("http response interface can not change map[string]interface{}")
   }
   middle, ok := out["updated"].(float64)
   if !ok {
      fmt.Println("first updated change error!")
      statu = 500
      return statu,errors.New("first updated change error!")
      return statu, errors.New("first updated change error!")
   }
   if middle == 1 {
      statu = 200
      return statu,nil
      return statu, nil
   }
   if middle == 0 {
      statu = 201
      return statu,errors.New("已经修改")
      return statu, errors.New("已经修改")
   }
   return statu,nil
   return statu, nil
}
//获取当前节点抓拍库所有人员ID*缓存*
@@ -661,6 +661,11 @@
}
//添加即将删除信号
func AddDeleteSignal() {
}
/****************************************以下为sdkCompare比对缓存使用方法*********************************************/
//获取查询总数 *缓存*
func GetTotal(serverIp string, serverPort string, indexName string, shards string, targetType string) (total int) {
@@ -670,7 +675,7 @@
      "bool": {
         "filter": [{
            "term": {
               "targetInfo.targetType.raw": "`+targetType+`"
               "targetInfo.targetType.raw": "` + targetType + `"
            }
         }]
      }
@@ -704,9 +709,9 @@
   var source []string
   switch targetType {
   case "face":
      source = []string{"id","targetInfo.feature","analyServerId","cameraId"}
      source = []string{"id", "targetInfo.feature", "analyServerId", "cameraId"}
   case "track":
      source = []string{"id","targetInfo.feature","analyServerId","cameraId","targetInfo.attachTarget.feature","targetInfo.targetLocation","linkTagInfo.targetInfo.feature","linkTagInfo.targetInfo.attachTarget.feature","linkTagInfo.cameraId","linkTagInfo.targetInfo.targetLocation"}
      source = []string{"id", "targetInfo.feature", "analyServerId", "cameraId", "targetInfo.attachTarget.feature", "targetInfo.targetLocation", "linkTagInfo.targetInfo.feature", "linkTagInfo.targetInfo.attachTarget.feature", "linkTagInfo.cameraId", "linkTagInfo.targetInfo.targetLocation"}
   }
   JsonDSL := `
            {
@@ -715,7 +720,7 @@
                        "filter": [
                            {
                                "term": {
                                    "targetInfo.targetType.raw": "`+targetType+`"
                                    "targetInfo.targetType.raw": "` + targetType + `"
                                }
                            },
                            {
@@ -730,7 +735,7 @@
                    }
                },
                "size": 1000000,
                "_source": ["`+strings.Replace(strings.Trim(fmt.Sprint(source), "[]"), " ", "\",\"", -1)+`"]
                "_source": ["` + strings.Replace(strings.Trim(fmt.Sprint(source), "[]"), " ", "\",\"", -1) + `"]
            }
    `
   //logger.Debug(url)
@@ -761,9 +766,9 @@
   var source []string
   switch targetType {
   case "face":
      source = []string{"id","targetInfo.feature","analyServerId"}
      source = []string{"id", "targetInfo.feature", "analyServerId"}
   case "track":
      source = []string{"id","targetInfo.feature","analyServerId","targetInfo.attachTarget.feature","targetInfo.targetLocation","linkTagInfo.targetInfo.feature","linkTagInfo.targetInfo.attachTarget.feature","linkTagInfo.targetInfo.targetLocation"}
      source = []string{"id", "targetInfo.feature", "analyServerId", "targetInfo.attachTarget.feature", "targetInfo.targetLocation", "linkTagInfo.targetInfo.feature", "linkTagInfo.targetInfo.attachTarget.feature", "linkTagInfo.targetInfo.targetLocation"}
   }
   url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local"
@@ -774,14 +779,14 @@
                     "filter": [
                        {
                           "term": {
                              "targetInfo.targetType.raw": "`+targetType+`"
                              "targetInfo.targetType.raw": "` + targetType + `"
                        }
                           }
                     ]
                  }   
               },
                     "size":` + number + `,
                     "_source": ["`+strings.Replace(strings.Trim(fmt.Sprint(source), "[]"), " ", "\",\"", -1)+`"]
                     "_source": ["` + strings.Replace(strings.Trim(fmt.Sprint(source), "[]"), " ", "\",\"", -1) + `"]
                    }`
   buf, err := EsReq("POST", url, []byte(JsonDSL))
@@ -799,3 +804,152 @@
   dbpersoninfos := Parsesources(sources)
   return dbpersoninfos, nil
}
//************************CORN TASK*******************************
//查询日期范围内是否还存在数据
func QueryAnalyServerDate(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) {
   url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
   deleteJson := `{
   "query":{
      "bool":{
         "filter":[{
            "range":{
               "updateTime":{
                  "gte":"` + startTime + `",
                  "lte":"` + endTime + `"
               }
            }
         },
         {
            "term":{
               "analyServerId":"` + analyServerId + `"
            }
         }
         ]
      }
   }
}   `
   buf, err := EsReq("POST", url, []byte(deleteJson))
   if err != nil {
      return false, errors.New("请求失败")
   }
   resTotal, err := SourceTotal(buf)
   if err != nil {
      return false, errors.New("解码失败")
   }
   if resTotal == -1 || resTotal == 0{
      result = false
   } else {
      result = true
   }
   return result, nil
}
//按日期范围,服务器Id删除数据
func DeleteAnalyServerDate(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) {
   url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_delete_by_query"
   deleteJson := `{
   "query":{
      "bool":{
         "filter":[{
            "range":{
               "updateTime":{
                  "gte":"` + startTime + `",
                  "lte":"` + endTime + `"
               }
            }
         },
         {
            "term":{
               "analyServerId":"` + analyServerId + `"
            }
         }
         ]
      }
   }
}   `
   buf, err := EsReq("POST", url, []byte(deleteJson))
   if err != nil {
      return false, errors.New("请求失败")
   }
   deleteRes, err := SourceDeleted(buf)
   if err != nil {
      return false, errors.New("解码失败")
   }
   if deleteRes == -1 {
      result = false
   } else {
      result = true
   }
   return result, nil
}
//给所有节点追加删除任务信息
func AddDelTask(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) {
   url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query"
   addJson := `{
    "script": {
       "lang":"painless",
        "inline": "ctx._source.instantTask.add(params.newtask)",
        "params": {
            "newtask": {
                "instantClearId": "` + analyServerId + `",
                "startTime": "` + startTime + `",
                "endTime": "` + endTime + `"
            }
        }
    },
    "query": {
        "match_all": {}
    }
}`
   buf, err := EsReq("POST", url, []byte(addJson))
   if err != nil {
      return false, errors.New("请求失败")
   }
   updateRes, err := SourceUpdated(buf)
   if err != nil {
      return false, errors.New("解码失败")
   }
   if updateRes == -1 {
      result = false
   } else {
      result = true
   }
   return result, nil
}
//移除已执行完的删除任务
func DeleteDelTask(serverIp string, serverPort string, indexName string, analyServerId string) (result bool, err error) {
   url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query"
   deleteJson := `{
    "script": {
       "lang":"painless",
        "inline": "ctx._source.instantTask.remove(0)"
    },
    "query": {
        "bool": {
           "filter":[{
              "term":{
                 "id":"` + analyServerId + `"
              }
           }]
        }
    }
}`
   buf, err := EsReq("POST", url, []byte(deleteJson))
   if err != nil {
      return false, errors.New("请求失败")
   }
   updateRes, err := SourceUpdated(buf)
   if err != nil {
      return false, errors.New("解码失败")
   }
   if updateRes == -1 {
      result = false
   } else {
      result = true
   }
   return result, nil
}