package esutil
|
|
import (
|
"encoding/json"
|
"errors"
|
"fmt"
|
"strconv"
|
"strings"
|
"sync"
|
"time"
|
|
"basic.com/pubsub/protomsg.git"
|
)
|
|
// 根据抓拍人员id查询抓拍人员信息
|
func AIOceaninfosbyid(id []string, indexName string, serverIp string, serverPort string) ([]protomsg.AIOcean, error) {
|
var aIOceanInfo []protomsg.AIOcean
|
videopersonsPersonId := strings.Replace(strings.Trim(fmt.Sprint(id), "[]"), " ", "\",\"", -1)
|
var dbinfoRequest = `
|
{
|
"query": {
|
"bool": {
|
"filter": [{
|
"terms": {
|
"id": [
|
"` + videopersonsPersonId + `"
|
]
|
}
|
}]
|
}
|
},
|
"size":1000000
|
}
|
`
|
buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search", []byte(dbinfoRequest))
|
if err != nil {
|
return aIOceanInfo, err
|
}
|
|
sources, err := Sourcelist(buf)
|
if err != nil {
|
return aIOceanInfo, err
|
}
|
|
aIOcean := AIOceanAnalysis(sources)
|
return aIOcean, nil
|
}
|
|
//根据抓拍库人员id查询特征值
|
func GetVideoPersonFaceFeatureById(id string, indexName string, serverIp string, serverPort string) (string, error) {
|
var jsonDSL = `
|
{
|
"query": {
|
"bool": {
|
"filter": [{
|
"term": {
|
"id":"` + id + `"
|
}
|
}]
|
}
|
},
|
"_source":["targetInfo.feature"]
|
}
|
`
|
buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search", []byte(jsonDSL))
|
if err != nil {
|
return "", err
|
}
|
|
sources, err := Sourcelist(buf)
|
if err != nil {
|
return "", err
|
}
|
|
feature := sources[0]["targetInfo"].([]interface{})[0].(map[string]interface{})["feature"].(string)
|
return feature, nil
|
}
|
|
//根据目标id查询已追加条数
|
func GetLinkTagInfoSize(id string, indexName string, serverIp string, serverPort string) (size int, err error) {
|
url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
|
queryDSL := `{
|
"query": {
|
"term":{
|
"id":"` + id + `"
|
}
|
}
|
}`
|
buf, err := EsReq("POST", url, []byte(queryDSL))
|
if err != nil {
|
return -1, err
|
}
|
source, err := Sourcelist(buf)
|
if err != nil {
|
return -1, err
|
}
|
if source[0]["linkTagInfo"] != nil {
|
size = len(source[0]["linkTagInfo"].([]interface{}))
|
} else {
|
return -1, errors.New("该数组不存在")
|
}
|
return size, nil
|
}
|
|
//根据目标id追加跟踪信息
|
func AppendTargetInfo(id string, targetInfo string, indexName string, serverIp string, serverPort string, updateTime string) (string, error) {
|
if targetInfo == "" {
|
return "", errors.New("append data is nil")
|
}
|
var info interface{}
|
url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query?refresh=true"
|
jsonDSL := `{
|
"query": {
|
"term":{
|
"id":"` + id + `"
|
}
|
},
|
"script": {
|
"lang": "painless",
|
"inline": "ctx._source.linkTagInfo.add(params.newparam);ctx._source.updateTime='` + updateTime + `'",
|
"params": {
|
"newparam": ` + targetInfo + `
|
}
|
}
|
}`
|
fmt.Println(jsonDSL)
|
buf, err := EsReq("POST", url, []byte(jsonDSL))
|
if err != nil {
|
return "", err
|
}
|
json.Unmarshal(buf, &info)
|
out, ok := info.(map[string]interface{})
|
fmt.Println(out)
|
if !ok {
|
return "", errors.New("http response interface can not change map[string]interface{}")
|
}
|
middle, ok := out["updated"].(float64)
|
if !ok {
|
return "", errors.New("first updated change error!")
|
}
|
mes := ""
|
if middle == 1 {
|
mes = "追加成功"
|
}
|
if middle == 0 {
|
mes = "已经追加"
|
}
|
return mes, nil
|
|
}
|
|
//根据抓拍人员id更新(videourl)摄像机地址
|
func UpdateVideourlById(id string, videoUrl string, indexName string, serverIp string, serverPort string, command int) (statu int, err error) {
|
|
var info interface{}
|
url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query?refresh=true"
|
sourceStr := "ctx._source.videoUrl='" + videoUrl + "'"
|
if command >= 0 {
|
sourceStr = "ctx._source.linkTagInfo[" + strconv.Itoa(command) + "].videoUrl='" + videoUrl + "'"
|
}
|
var videoUrlInfo = `
|
{
|
"script": {
|
"source": "` + sourceStr + `"
|
},
|
"query": {
|
"term": {
|
"id": "` + id + `"
|
}
|
}
|
}
|
`
|
//fmt.Println("url: ", url, videoUrlInfo)
|
buf, err := EsReq("POST", url, []byte(videoUrlInfo))
|
if err != nil {
|
fmt.Println("http request videoUrlInfo info is err!")
|
statu = 500
|
return statu, err
|
}
|
json.Unmarshal(buf, &info)
|
//fmt.Println(info)
|
out, ok := info.(map[string]interface{})
|
if !ok {
|
fmt.Println("http response interface can not change map[string]interface{}")
|
statu = 500
|
return statu, errors.New("http response interface can not change map[string]interface{}")
|
}
|
middle, ok := out["updated"].(float64)
|
if !ok {
|
fmt.Println("first updated change error!")
|
statu = 500
|
return statu, errors.New("first updated change error!")
|
}
|
if middle == 1 {
|
statu = 200
|
return statu, nil
|
}
|
if middle == 0 {
|
statu = 201
|
return statu, errors.New("已经修改")
|
}
|
return statu, nil
|
}
|
|
//获取当前节点抓拍库所有人员ID*缓存*
|
func GetAllLocalVideopersonsId(compareArgs protomsg.CompareArgs, indexName string, serverIp string, serverPort string, alarmLevelTypes string) (capturetable []string) {
|
queryStr := ""
|
queryBody := compareArgs.InputValue
|
//检索框
|
if queryBody != "" {
|
queryStr = "\"must\":[{\"multi_match\":{\"query\":\"" + queryBody + "\",\"fields\":[\"cameraAddr^1.5\",\"taskName^1.5\",\"sdkName^1.5\",\"showLabels^3.0\",\"baseInfo.tableName^1.5\",\"baseInfo.targetName^1.5\",\"baseInfo.labels^1.5\",\"alarmRules.alarmLevel^1.5\",\"linkTag^1.5\"]," +
|
"\"type\":\"cross_fields\",\"operator\":\"OR\",\"slop\":0,\"prefix_length\":0,\"max_expansions\":50,\"zero_terms_query\":\"NONE\",\"auto_generate_synonyms_phrase_query\":true,\"fuzzy_transpositions\":true,\"boost\":1}}],"
|
}
|
if compareArgs.SearchTime == nil || len(compareArgs.SearchTime) != 2 {
|
return nil
|
}
|
gteDate := compareArgs.SearchTime[0]
|
lteDate := compareArgs.SearchTime[1]
|
//判断任务ID
|
taskIdStr := ""
|
taskId := compareArgs.Tasks
|
if taskId != nil && len(taskId) > 0 {
|
esTaskId := strings.Replace(strings.Trim(fmt.Sprint(taskId), "[]"), " ", "\",\"", -1)
|
taskIdStr = "{\"terms\":{\"taskId\":[\"" + esTaskId + "\"]}},"
|
}
|
//判断摄像机ID
|
cameraIdStr := ""
|
cameraId := compareArgs.TreeNodes
|
if cameraId != nil && len(cameraId) > 0 {
|
esCameraId := strings.Replace(strings.Trim(fmt.Sprint(cameraId), "[]"), " ", "\",\"", -1)
|
cameraIdStr = "{\"terms\":{\"cameraId\":[\"" + esCameraId + "\"]}},"
|
}
|
|
//判断库表ID
|
tableId := compareArgs.Tabs
|
esTableId := ""
|
esTableIdStr := ""
|
if tableId != nil && len(tableId) > 0 {
|
esTableId = strings.Replace(strings.Trim(fmt.Sprint(tableId), "[]"), " ", "\",\"", -1)
|
esTableIdStr = "{\"terms\":{\"baseInfo.tableId\":[\"" + esTableId + "\"]}},"
|
}
|
isCollectStr := ""
|
isCollect := compareArgs.Collection
|
if isCollect != "" {
|
isCollectStr = "{\"term\":{\"isCollect\":\"" + isCollect + "\"}},"
|
}
|
|
//判断布防等级
|
alarmLevelStr := ""
|
if alarmLevelTypes != "" {
|
alarmLevelStr = "{\"terms\":{\"alarmRules.alarmLevel.raw\":[\"" + alarmLevelTypes + "\"]}},"
|
}
|
|
//使用es底层机制处理分页
|
|
analyServerFilterStr := ""
|
analyServerId := compareArgs.AnalyServerId
|
if analyServerId != "" {
|
analyServerFilterStr = "{\"term\":{\"analyServerId\":\"" + analyServerId + "\"}},"
|
}
|
|
ts := time.Now()
|
//首次请求头
|
url := "http://" + serverIp + ":" + serverPort +
|
"/" + indexName + "/_search?search_type=dfs_query_then_fetch;scroll=1m"
|
|
var lock sync.RWMutex
|
var wg sync.WaitGroup
|
|
for i := 0; i < 48; i++ {
|
//请求体
|
prama := "{" +
|
"\"slice\":{\"id\":" + strconv.Itoa(i) + ",\"max\":48}," +
|
"\"size\":\"1000\"," +
|
"\"query\":{\"bool\":{" + queryStr +
|
"\"filter\":[" +
|
"{\"term\":{\"targetInfo.targetType.raw\":\"face\"}}," +
|
cameraIdStr +
|
alarmLevelStr +
|
taskIdStr +
|
isCollectStr +
|
esTableIdStr +
|
analyServerFilterStr +
|
"{\"range\":{\"picDate\":{\"from\":\"" + gteDate + "\",\"to\":\"" + lteDate + "\",\"include_lower\":true,\"include_upper\":true,\"boost\":1}}}]}}," +
|
"\"_source\":[\"id\"]" +
|
"}"
|
wg.Add(1)
|
go func(reqParam string) {
|
defer wg.Done()
|
|
//fmt.Println(url)
|
//fmt.Println(prama)
|
buf, err := EsReq("POST", url, []byte(reqParam))
|
|
if err != nil {
|
fmt.Println("http request videoUrlInfo info is err!")
|
fmt.Println(len(capturetable))
|
return
|
}
|
|
sources, err := Sourcelistforscroll(buf)
|
|
if err != nil {
|
fmt.Println(len(capturetable))
|
return
|
}
|
for _, source := range sources["sourcelist"].([]map[string]interface{}) {
|
lock.Lock()
|
capturetable = append(capturetable, source["id"].(string))
|
lock.Unlock()
|
}
|
|
scroll_id := sources["scroll_id"].(string)
|
|
//scroll请求头
|
scroll_url := "http://" + serverIp + ":" + serverPort + "/_search/scroll"
|
|
for {
|
var tmpList []string
|
next_scroll_id := ""
|
if next_scroll_id != "" {
|
scroll_id = next_scroll_id
|
}
|
jsonDSL := `{
|
"scroll": "1m",
|
"scroll_id" : "` + scroll_id + `"
|
}`
|
//fmt.Println(scroll_url)
|
//fmt.Println(jsonDSL)
|
buf, err := EsReq("POST", scroll_url, []byte(jsonDSL))
|
|
if err != nil {
|
fmt.Println("lenth1: ", len(capturetable))
|
return
|
}
|
nextSources, err := Sourcelistforscroll(buf)
|
|
if nextSources == nil {
|
return
|
}
|
|
nextM := nextSources["sourcelist"].([]map[string]interface{})
|
//fmt.Println("id",nextSources)
|
if nextM == nil || len(nextM) == 0 {
|
//fmt.Println("lenth: ", len(capturetable))
|
return
|
}
|
//fmt.Println("id")
|
for _, source := range nextM {
|
tmpList = append(tmpList, source["id"].(string))
|
}
|
//fmt.Println("tmpList: ", len(tmpList))
|
lock.Lock()
|
capturetable = append(capturetable, tmpList...)
|
lock.Unlock()
|
|
next_scroll_id = nextSources["scroll_id"].(string)
|
}
|
|
}(prama)
|
}
|
wg.Wait()
|
|
fmt.Println("lenth_all: ", len(capturetable))
|
fmt.Println("耗时:", time.Since(ts))
|
return capturetable
|
}
|
|
//初始化实时抓拍
|
func InitRealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm string, category string, quantity int) ([]protomsg.AIOcean, error) {
|
var aIOceanInfo []protomsg.AIOcean
|
url := "http://" + serverIp + ":" + serverPort +
|
"/" + indexName + "/_search"
|
var filterArr []string
|
if isAlarm != "all" {
|
filterArr = append(filterArr, ` {
|
"term":{
|
"isAlarm":"`+isAlarm+`"
|
}
|
}`)
|
}
|
|
if category != "all" {
|
filterArr = append(filterArr, ` {
|
"term":{
|
"targetInfo.targetType":"`+category+`"
|
}
|
}`)
|
|
}
|
|
queryStr := `"query":{
|
"bool":{
|
"filter":[
|
` + strings.Join(filterArr, ",") + `
|
]
|
}
|
},`
|
|
DSLJson := `{
|
"size":` + strconv.Itoa(quantity) + `,
|
` + queryStr + `
|
"sort":[{"picDate":{"order":"desc"}}],
|
"_source": {"includes":[],"excludes":["*.feature"]}
|
}`
|
fmt.Println(DSLJson)
|
buf, err := EsReq("POST", url, []byte(DSLJson))
|
if err != nil {
|
return aIOceanInfo, err
|
}
|
|
sources, err := Sourcelist(buf)
|
if err != nil {
|
return aIOceanInfo, err
|
}
|
|
aIOcean := AIOceanAnalysis(sources)
|
//fmt.Println(len(videoperson))
|
return aIOcean, nil
|
}
|
|
//实时抓拍
|
func RealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm bool) ([]protomsg.AIOcean, error) {
|
var aIOceanInfo []protomsg.AIOcean
|
url := "http://" + serverIp + ":" + serverPort +
|
"/" + indexName + "/_search"
|
DSLJson := `{
|
"size":20,
|
"query":{
|
"bool":{
|
"filter":[
|
{
|
"range":{
|
"picDate":{
|
"gte":"now+8h-30s",
|
"lt":"now+8h"
|
}
|
}
|
},
|
{
|
"term":{
|
"isAlarm":` + strconv.FormatBool(isAlarm) + `
|
}
|
}
|
]
|
}
|
},
|
"_source": {"includes":[],"excludes":["*.feature"]}
|
}`
|
|
buf, err := EsReq("POST", url, []byte(DSLJson))
|
if err != nil {
|
return aIOceanInfo, err
|
}
|
|
sources, err := Sourcelist(buf)
|
if err != nil {
|
return aIOceanInfo, err
|
}
|
|
aIOcean := AIOceanAnalysis(sources)
|
fmt.Println(len(aIOcean))
|
return aIOcean, nil
|
}
|
|
//综合统计
|
func StatisticsComprehensive(serverIp string, serverPort string, indexName string, isAlarm string) (total int, err error) {
|
url := "http://" + serverIp + ":" + serverPort +
|
"/" + indexName + "/_search"
|
isAlarmStr := ""
|
if isAlarm != "all" {
|
isAlarmStr = ` {
|
"term":{
|
"isAlarm":"` + isAlarm + `"
|
}
|
},`
|
|
}
|
DSLJson := `{
|
"size":0,
|
"query":{
|
"bool":{
|
"filter":[
|
` + isAlarmStr + `
|
{
|
"range":{
|
"picDate":{
|
"gte":"now+8h/d"
|
}
|
}
|
}
|
]
|
}
|
}
|
}`
|
//fmt.Println(DSLJson)
|
buf, err := EsReq("POST", url, []byte(DSLJson))
|
if err != nil {
|
return total, err
|
}
|
var info interface{}
|
json.Unmarshal(buf, &info)
|
out, ok := info.(map[string]interface{})
|
if !ok {
|
return total, errors.New("http response interface can not change map[string]interface{}")
|
}
|
middle, ok := out["hits"].(map[string]interface{})
|
if !ok {
|
return total, errors.New("first hits change error!")
|
}
|
total = int(middle["total"].(float64))
|
//fmt.Println(total)
|
return total, nil
|
}
|
|
//实时报警任务比率
|
func RealTimeAlarmTaskRate(serverIp string, serverPort string, indexName string) (sources []map[string]interface{}, err error) {
|
url := "http://" + serverIp + ":" + serverPort +
|
"/" + indexName + "/_search"
|
DSLJson := `{
|
"size":0,
|
"query":{
|
"bool":{
|
"filter":[{
|
"range":{
|
"picDate":{
|
"gte":"now+8h/d"
|
}
|
}
|
}]
|
}
|
},
|
"aggs":{
|
"sdkName_status":{
|
"terms":{
|
"field":"sdkName.raw"
|
}
|
}
|
}
|
}`
|
buf, err := EsReq("POST", url, []byte(DSLJson))
|
if err != nil {
|
return nil, err
|
}
|
var info interface{}
|
json.Unmarshal(buf, &info)
|
out, ok := info.(map[string]interface{})
|
if !ok {
|
return nil, errors.New("http response interface can not change map[string]interface{}")
|
}
|
middle, ok := out["aggregations"].(map[string]interface{})
|
if !ok {
|
return nil, errors.New("first hits change error!")
|
}
|
sdkName_status, ok := middle["sdkName_status"].(map[string]interface{})
|
if !ok {
|
return nil, errors.New("first hits change error!")
|
}
|
|
for _, in := range sdkName_status["buckets"].([]interface{}) {
|
var source = make(map[string]interface{}, 0)
|
tmpbuf, ok := in.(map[string]interface{})
|
if !ok {
|
fmt.Println("change to source error!")
|
continue
|
}
|
sdkName := tmpbuf["key"].(string)
|
count := int(tmpbuf["doc_count"].(float64))
|
source["name"] = sdkName
|
source["value"] = count
|
sources = append(sources, source)
|
}
|
//fmt.Println("tmpSource",sources)
|
return sources, nil
|
}
|
|
//聚合任务列表,taskId+taskName
|
func AggregateTaskList(serverIp string, serverPort string, indexName string, analyServerId string) (sources []map[string]interface{}, err error) {
|
url := "http://" + serverIp + ":" + serverPort +
|
"/" + indexName + "/_search"
|
serverFilterStr := ""
|
if analyServerId != "" {
|
serverFilterStr = `,
|
"query": {
|
"bool": {
|
"filter": [
|
{
|
"term": {
|
"analyServerId": "` + analyServerId + `"
|
}
|
}
|
]
|
}
|
}`
|
}
|
DSLJson := `{
|
"size": 0,
|
"aggs": {
|
"task_status": {
|
"composite": {
|
"sources": [
|
{
|
"taskId": {
|
"terms": {
|
"field": "taskId"
|
}
|
}
|
},
|
{
|
"taskName": {
|
"terms": {
|
"field": "taskName.raw"
|
}
|
}
|
}
|
],
|
"size":"1000000"
|
}
|
}
|
}
|
` + serverFilterStr + `
|
}`
|
buf, err := EsReq("POST", url, []byte(DSLJson))
|
if err != nil {
|
return nil, err
|
}
|
var info interface{}
|
json.Unmarshal(buf, &info)
|
out, ok := info.(map[string]interface{})
|
if !ok {
|
return nil, errors.New("http response interface can not change map[string]interface{}")
|
}
|
middle, ok := out["aggregations"].(map[string]interface{})
|
if !ok {
|
return nil, errors.New("first hits change error!")
|
}
|
task_status, ok := middle["task_status"].(map[string]interface{})
|
if !ok {
|
return nil, errors.New("first hits change error!")
|
}
|
|
for _, in := range task_status["buckets"].([]interface{}) {
|
var source = make(map[string]interface{}, 0)
|
tmpbuf, ok := in.(map[string]interface{})
|
if !ok {
|
fmt.Println("change to source error!")
|
continue
|
}
|
task := tmpbuf["key"].(map[string]interface{})
|
count := int(tmpbuf["doc_count"].(float64))
|
taskName := task["taskName"].(string)
|
taskId := task["taskId"].(string)
|
source["taskName"] = taskName
|
source["taskId"] = taskId
|
source["count"] = count
|
sources = append(sources, source)
|
}
|
//fmt.Println("tmpSource",sources)
|
return sources, nil
|
|
}
|
|
// AddDeleteSignal is a placeholder for adding an "about to be deleted" signal.
// It is not implemented yet and currently does nothing.
func AddDeleteSignal() {}
|
|
/**************************************** The functions below are used by the sdkCompare comparison cache *********************************************/
|
//获取查询总数 *缓存*
|
func GetTotal(serverIp string, serverPort string, indexName string, shards string, targetType string) (total int) {
|
JsonDSL := `{
|
"size": 0,
|
"query": {
|
"bool": {
|
"filter": [{
|
"term": {
|
"targetInfo.targetType.raw": "` + targetType + `"
|
}
|
}]
|
}
|
}
|
}`
|
|
url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local"
|
buf, err := EsReq("POST", url, []byte(JsonDSL))
|
if err != nil {
|
return
|
}
|
var info interface{}
|
json.Unmarshal(buf, &info)
|
out, ok := info.(map[string]interface{})
|
if !ok {
|
return
|
}
|
middle, ok := out["hits"].(map[string]interface{})
|
if !ok {
|
return
|
}
|
total = int(middle["total"].(float64))
|
return total
|
|
}
|
|
// Query the data of a time period (cache).
|
func GetPeriodInfos(serverIp string, serverPort string, startTime string, endTime string, indexName string, shards string, targetType string) ([]*protomsg.MultiFeaCache, error) {
|
var capdbinfo []*protomsg.MultiFeaCache
|
url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local"
|
var source []string
|
switch targetType {
|
case "face":
|
source = []string{"id", "targetInfo.feature", "analyServerId", "cameraId"}
|
case "track":
|
source = []string{"id", "targetInfo.feature", "analyServerId", "cameraId", "targetInfo.attachTarget.feature", "targetInfo.targetLocation", "linkTagInfo.targetInfo.feature", "linkTagInfo.targetInfo.attachTarget.feature", "linkTagInfo.cameraId", "linkTagInfo.targetInfo.targetLocation"}
|
}
|
JsonDSL := `
|
{
|
"query": {
|
"bool": {
|
"filter": [
|
{
|
"term": {
|
"targetInfo.targetType.raw": "` + targetType + `"
|
}
|
},
|
{
|
"range": {
|
"picDate": {
|
"gte": "` + startTime + `",
|
"lt": "` + endTime + `"
|
}
|
}
|
}
|
]
|
}
|
},
|
"size": 1000000,
|
"_source": ["` + strings.Replace(strings.Trim(fmt.Sprint(source), "[]"), " ", "\",\"", -1) + `"]
|
}
|
`
|
//logger.Debug(url)
|
//logger.Debug(JsonDSL)
|
//fmt.Println(JsonDSL)
|
buf, err := EsReq("POST", url, []byte(JsonDSL))
|
if err != nil {
|
return capdbinfo, errors.New("http request dbtablename info is err!")
|
}
|
|
// 返回 _source 数组
|
sources, err := Sourcelist(buf)
|
if err != nil {
|
return capdbinfo, err
|
}
|
//fmt.Println(sources)
|
// 返回所有查询的数据
|
capdbinfos := Parsesources(sources)
|
return capdbinfos, nil
|
}
|
|
// 查询底库人员信息*缓存*
|
func GetOceanFeatures(serverIp string, serverPort string, queryIndexNum int, queryNums int, indexName string, shards string, targetType string) ([]*protomsg.MultiFeaCache, error) {
|
var dbinfos []*protomsg.MultiFeaCache
|
point := strconv.Itoa(queryIndexNum)
|
number := strconv.Itoa(queryNums)
|
JsonDSL := ""
|
var source []string
|
switch targetType {
|
case "face":
|
source = []string{"id", "targetInfo.feature", "analyServerId"}
|
case "track":
|
source = []string{"id", "targetInfo.feature", "analyServerId", "targetInfo.attachTarget.feature", "targetInfo.targetLocation", "linkTagInfo.targetInfo.feature", "linkTagInfo.targetInfo.attachTarget.feature", "linkTagInfo.targetInfo.targetLocation"}
|
}
|
|
url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local"
|
JsonDSL = ` {
|
"from": ` + point + `,
|
"query": {
|
"bool": {
|
"filter": [
|
{
|
"term": {
|
"targetInfo.targetType.raw": "` + targetType + `"
|
}
|
}
|
]
|
}
|
},
|
"size":` + number + `,
|
"_source": ["` + strings.Replace(strings.Trim(fmt.Sprint(source), "[]"), " ", "\",\"", -1) + `"]
|
}`
|
|
buf, err := EsReq("POST", url, []byte(JsonDSL))
|
if err != nil {
|
return dbinfos, errors.New("http request dbtablename info is err!")
|
}
|
|
// 返回 _source 数组
|
sources, err := Sourcelist(buf)
|
if err != nil {
|
return dbinfos, err
|
}
|
|
// 返回所有查询的数据
|
dbpersoninfos := Parsesources(sources)
|
return dbpersoninfos, nil
|
}
|
|
//************************CRON TASK*******************************
|
//查询日期范围内是否还存在数据
|
func QueryAnalyServerData(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) {
|
url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
|
deleteJson := `{
|
"query":{
|
"bool":{
|
"filter":[{
|
"range":{
|
"updateTime":{
|
"gte":"` + startTime + `",
|
"lte":"` + endTime + `"
|
}
|
}
|
},
|
{
|
"term":{
|
"analyServerId":"` + analyServerId + `"
|
}
|
}
|
]
|
}
|
}
|
} `
|
buf, err := EsReq("POST", url, []byte(deleteJson))
|
if err != nil {
|
return false, errors.New("请求失败")
|
}
|
resTotal, err := SourceTotal(buf)
|
if err != nil {
|
return false, errors.New("解码失败")
|
}
|
if resTotal == -1 || resTotal == 0{
|
result = false
|
} else {
|
result = true
|
}
|
return result, nil
|
}
|
|
|
//按日期范围,服务器Id删除数据
|
func DeleteAnalyServerData(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) {
|
url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_delete_by_query"
|
deleteJson := `{
|
"query":{
|
"bool":{
|
"filter":[{
|
"range":{
|
"updateTime":{
|
"gte":"` + startTime + `",
|
"lte":"` + endTime + `"
|
}
|
}
|
},
|
{
|
"term":{
|
"analyServerId":"` + analyServerId + `"
|
}
|
}
|
]
|
}
|
}
|
} `
|
buf, err := EsReq("POST", url, []byte(deleteJson))
|
if err != nil {
|
return false, errors.New("请求失败")
|
}
|
deleteRes, err := SourceDeleted(buf)
|
if err != nil {
|
return false, errors.New("解码失败")
|
}
|
if deleteRes == -1 {
|
result = false
|
} else {
|
result = true
|
}
|
return result, nil
|
}
|
|
//给所有节点追加删除任务信息
|
func AddDelTask(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) {
|
url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query"
|
addJson := `{
|
"script": {
|
"lang":"painless",
|
"inline": "ctx._source.instantTask.add(params.newtask)",
|
"params": {
|
"newtask": {
|
"instantClearId": "` + analyServerId + `",
|
"startTime": "` + startTime + `",
|
"endTime": "` + endTime + `"
|
}
|
}
|
},
|
"query": {
|
"match_all": {}
|
}
|
}`
|
buf, err := EsReq("POST", url, []byte(addJson))
|
if err != nil {
|
return false, errors.New("请求失败")
|
}
|
updateRes, err := SourceUpdated(buf)
|
if err != nil {
|
return false, errors.New("解码失败")
|
}
|
if updateRes == -1 {
|
result = false
|
} else {
|
result = true
|
}
|
return result, nil
|
}
|
|
//移除已执行完的删除任务
|
func DeleteDelTask(serverIp string, serverPort string, indexName string, analyServerId string) (result bool, err error) {
|
url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query"
|
deleteJson := `{
|
"script": {
|
"lang":"painless",
|
"inline": "ctx._source.instantTask.remove(0)"
|
},
|
"query": {
|
"bool": {
|
"filter":[{
|
"term":{
|
"id":"` + analyServerId + `"
|
}
|
}]
|
}
|
}
|
}`
|
buf, err := EsReq("POST", url, []byte(deleteJson))
|
if err != nil {
|
return false, errors.New("请求失败")
|
}
|
updateRes, err := SourceUpdated(buf)
|
if err != nil {
|
return false, errors.New("解码失败")
|
}
|
if updateRes == -1 {
|
result = false
|
} else {
|
result = true
|
}
|
return result, nil
|
}
|