sunty
2020-01-14 6f023031f3f6f08ac441189be8b16838145edd3e
add tools
2个文件已修改
267 ■■■■■ 已修改文件
EsApi.go 200 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
EsClient.go 67 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
EsApi.go
@@ -78,7 +78,7 @@
}
//根据目标id查询已追加条数
func GetLinkTagInfoSize(id string, indexName string, serverIp string, serverPort string) (size int,err error){
func GetLinkTagInfoSize(id string, indexName string, serverIp string, serverPort string) (size int, err error) {
    url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
    queryDSL := `{
              "query": {
@@ -87,20 +87,20 @@
            }
        }
    }`
    buf, err := EsReq("POST",url,[]byte(queryDSL))
    buf, err := EsReq("POST", url, []byte(queryDSL))
    if err != nil {
        return -1,err
        return -1, err
    }
    source, err := Sourcelist(buf)
    if err != nil {
        return -1,err
        return -1, err
    }
    if source[0]["linkTagInfo"] != nil {
        size = len(source[0]["linkTagInfo"].([]interface{}))
    } else {
        return -1,errors.New("该数组不存在")
        return -1, errors.New("该数组不存在")
    }
    return size,nil
    return size, nil
}
//根据目标id追加跟踪信息
@@ -118,7 +118,7 @@
  },
  "script": {
    "lang": "painless",
    "inline": "ctx._source.linkTagInfo.add(params.newparam);ctx._source.updateTime='`+updateTime+`'",
    "inline": "ctx._source.linkTagInfo.add(params.newparam);ctx._source.updateTime='` + updateTime + `'",
    "params": {
      "newparam": ` + targetInfo + `
    }
@@ -151,7 +151,7 @@
}
//根据抓拍人员id更新(videourl)摄像机地址
func UpdateVideourlById(id string, videoUrl string, indexName string, serverIp string, serverPort string, command int) (statu int,err error) {
func UpdateVideourlById(id string, videoUrl string, indexName string, serverIp string, serverPort string, command int) (statu int, err error) {
    var info interface{}
    url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query?refresh=true"
@@ -176,7 +176,7 @@
    if err != nil {
        fmt.Println("http request videoUrlInfo info is err!")
        statu = 500
        return statu,err
        return statu, err
    }
    json.Unmarshal(buf, &info)
    //fmt.Println(info)
@@ -184,23 +184,23 @@
    if !ok {
        fmt.Println("http response interface can not change map[string]interface{}")
        statu = 500
        return statu,errors.New("http response interface can not change map[string]interface{}")
        return statu, errors.New("http response interface can not change map[string]interface{}")
    }
    middle, ok := out["updated"].(float64)
    if !ok {
        fmt.Println("first updated change error!")
        statu = 500
        return statu,errors.New("first updated change error!")
        return statu, errors.New("first updated change error!")
    }
    if middle == 1 {
        statu = 200
        return statu,nil
        return statu, nil
    }
    if middle == 0 {
        statu = 201
        return statu,errors.New("已经修改")
        return statu, errors.New("已经修改")
    }
    return statu,nil
    return statu, nil
}
//获取当前节点抓拍库所有人员ID*缓存*
@@ -661,6 +661,11 @@
}
//添加即将删除信号
func AddDeleteSignal() {
}
/****************************************以下为sdkCompare比对缓存使用方法*********************************************/
//获取查询总数 *缓存*
func GetTotal(serverIp string, serverPort string, indexName string, shards string, targetType string) (total int) {
@@ -670,7 +675,7 @@
        "bool": {
            "filter": [{
                "term": {
                    "targetInfo.targetType.raw": "`+targetType+`"
                    "targetInfo.targetType.raw": "` + targetType + `"
                }
            }]
        }
@@ -704,9 +709,9 @@
    var source []string
    switch targetType {
    case "face":
        source = []string{"id","targetInfo.feature","analyServerId","cameraId"}
        source = []string{"id", "targetInfo.feature", "analyServerId", "cameraId"}
    case "track":
        source = []string{"id","targetInfo.feature","analyServerId","cameraId","targetInfo.attachTarget.feature","targetInfo.targetLocation","linkTagInfo.targetInfo.feature","linkTagInfo.targetInfo.attachTarget.feature","linkTagInfo.cameraId","linkTagInfo.targetInfo.targetLocation"}
        source = []string{"id", "targetInfo.feature", "analyServerId", "cameraId", "targetInfo.attachTarget.feature", "targetInfo.targetLocation", "linkTagInfo.targetInfo.feature", "linkTagInfo.targetInfo.attachTarget.feature", "linkTagInfo.cameraId", "linkTagInfo.targetInfo.targetLocation"}
    }
    JsonDSL := `
                {
@@ -715,7 +720,7 @@
                            "filter": [
                                {
                                    "term": {
                                        "targetInfo.targetType.raw": "`+targetType+`"
                                        "targetInfo.targetType.raw": "` + targetType + `"
                                    }
                                },
                                {
@@ -730,7 +735,7 @@
                        }
                    },
                    "size": 1000000,
                    "_source": ["`+strings.Replace(strings.Trim(fmt.Sprint(source), "[]"), " ", "\",\"", -1)+`"]
                    "_source": ["` + strings.Replace(strings.Trim(fmt.Sprint(source), "[]"), " ", "\",\"", -1) + `"]
                }
    `
    //logger.Debug(url)
@@ -761,9 +766,9 @@
    var source []string
    switch targetType {
    case "face":
        source = []string{"id","targetInfo.feature","analyServerId"}
        source = []string{"id", "targetInfo.feature", "analyServerId"}
    case "track":
        source = []string{"id","targetInfo.feature","analyServerId","targetInfo.attachTarget.feature","targetInfo.targetLocation","linkTagInfo.targetInfo.feature","linkTagInfo.targetInfo.attachTarget.feature","linkTagInfo.targetInfo.targetLocation"}
        source = []string{"id", "targetInfo.feature", "analyServerId", "targetInfo.attachTarget.feature", "targetInfo.targetLocation", "linkTagInfo.targetInfo.feature", "linkTagInfo.targetInfo.attachTarget.feature", "linkTagInfo.targetInfo.targetLocation"}
    }
    url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local"
@@ -774,14 +779,14 @@
                            "filter": [
                                {
                                    "term": {
                                        "targetInfo.targetType.raw": "`+targetType+`"
                                        "targetInfo.targetType.raw": "` + targetType + `"
                                }
                                    }
                            ]
                        }    
                    },
                     "size":` + number + `,
                     "_source": ["`+strings.Replace(strings.Trim(fmt.Sprint(source), "[]"), " ", "\",\"", -1)+`"]
                     "_source": ["` + strings.Replace(strings.Trim(fmt.Sprint(source), "[]"), " ", "\",\"", -1) + `"]
                    }`
    buf, err := EsReq("POST", url, []byte(JsonDSL))
