| | |
| | | |
| | | } |
| | | |
| | | // AddDeleteSignal is meant to add an "about to delete" signal; it is currently an empty stub that does nothing. |
| | | func AddDeleteSignal() { |
| | | |
| | | } |
| | | |
| | | /****************************************以下为sdkCompare比对缓存使用方法*********************************************/ |
| | | //获取查询总数 *缓存* |
| | | func GetTotal(serverIp string, serverPort string, indexName string, shards string, targetType string) (total int) { |
| | |
| | | dbpersoninfos := Parsesources(sources) |
| | | return dbpersoninfos, nil |
| | | } |
| | | |
| | | //************************CRON TASK******************************* |
| | | //查询日期范围内是否还存在数据 |
| | | func QueryAnalyServerDate(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) { |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" |
| | | deleteJson := `{ |
| | | "query":{ |
| | | "bool":{ |
| | | "filter":[{ |
| | | "range":{ |
| | | "updateTime":{ |
| | | "gte":"` + startTime + `", |
| | | "lte":"` + endTime + `" |
| | | } |
| | | } |
| | | }, |
| | | { |
| | | "term":{ |
| | | "analyServerId":"` + analyServerId + `" |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | } |
| | | } ` |
| | | buf, err := EsReq("POST", url, []byte(deleteJson)) |
| | | if err != nil { |
| | | return false, errors.New("请求失败") |
| | | } |
| | | resTotal, err := SourceTotal(buf) |
| | | if err != nil { |
| | | return false, errors.New("解码失败") |
| | | } |
| | | if resTotal == -1 || resTotal == 0{ |
| | | result = false |
| | | } else { |
| | | result = true |
| | | } |
| | | return result, nil |
| | | } |
| | | |
| | | |
| | | //按日期范围,服务器Id删除数据 |
| | | func DeleteAnalyServerDate(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) { |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_delete_by_query" |
| | | deleteJson := `{ |
| | | "query":{ |
| | | "bool":{ |
| | | "filter":[{ |
| | | "range":{ |
| | | "updateTime":{ |
| | | "gte":"` + startTime + `", |
| | | "lte":"` + endTime + `" |
| | | } |
| | | } |
| | | }, |
| | | { |
| | | "term":{ |
| | | "analyServerId":"` + analyServerId + `" |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | } |
| | | } ` |
| | | buf, err := EsReq("POST", url, []byte(deleteJson)) |
| | | if err != nil { |
| | | return false, errors.New("请求失败") |
| | | } |
| | | deleteRes, err := SourceDeleted(buf) |
| | | if err != nil { |
| | | return false, errors.New("解码失败") |
| | | } |
| | | if deleteRes == -1 { |
| | | result = false |
| | | } else { |
| | | result = true |
| | | } |
| | | return result, nil |
| | | } |
| | | |
| | | //给所有节点追加删除任务信息 |
| | | func AddDelTask(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) { |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query" |
| | | addJson := `{ |
| | | "script": { |
| | | "lang":"painless", |
| | | "inline": "ctx._source.instantTask.add(params.newtask)", |
| | | "params": { |
| | | "newtask": { |
| | | "instantClearId": "` + analyServerId + `", |
| | | "startTime": "` + startTime + `", |
| | | "endTime": "` + endTime + `" |
| | | } |
| | | } |
| | | }, |
| | | "query": { |
| | | "match_all": {} |
| | | } |
| | | }` |
| | | buf, err := EsReq("POST", url, []byte(addJson)) |
| | | if err != nil { |
| | | return false, errors.New("请求失败") |
| | | } |
| | | updateRes, err := SourceUpdated(buf) |
| | | if err != nil { |
| | | return false, errors.New("解码失败") |
| | | } |
| | | if updateRes == -1 { |
| | | result = false |
| | | } else { |
| | | result = true |
| | | } |
| | | return result, nil |
| | | } |
| | | |
| | | //移除已执行完的删除任务 |
| | | func DeleteDelTask(serverIp string, serverPort string, indexName string, analyServerId string) (result bool, err error) { |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query" |
| | | deleteJson := `{ |
| | | "script": { |
| | | "lang":"painless", |
| | | "inline": "ctx._source.instantTask.remove(0)" |
| | | }, |
| | | "query": { |
| | | "bool": { |
| | | "filter":[{ |
| | | "term":{ |
| | | "id":"` + analyServerId + `" |
| | | } |
| | | }] |
| | | } |
| | | } |
| | | }` |
| | | buf, err := EsReq("POST", url, []byte(deleteJson)) |
| | | if err != nil { |
| | | return false, errors.New("请求失败") |
| | | } |
| | | updateRes, err := SourceUpdated(buf) |
| | | if err != nil { |
| | | return false, errors.New("解码失败") |
| | | } |
| | | if updateRes == -1 { |
| | | result = false |
| | | } else { |
| | | result = true |
| | | } |
| | | return result, nil |
| | | } |
| | |
| | | return data, nil |
| | | } |
| | | |
// SourceCreated inspects a JSON response body and reports whether its
// top-level "result" field is "created" or "updated".
//
// buf must hold a JSON object. result is false with a nil error for any other
// "result" string. An error is returned (with result false) when buf is not a
// JSON object or when "result" is missing or not a string.
func SourceCreated(buf []byte) (result bool, err error) {
	var out map[string]interface{}
	// Surface the decoder's own error instead of discarding it, so the caller
	// can see why the body was rejected.
	if err = json.Unmarshal(buf, &out); err != nil {
		return false, errors.New("decode response as JSON object: " + err.Error())
	}

	status, ok := out["result"].(string)
	if !ok {
		return false, errors.New(`response has no string "result" field`)
	}
	return status == "created" || status == "updated", nil
}
| | | |
// SourceDeleted extracts the numeric top-level "deleted" field from a JSON
// response body (the counter DeleteAnalyServerDate reads after a
// _delete_by_query request).
//
// It returns the counter truncated to int. When buf is not a JSON object, or
// "deleted" is missing or not a number, it returns -1 and a non-nil error.
func SourceDeleted(buf []byte) (total int, err error) {
	var out map[string]interface{}
	// Surface the decoder's own error instead of discarding it, so the caller
	// can see why the body was rejected.
	if err = json.Unmarshal(buf, &out); err != nil {
		return -1, errors.New("decode response as JSON object: " + err.Error())
	}

	// encoding/json decodes every JSON number into float64 here.
	deleted, ok := out["deleted"].(float64)
	if !ok {
		return -1, errors.New(`response has no numeric "deleted" field`)
	}
	return int(deleted), nil
}
| | | |
// SourceUpdated extracts the numeric top-level "updated" field from a JSON
// response body (the counter AddDelTask and DeleteDelTask read after an
// _update_by_query request).
//
// It returns the counter truncated to int. When buf is not a JSON object, or
// "updated" is missing or not a number, it returns -1 and a non-nil error.
func SourceUpdated(buf []byte) (total int, err error) {
	var out map[string]interface{}
	// Surface the decoder's own error instead of discarding it, so the caller
	// can see why the body was rejected.
	if err = json.Unmarshal(buf, &out); err != nil {
		return -1, errors.New("decode response as JSON object: " + err.Error())
	}

	// encoding/json decodes every JSON number into float64 here.
	updated, ok := out["updated"].(float64)
	if !ok {
		return -1, errors.New(`response has no numeric "updated" field`)
	}
	return int(updated), nil
}
| | | |
// SourceTotal extracts the hit count from a JSON search response body.
//
// Two shapes of "hits.total" are accepted:
//   - a plain number:           {"hits":{"total":5}}
//   - an object with a "value": {"hits":{"total":{"value":5,"relation":"eq"}}}
//
// It returns the count truncated to int. When buf is not a JSON object, has no
// "hits" object, or "hits.total" has neither shape, it returns -1 and a
// non-nil error instead of panicking.
func SourceTotal(buf []byte) (total int, err error) {
	var out map[string]interface{}
	// Surface the decoder's own error instead of discarding it, so the caller
	// can see why the body was rejected.
	if err = json.Unmarshal(buf, &out); err != nil {
		return -1, errors.New("decode response as JSON object: " + err.Error())
	}

	hits, ok := out["hits"].(map[string]interface{})
	if !ok {
		return -1, errors.New(`response has no "hits" object`)
	}

	// A type switch replaces the former unchecked assertion, which panicked
	// whenever "total" was absent or not a bare number.
	switch v := hits["total"].(type) {
	case float64:
		return int(v), nil
	case map[string]interface{}:
		if n, ok := v["value"].(float64); ok {
			return int(n), nil
		}
	}
	return -1, errors.New(`response has no numeric "hits.total"`)
}
| | | |
| | | func EsReq(method string, url string, parama []byte) (buf []byte, err error) { |
| | | //defer elapsed("page")() |
| | | timeout := time.Duration(10 * time.Second) |