sunty
2020-01-14 6f023031f3f6f08ac441189be8b16838145edd3e
add tools
2个文件已修改
221 ■■■■■ 已修改文件
EsApi.go 154 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
EsClient.go 67 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
EsApi.go
@@ -661,6 +661,11 @@
}
//添加即将删除信号
func AddDeleteSignal() {
}
/****************************************以下为sdkCompare比对缓存使用方法*********************************************/
//获取查询总数 *缓存*
func GetTotal(serverIp string, serverPort string, indexName string, shards string, targetType string) (total int) {
@@ -799,3 +804,152 @@
    dbpersoninfos := Parsesources(sources)
    return dbpersoninfos, nil
}
//************************CORN TASK*******************************
//查询日期范围内是否还存在数据
func QueryAnalyServerDate(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) {
    url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
    deleteJson := `{
    "query":{
        "bool":{
            "filter":[{
                "range":{
                    "updateTime":{
                        "gte":"` + startTime + `",
                        "lte":"` + endTime + `"
                    }
                }
            },
            {
                "term":{
                    "analyServerId":"` + analyServerId + `"
                }
            }
            ]
        }
    }
}    `
    buf, err := EsReq("POST", url, []byte(deleteJson))
    if err != nil {
        return false, errors.New("请求失败")
    }
    resTotal, err := SourceTotal(buf)
    if err != nil {
        return false, errors.New("解码失败")
    }
    if resTotal == -1 || resTotal == 0{
        result = false
    } else {
        result = true
    }
    return result, nil
}
//按日期范围,服务器Id删除数据
func DeleteAnalyServerDate(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) {
    url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_delete_by_query"
    deleteJson := `{
    "query":{
        "bool":{
            "filter":[{
                "range":{
                    "updateTime":{
                        "gte":"` + startTime + `",
                        "lte":"` + endTime + `"
                    }
                }
            },
            {
                "term":{
                    "analyServerId":"` + analyServerId + `"
                }
            }
            ]
        }
    }
}    `
    buf, err := EsReq("POST", url, []byte(deleteJson))
    if err != nil {
        return false, errors.New("请求失败")
    }
    deleteRes, err := SourceDeleted(buf)
    if err != nil {
        return false, errors.New("解码失败")
    }
    if deleteRes == -1 {
        result = false
    } else {
        result = true
    }
    return result, nil
}
//给所有节点追加删除任务信息
func AddDelTask(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) {
    url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query"
    addJson := `{
    "script": {
        "lang":"painless",
        "inline": "ctx._source.instantTask.add(params.newtask)",
        "params": {
            "newtask": {
                "instantClearId": "` + analyServerId + `",
                "startTime": "` + startTime + `",
                "endTime": "` + endTime + `"
            }
        }
    },
    "query": {
        "match_all": {}
    }
}`
    buf, err := EsReq("POST", url, []byte(addJson))
    if err != nil {
        return false, errors.New("请求失败")
    }
    updateRes, err := SourceUpdated(buf)
    if err != nil {
        return false, errors.New("解码失败")
    }
    if updateRes == -1 {
        result = false
    } else {
        result = true
    }
    return result, nil
}
//移除已执行完的删除任务
func DeleteDelTask(serverIp string, serverPort string, indexName string, analyServerId string) (result bool, err error) {
    url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query"
    deleteJson := `{
    "script": {
        "lang":"painless",
        "inline": "ctx._source.instantTask.remove(0)"
    },
    "query": {
        "bool": {
            "filter":[{
                "term":{
                    "id":"` + analyServerId + `"
                }
            }]
        }
    }
}`
    buf, err := EsReq("POST", url, []byte(deleteJson))
    if err != nil {
        return false, errors.New("请求失败")
    }
    updateRes, err := SourceUpdated(buf)
    if err != nil {
        return false, errors.New("解码失败")
    }
    if updateRes == -1 {
        result = false
    } else {
        result = true
    }
    return result, nil
}
EsClient.go
@@ -542,6 +542,73 @@
    return data, nil
}
func SourceCreated(buf []byte) (result bool, err error) {
    var info interface{}
    json.Unmarshal(buf, &info)
    out, ok := info.(map[string]interface{})
    if !ok {
        return false, errors.New("http response interface can not change map[string]interface{}")
    }
    middle, ok := out["result"].(string)
    if !ok {
        return false, errors.New("first total change error!")
    }
    if middle == "created" || middle == "updated" {
        result = true
    }
    return result, nil
}
func SourceDeleted(buf []byte) (total int, err error) {
    var info interface{}
    json.Unmarshal(buf, &info)
    out, ok := info.(map[string]interface{})
    if !ok {
        return -1, errors.New("http response interface can not change map[string]interface{}")
    }
    middle, ok := out["deleted"].(float64)
    if !ok {
        return -1, errors.New("first total change error!")
    }
    total = int(middle)
    return total, nil
}
func SourceUpdated(buf []byte) (total int, err error) {
    var info interface{}
    json.Unmarshal(buf, &info)
    out, ok := info.(map[string]interface{})
    if !ok {
        return -1, errors.New("http response interface can not change map[string]interface{}")
    }
    middle, ok := out["updated"].(float64)
    if !ok {
        return -1, errors.New("first total change error!")
    }
    total = int(middle)
    return total, nil
}
func SourceTotal(buf []byte) (total int, err error) {
    var info interface{}
    json.Unmarshal(buf, &info)
    out, ok := info.(map[string]interface{})
    if !ok {
        return -1, errors.New("http response interface can not change map[string]interface{}")
    }
    middle, ok := out["hits"].(map[string]interface{})
    if !ok {
        return -1, errors.New("first total change error!")
    }
    tmp := middle["total"].(float64)
    total = int(tmp)
    return total, nil
}
func EsReq(method string, url string, parama []byte) (buf []byte, err error) {
    //defer elapsed("page")()
    timeout := time.Duration(10 * time.Second)