| | |
| | | } |
| | | |
| | | //根据目标id查询已追加条数 |
| | | func GetLinkTagInfoSize(id string, indexName string, serverIp string, serverPort string) (size int,err error){ |
| | | func GetLinkTagInfoSize(id string, indexName string, serverIp string, serverPort string) (size int, err error) { |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" |
| | | queryDSL := `{ |
| | | "query": { |
| | |
| | | } |
| | | } |
| | | }` |
| | | buf, err := EsReq("POST",url,[]byte(queryDSL)) |
| | | buf, err := EsReq("POST", url, []byte(queryDSL)) |
| | | if err != nil { |
| | | return -1,err |
| | | return -1, err |
| | | } |
| | | source, err := Sourcelist(buf) |
| | | if err != nil { |
| | | return -1,err |
| | | return -1, err |
| | | } |
| | | if source[0]["linkTagInfo"] != nil { |
| | | size = len(source[0]["linkTagInfo"].([]interface{})) |
| | | } else { |
| | | return -1,errors.New("该数组不存在") |
| | | return -1, errors.New("该数组不存在") |
| | | } |
| | | return size,nil |
| | | return size, nil |
| | | } |
| | | |
| | | //根据目标id追加跟踪信息 |
| | |
| | | }, |
| | | "script": { |
| | | "lang": "painless", |
| | | "inline": "ctx._source.linkTagInfo.add(params.newparam);ctx._source.updateTime='`+updateTime+`'", |
| | | "inline": "ctx._source.linkTagInfo.add(params.newparam);ctx._source.updateTime='` + updateTime + `'", |
| | | "params": { |
| | | "newparam": ` + targetInfo + ` |
| | | } |
| | |
| | | } |
| | | |
| | | //根据抓拍人员id更新(videourl)摄像机地址 |
| | | func UpdateVideourlById(id string, videoUrl string, indexName string, serverIp string, serverPort string, command int) (statu int,err error) { |
| | | func UpdateVideourlById(id string, videoUrl string, indexName string, serverIp string, serverPort string, command int) (statu int, err error) { |
| | | |
| | | var info interface{} |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query?refresh=true" |
| | |
| | | if err != nil { |
| | | fmt.Println("http request videoUrlInfo info is err!") |
| | | statu = 500 |
| | | return statu,err |
| | | return statu, err |
| | | } |
| | | json.Unmarshal(buf, &info) |
| | | //fmt.Println(info) |
| | |
| | | if !ok { |
| | | fmt.Println("http response interface can not change map[string]interface{}") |
| | | statu = 500 |
| | | return statu,errors.New("http response interface can not change map[string]interface{}") |
| | | return statu, errors.New("http response interface can not change map[string]interface{}") |
| | | } |
| | | middle, ok := out["updated"].(float64) |
| | | if !ok { |
| | | fmt.Println("first updated change error!") |
| | | statu = 500 |
| | | return statu,errors.New("first updated change error!") |
| | | return statu, errors.New("first updated change error!") |
| | | } |
| | | if middle == 1 { |
| | | statu = 200 |
| | | return statu,nil |
| | | return statu, nil |
| | | } |
| | | if middle == 0 { |
| | | statu = 201 |
| | | return statu,errors.New("已经修改") |
| | | return statu, errors.New("已经修改") |
| | | } |
| | | return statu,nil |
| | | return statu, nil |
| | | } |
| | | |
| | | //获取当前节点抓拍库所有人员ID*缓存* |
| | |
| | | |
| | | } |
| | | |
| | | //添加即将删除信号 |
| | | func AddDeleteSignal() { |
| | | |
| | | } |
| | | |
| | | /****************************************以下为sdkCompare比对缓存使用方法*********************************************/ |
| | | //获取查询总数 *缓存* |
| | | func GetTotal(serverIp string, serverPort string, indexName string, shards string, targetType string) (total int) { |
| | |
| | | "bool": { |
| | | "filter": [{ |
| | | "term": { |
| | | "targetInfo.targetType.raw": "`+targetType+`" |
| | | "targetInfo.targetType.raw": "` + targetType + `" |
| | | } |
| | | }] |
| | | } |
| | |
| | | var source []string |
| | | switch targetType { |
| | | case "face": |
| | | source = []string{"id","targetInfo.feature","analyServerId","cameraId"} |
| | | source = []string{"id", "targetInfo.feature", "analyServerId", "cameraId"} |
| | | case "track": |
| | | source = []string{"id","targetInfo.feature","analyServerId","cameraId","targetInfo.attachTarget.feature","targetInfo.targetLocation","linkTagInfo.targetInfo.feature","linkTagInfo.targetInfo.attachTarget.feature","linkTagInfo.cameraId","linkTagInfo.targetInfo.targetLocation"} |
| | | source = []string{"id", "targetInfo.feature", "analyServerId", "cameraId", "targetInfo.attachTarget.feature", "targetInfo.targetLocation", "linkTagInfo.targetInfo.feature", "linkTagInfo.targetInfo.attachTarget.feature", "linkTagInfo.cameraId", "linkTagInfo.targetInfo.targetLocation"} |
| | | } |
| | | JsonDSL := ` |
| | | { |
| | |
| | | "filter": [ |
| | | { |
| | | "term": { |
| | | "targetInfo.targetType.raw": "`+targetType+`" |
| | | "targetInfo.targetType.raw": "` + targetType + `" |
| | | } |
| | | }, |
| | | { |
| | |
| | | } |
| | | }, |
| | | "size": 1000000, |
| | | "_source": ["`+strings.Replace(strings.Trim(fmt.Sprint(source), "[]"), " ", "\",\"", -1)+`"] |
| | | "_source": ["` + strings.Replace(strings.Trim(fmt.Sprint(source), "[]"), " ", "\",\"", -1) + `"] |
| | | } |
| | | ` |
| | | //logger.Debug(url) |
| | |
| | | var source []string |
| | | switch targetType { |
| | | case "face": |
| | | source = []string{"id","targetInfo.feature","analyServerId"} |
| | | source = []string{"id", "targetInfo.feature", "analyServerId"} |
| | | case "track": |
| | | source = []string{"id","targetInfo.feature","analyServerId","targetInfo.attachTarget.feature","targetInfo.targetLocation","linkTagInfo.targetInfo.feature","linkTagInfo.targetInfo.attachTarget.feature","linkTagInfo.targetInfo.targetLocation"} |
| | | source = []string{"id", "targetInfo.feature", "analyServerId", "targetInfo.attachTarget.feature", "targetInfo.targetLocation", "linkTagInfo.targetInfo.feature", "linkTagInfo.targetInfo.attachTarget.feature", "linkTagInfo.targetInfo.targetLocation"} |
| | | } |
| | | |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local" |
| | |
| | | "filter": [ |
| | | { |
| | | "term": { |
| | | "targetInfo.targetType.raw": "`+targetType+`" |
| | | "targetInfo.targetType.raw": "` + targetType + `" |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | }, |
| | | "size":` + number + `, |
| | | "_source": ["`+strings.Replace(strings.Trim(fmt.Sprint(source), "[]"), " ", "\",\"", -1)+`"] |
| | | "_source": ["` + strings.Replace(strings.Trim(fmt.Sprint(source), "[]"), " ", "\",\"", -1) + `"] |
| | | }` |
| | | |
| | | buf, err := EsReq("POST", url, []byte(JsonDSL)) |
| | |
| | | dbpersoninfos := Parsesources(sources) |
| | | return dbpersoninfos, nil |
| | | } |
| | | |
| | | //************************CORN TASK******************************* |
| | | //查询日期范围内是否还存在数据 |
| | | func QueryAnalyServerDate(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) { |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" |
| | | deleteJson := `{ |
| | | "query":{ |
| | | "bool":{ |
| | | "filter":[{ |
| | | "range":{ |
| | | "updateTime":{ |
| | | "gte":"` + startTime + `", |
| | | "lte":"` + endTime + `" |
| | | } |
| | | } |
| | | }, |
| | | { |
| | | "term":{ |
| | | "analyServerId":"` + analyServerId + `" |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | } |
| | | } ` |
| | | buf, err := EsReq("POST", url, []byte(deleteJson)) |
| | | if err != nil { |
| | | return false, errors.New("请求失败") |
| | | } |
| | | resTotal, err := SourceTotal(buf) |
| | | if err != nil { |
| | | return false, errors.New("解码失败") |
| | | } |
| | | if resTotal == -1 || resTotal == 0{ |
| | | result = false |
| | | } else { |
| | | result = true |
| | | } |
| | | return result, nil |
| | | } |
| | | |
| | | |
| | | //按日期范围,服务器Id删除数据 |
| | | func DeleteAnalyServerDate(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) { |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_delete_by_query" |
| | | deleteJson := `{ |
| | | "query":{ |
| | | "bool":{ |
| | | "filter":[{ |
| | | "range":{ |
| | | "updateTime":{ |
| | | "gte":"` + startTime + `", |
| | | "lte":"` + endTime + `" |
| | | } |
| | | } |
| | | }, |
| | | { |
| | | "term":{ |
| | | "analyServerId":"` + analyServerId + `" |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | } |
| | | } ` |
| | | buf, err := EsReq("POST", url, []byte(deleteJson)) |
| | | if err != nil { |
| | | return false, errors.New("请求失败") |
| | | } |
| | | deleteRes, err := SourceDeleted(buf) |
| | | if err != nil { |
| | | return false, errors.New("解码失败") |
| | | } |
| | | if deleteRes == -1 { |
| | | result = false |
| | | } else { |
| | | result = true |
| | | } |
| | | return result, nil |
| | | } |
| | | |
| | | //给所有节点追加删除任务信息 |
| | | func AddDelTask(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) { |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query" |
| | | addJson := `{ |
| | | "script": { |
| | | "lang":"painless", |
| | | "inline": "ctx._source.instantTask.add(params.newtask)", |
| | | "params": { |
| | | "newtask": { |
| | | "instantClearId": "` + analyServerId + `", |
| | | "startTime": "` + startTime + `", |
| | | "endTime": "` + endTime + `" |
| | | } |
| | | } |
| | | }, |
| | | "query": { |
| | | "match_all": {} |
| | | } |
| | | }` |
| | | buf, err := EsReq("POST", url, []byte(addJson)) |
| | | if err != nil { |
| | | return false, errors.New("请求失败") |
| | | } |
| | | updateRes, err := SourceUpdated(buf) |
| | | if err != nil { |
| | | return false, errors.New("解码失败") |
| | | } |
| | | if updateRes == -1 { |
| | | result = false |
| | | } else { |
| | | result = true |
| | | } |
| | | return result, nil |
| | | } |
| | | |
| | | //移除已执行完的删除任务 |
| | | func DeleteDelTask(serverIp string, serverPort string, indexName string, analyServerId string) (result bool, err error) { |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query" |
| | | deleteJson := `{ |
| | | "script": { |
| | | "lang":"painless", |
| | | "inline": "ctx._source.instantTask.remove(0)" |
| | | }, |
| | | "query": { |
| | | "bool": { |
| | | "filter":[{ |
| | | "term":{ |
| | | "id":"` + analyServerId + `" |
| | | } |
| | | }] |
| | | } |
| | | } |
| | | }` |
| | | buf, err := EsReq("POST", url, []byte(deleteJson)) |
| | | if err != nil { |
| | | return false, errors.New("请求失败") |
| | | } |
| | | updateRes, err := SourceUpdated(buf) |
| | | if err != nil { |
| | | return false, errors.New("解码失败") |
| | | } |
| | | if updateRes == -1 { |
| | | result = false |
| | | } else { |
| | | result = true |
| | | } |
| | | return result, nil |
| | | } |