sunty
2020-01-14 6f023031f3f6f08ac441189be8b16838145edd3e
EsApi.go
@@ -661,6 +661,11 @@
}
//添加即将删除信号
func AddDeleteSignal() {
}
/****************************************以下为sdkCompare比对缓存使用方法*********************************************/
//获取查询总数 *缓存*
func GetTotal(serverIp string, serverPort string, indexName string, shards string, targetType string) (total int) {
@@ -799,3 +804,152 @@
   dbpersoninfos := Parsesources(sources)
   return dbpersoninfos, nil
}
//************************CORN TASK*******************************
//查询日期范围内是否还存在数据
func QueryAnalyServerDate(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) {
   url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
   deleteJson := `{
   "query":{
      "bool":{
         "filter":[{
            "range":{
               "updateTime":{
                  "gte":"` + startTime + `",
                  "lte":"` + endTime + `"
               }
            }
         },
         {
            "term":{
               "analyServerId":"` + analyServerId + `"
            }
         }
         ]
      }
   }
}   `
   buf, err := EsReq("POST", url, []byte(deleteJson))
   if err != nil {
      return false, errors.New("请求失败")
   }
   resTotal, err := SourceTotal(buf)
   if err != nil {
      return false, errors.New("解码失败")
   }
   if resTotal == -1 || resTotal == 0{
      result = false
   } else {
      result = true
   }
   return result, nil
}
//按日期范围,服务器Id删除数据
func DeleteAnalyServerDate(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) {
   url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_delete_by_query"
   deleteJson := `{
   "query":{
      "bool":{
         "filter":[{
            "range":{
               "updateTime":{
                  "gte":"` + startTime + `",
                  "lte":"` + endTime + `"
               }
            }
         },
         {
            "term":{
               "analyServerId":"` + analyServerId + `"
            }
         }
         ]
      }
   }
}   `
   buf, err := EsReq("POST", url, []byte(deleteJson))
   if err != nil {
      return false, errors.New("请求失败")
   }
   deleteRes, err := SourceDeleted(buf)
   if err != nil {
      return false, errors.New("解码失败")
   }
   if deleteRes == -1 {
      result = false
   } else {
      result = true
   }
   return result, nil
}
//给所有节点追加删除任务信息
func AddDelTask(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) {
   url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query"
   addJson := `{
    "script": {
       "lang":"painless",
        "inline": "ctx._source.instantTask.add(params.newtask)",
        "params": {
            "newtask": {
                "instantClearId": "` + analyServerId + `",
                "startTime": "` + startTime + `",
                "endTime": "` + endTime + `"
            }
        }
    },
    "query": {
        "match_all": {}
    }
}`
   buf, err := EsReq("POST", url, []byte(addJson))
   if err != nil {
      return false, errors.New("请求失败")
   }
   updateRes, err := SourceUpdated(buf)
   if err != nil {
      return false, errors.New("解码失败")
   }
   if updateRes == -1 {
      result = false
   } else {
      result = true
   }
   return result, nil
}
//移除已执行完的删除任务
func DeleteDelTask(serverIp string, serverPort string, indexName string, analyServerId string) (result bool, err error) {
   url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query"
   deleteJson := `{
    "script": {
       "lang":"painless",
        "inline": "ctx._source.instantTask.remove(0)"
    },
    "query": {
        "bool": {
           "filter":[{
              "term":{
                 "id":"` + analyServerId + `"
              }
           }]
        }
    }
}`
   buf, err := EsReq("POST", url, []byte(deleteJson))
   if err != nil {
      return false, errors.New("请求失败")
   }
   updateRes, err := SourceUpdated(buf)
   if err != nil {
      return false, errors.New("解码失败")
   }
   if updateRes == -1 {
      result = false
   } else {
      result = true
   }
   return result, nil
}