@@ -799,3 +804,152 @@
    dbpersoninfos := Parsesources(sources)
    return dbpersoninfos, nil
}
//************************CORN TASK*******************************
//查询日期范围内是否还存在数据
func QueryAnalyServerDate(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) {
    url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
    deleteJson := `{
    "query":{
        "bool":{
            "filter":[{
                "range":{
                    "updateTime":{
                        "gte":"` + startTime + `",
                        "lte":"` + endTime + `"
                    }
                }
            },
            {
                "term":{
                    "analyServerId":"` + analyServerId + `"
                }
            }
            ]
        }
    }
}    `
    buf, err := EsReq("POST", url, []byte(deleteJson))
    if err != nil {
        return false, errors.New("请求失败")
    }
    resTotal, err := SourceTotal(buf)
    if err != nil {
        return false, errors.New("解码失败")
    }
    if resTotal == -1 || resTotal == 0{
        result = false
    } else {
        result = true
    }
    return result, nil
}
//按日期范围,服务器Id删除数据
func DeleteAnalyServerDate(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) {
    url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_delete_by_query"
    deleteJson := `{
    "query":{
        "bool":{
            "filter":[{
                "range":{
                    "updateTime":{
                        "gte":"` + startTime + `",
                        "lte":"` + endTime + `"
                    }
                }
            },
            {
                "term":{
                    "analyServerId":"` + analyServerId + `"
                }
            }
            ]
        }
    }
}    `
    buf, err := EsReq("POST", url, []byte(deleteJson))
    if err != nil {
        return false, errors.New("请求失败")
    }
    deleteRes, err := SourceDeleted(buf)
    if err != nil {
        return false, errors.New("解码失败")
    }
    if deleteRes == -1 {
        result = false
    } else {
        result = true
    }
    return result, nil
}
//给所有节点追加删除任务信息
func AddDelTask(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) {
    url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query"
    addJson := `{
    "script": {
        "lang":"painless",
        "inline": "ctx._source.instantTask.add(params.newtask)",
        "params": {
            "newtask": {
                "instantClearId": "` + analyServerId + `",
                "startTime": "` + startTime + `",
                "endTime": "` + endTime + `"
            }
        }
    },
    "query": {
        "match_all": {}
    }
}`
    buf, err := EsReq("POST", url, []byte(addJson))
    if err != nil {
        return false, errors.New("请求失败")
    }
    updateRes, err := SourceUpdated(buf)
    if err != nil {
        return false, errors.New("解码失败")
    }
    if updateRes == -1 {
        result = false
    } else {
        result = true
    }
    return result, nil
}
//移除已执行完的删除任务
func DeleteDelTask(serverIp string, serverPort string, indexName string, analyServerId string) (result bool, err error) {
    url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query"
    deleteJson := `{
    "script": {
        "lang":"painless",
        "inline": "ctx._source.instantTask.remove(0)"
    },
    "query": {
        "bool": {
            "filter":[{
                "term":{
                    "id":"` + analyServerId + `"
                }
            }]
        }
    }
}`
    buf, err := EsReq("POST", url, []byte(deleteJson))
    if err != nil {
        return false, errors.New("请求失败")
    }
    updateRes, err := SourceUpdated(buf)
    if err != nil {
        return false, errors.New("解码失败")
    }
    if updateRes == -1 {
        result = false
    } else {
        result = true
    }
    return result, nil
}
EsClient.go
@@ -542,6 +542,73 @@
    return data, nil
}
func SourceCreated(buf []byte) (result bool, err error) {
    var info interface{}
    json.Unmarshal(buf, &info)
    out, ok := info.(map[string]interface{})
    if !ok {
        return false, errors.New("http response interface can not change map[string]interface{}")
    }
    middle, ok := out["result"].(string)
    if !ok {
        return false, errors.New("first total change error!")
    }
    if middle == "created" || middle == "updated" {
        result = true
    }
    return result, nil
}
func SourceDeleted(buf []byte) (total int, err error) {
    var info interface{}
    json.Unmarshal(buf, &info)
    out, ok := info.(map[string]interface{})
    if !ok {
        return -1, errors.New("http response interface can not change map[string]interface{}")
    }
    middle, ok := out["deleted"].(float64)
    if !ok {
        return -1, errors.New("first total change error!")
    }
    total = int(middle)
    return total, nil
}
func SourceUpdated(buf []byte) (total int, err error) {
    var info interface{}
    json.Unmarshal(buf, &info)
    out, ok := info.(map[string]interface{})
    if !ok {
        return -1, errors.New("http response interface can not change map[string]interface{}")
    }
    middle, ok := out["updated"].(float64)
    if !ok {
        return -1, errors.New("first total change error!")
    }
    total = int(middle)
    return total, nil
}
func SourceTotal(buf []byte) (total int, err error) {
    var info interface{}
    json.Unmarshal(buf, &info)
    out, ok := info.(map[string]interface{})
    if !ok {
        return -1, errors.New("http response interface can not change map[string]interface{}")
    }
    middle, ok := out["hits"].(map[string]interface{})
    if !ok {
        return -1, errors.New("first total change error!")
    }
    tmp := middle["total"].(float64)
    total = int(tmp)
    return total, nil
}
func EsReq(method string, url string, parama []byte) (buf []byte, err error) {
    //defer elapsed("page")()
    timeout := time.Duration(10 * time.Second)