package esutil
|
|
import (
|
"basic.com/pubsub/protomsg.git"
|
"encoding/json"
|
"errors"
|
"fmt"
|
"sort"
|
"strconv"
|
"strings"
|
"sync"
|
"time"
|
)
|
|
var logPrint = func(i ...interface{}) {
|
fmt.Println(i)
|
}
|
|
func InitLog(fn func(i ...interface{})) {
|
if fn != nil {
|
logPrint = fn
|
}
|
}
|
|
// 根据抓拍人员id查询抓拍人员信息
|
func AIOceaninfosbyid(id []string, indexName string, serverIp string, serverPort string) ([]protomsg.AIOcean, error) {
|
var aIOceanInfo []protomsg.AIOcean
|
videopersonsPersonId := strings.Replace(strings.Trim(fmt.Sprint(id), "[]"), " ", "\",\"", -1)
|
var dbinfoRequest = `
|
{
|
"query": {
|
"bool": {
|
"filter": [{
|
"terms": {
|
"id": [
|
"` + videopersonsPersonId + `"
|
]
|
}
|
}]
|
}
|
},
|
"size":1000000
|
}
|
`
|
buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search", []byte(dbinfoRequest))
|
if err != nil {
|
return aIOceanInfo, err
|
}
|
|
sources, err := Sourcelist(buf)
|
if err != nil {
|
return aIOceanInfo, err
|
}
|
|
aIOcean := AIOceanAnalysis(sources)
|
return aIOcean, nil
|
}
|
|
//根据抓拍库人员id查询特征值
|
func GetVideoPersonFaceFeatureById(id string, indexName string, serverIp string, serverPort string) (string, error) {
|
var jsonDSL = `
|
{
|
"query": {
|
"bool": {
|
"filter": [{
|
"term": {
|
"id":"` + id + `"
|
}
|
}]
|
}
|
},
|
"_source":["targetInfo.feature"]
|
}
|
`
|
buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search", []byte(jsonDSL))
|
if err != nil {
|
return "", err
|
}
|
|
sources, err := Sourcelist(buf)
|
if err != nil {
|
return "", err
|
}
|
|
feature := sources[0]["targetInfo"].([]interface{})[0].(map[string]interface{})["feature"].(string)
|
return feature, nil
|
}
|
|
//根据目标id查询已追加条数
|
func GetLinkTagInfoSize(id string, indexName string, serverIp string, serverPort string) (size int, err error) {
|
url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
|
queryDSL := `{
|
"query": {
|
"term":{
|
"id":"` + id + `"
|
}
|
}
|
}`
|
buf, err := EsReq("POST", url, []byte(queryDSL))
|
if err != nil {
|
return -1, err
|
}
|
source, err := Sourcelist(buf)
|
if err != nil {
|
return -1, err
|
}
|
if source[0]["linkTagInfo"] != nil {
|
size = len(source[0]["linkTagInfo"].([]interface{}))
|
} else {
|
return -1, errors.New("该数组不存在")
|
}
|
return size, nil
|
}
|
|
//根据目标id追加跟踪信息
|
func AppendTargetInfo(id string, targetInfo string, indexName string, serverIp string, serverPort string, updateTime string) (string, error) {
|
if targetInfo == "" {
|
return "", errors.New("append data is nil")
|
}
|
var info interface{}
|
url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query?refresh=true"
|
jsonDSL := `{
|
"query": {
|
"term":{
|
"id":"` + id + `"
|
}
|
},
|
"script": {
|
"lang": "painless",
|
"inline": "ctx._source.linkTagInfo.add(params.newparam);ctx._source.updateTime='` + updateTime + `'",
|
"params": {
|
"newparam": ` + targetInfo + `
|
}
|
}
|
}`
|
logPrint(jsonDSL)
|
buf, err := EsReq("POST", url, []byte(jsonDSL))
|
if err != nil {
|
return "", err
|
}
|
json.Unmarshal(buf, &info)
|
out, ok := info.(map[string]interface{})
|
logPrint(out)
|
if !ok {
|
return "", errors.New("http response interface can not change map[string]interface{}")
|
}
|
middle, ok := out["updated"].(float64)
|
if !ok {
|
return "", errors.New("first updated change error!")
|
}
|
mes := ""
|
if middle == 1 {
|
mes = "追加成功"
|
}
|
if middle == 0 {
|
mes = "已经追加"
|
}
|
return mes, nil
|
|
}
|
|
/**************************************customer analysis util start**************************************/
|
/*******************sort []map util*******************/
|
type MapsSort struct {
|
Key string
|
MapList []map[string]interface{}
|
}
|
|
func (m *MapsSort) Len() int {
|
return len(m.MapList)
|
}
|
|
func (m *MapsSort) Less(i, j int) bool {
|
return m.MapList[i][m.Key].(string) > m.MapList[j][m.Key].(string)
|
}
|
|
func (m *MapsSort) Swap(i, j int) {
|
m.MapList[i], m.MapList[j] = m.MapList[j], m.MapList[i]
|
}
|
|
/*******************sort []map util*******************/
|
//根据时间范围聚合所有区域人信息,返回固定条数
|
func GetFaceDataByTimeAndTotal(startTime string, endTime string, total int, thresholdTime int, thresholdStayTime int, serverIp string, serverPort string, indexName string) (resData []map[string]interface{}, err error) {
|
var requestUrl = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
|
var requestBody = `{
|
"query": {
|
"bool": {
|
"filter": [
|
{
|
"range": {
|
"picDate": {
|
"gte": "` + startTime + `",
|
"lte": "` + endTime + `"
|
}
|
}
|
},
|
{
|
"term":{
|
"targetInfo.targetType.raw": "FaceDetect"
|
}
|
}
|
]
|
}
|
},
|
"size": 0,
|
"aggs": {
|
"buckets_aggs": {
|
"composite": {
|
"sources": [
|
{
|
"faceId": {
|
"terms": {
|
"field": "baseInfo.targetId"
|
}
|
}
|
},
|
{
|
"areaId": {
|
"terms": {
|
"field": "targetInfo.areaId"
|
}
|
}
|
}
|
],
|
"size": 10000000
|
},
|
"aggs": {
|
"top_attention_hits": {
|
"top_hits": {
|
"size": 1000000,
|
"sort": [
|
{
|
"picDate": {
|
"order": "asc"
|
}
|
}
|
],
|
"_source": {
|
"includes": [
|
"baseInfo.targetId",
|
"targetInfo.picSmUrl",
|
"targetInfo.areaId",
|
"picDate"
|
]
|
}
|
}
|
}
|
}
|
}
|
}
|
}`
|
buf, err := EsReq("POST", requestUrl, []byte(requestBody))
|
if err != nil {
|
return nil, err
|
}
|
source, err := FaceSourceAggregations(buf, thresholdTime, thresholdStayTime)
|
if err != nil {
|
return nil, err
|
}
|
faceSource := make([]map[string]interface{}, 0)
|
for index, info := range source {
|
if int(info["stayTime"].(float64)) > thresholdStayTime {
|
faceSource = append(faceSource, source[index])
|
}
|
}
|
if len(faceSource) > total {
|
mapsSort := MapsSort{}
|
mapsSort.Key = "endTime"
|
mapsSort.MapList = faceSource
|
sort.Sort(&mapsSort)
|
return mapsSort.MapList[:total], nil
|
}
|
return faceSource, nil
|
}
|
|
func GetFaceDataByTimeAndId(startTime string, endTime string, id string, thresholdTime int, thresholdStayTime int, serverIp string, serverPort string, indexName string) (resData []map[string]interface{}, err error) {
|
var requestUrl = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
|
var requestBody = `{
|
"query": {
|
"bool": {
|
"filter": [
|
{
|
"range": {
|
"picDate": {
|
"gte": "` + startTime + `",
|
"lte": "` + endTime + `"
|
}
|
}
|
},
|
{
|
"term":{
|
"targetInfo.targetType.raw": "FaceDetect"
|
}
|
},
|
{
|
"term":{
|
"baseInfo.targetId": "` + id + `"
|
}
|
}
|
]
|
}
|
},
|
"size": 0,
|
"aggs": {
|
"buckets_aggs": {
|
"composite": {
|
"sources": [
|
{
|
"faceId": {
|
"terms": {
|
"field": "baseInfo.targetId"
|
}
|
}
|
},
|
{
|
"areaId": {
|
"terms": {
|
"field": "targetInfo.areaId"
|
}
|
}
|
}
|
],
|
"size": 10000000
|
},
|
"aggs": {
|
"top_attention_hits": {
|
"top_hits": {
|
"size": 1000000,
|
"sort": [
|
{
|
"picDate": {
|
"order": "asc"
|
}
|
}
|
],
|
"_source": {
|
"includes": [
|
"baseInfo.targetId",
|
"targetInfo.picSmUrl",
|
"targetInfo.areaId",
|
"picDate"
|
]
|
}
|
}
|
}
|
}
|
}
|
}
|
}`
|
buf, err := EsReq("POST", requestUrl, []byte(requestBody))
|
if err != nil {
|
return nil, err
|
}
|
source, err := FaceSourceAggregations(buf, thresholdTime, thresholdStayTime)
|
if err != nil {
|
return nil, err
|
}
|
faceSource := make([]map[string]interface{}, 0)
|
for index, info := range source {
|
if int(info["stayTime"].(float64)) > thresholdStayTime {
|
faceSource = append(faceSource, source[index])
|
}
|
}
|
return faceSource, nil
|
}
|
|
func GetFaceIdDeduplication(startTime string, endTime string, serverIp string, serverPort string, indexName string) (ids []string, err error) {
|
var requestUrl = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
|
var requestBody = `{
|
"query": {
|
"bool": {
|
"filter": [
|
{
|
"range": {
|
"picDate": {
|
"gte": "` + startTime + `",
|
"lte": "` + endTime + `"
|
}
|
}
|
},
|
{
|
"term": {
|
"targetInfo.targetType.raw": "FaceDetect"
|
}
|
}
|
]
|
}
|
},
|
"size": 0,
|
"aggs": {
|
"buckets_aggs": {
|
"composite": {
|
"sources": [
|
{
|
"faceId": {
|
"terms": {
|
"field": "baseInfo.targetId"
|
}
|
}
|
}
|
],
|
"size": 10000000
|
}
|
}
|
}
|
}`
|
//fmt.Println(requestUrl)
|
//fmt.Println(requestBody)
|
buf, err := EsReq("POST", requestUrl, []byte(requestBody))
|
if err != nil {
|
return nil, err
|
}
|
ids, err1 := SourceDeduplication(buf)
|
if err1 != nil {
|
return nil, err1
|
}
|
return ids, nil
|
}
|
|
//统计各个区域人数
|
func StatisticsEveryAreaPersonsNumber(startTime string, endTime string, serverIp string, serverPort string, indexName string) ([]map[string]interface{}, error) {
|
var requestUrl = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
|
var requestBody = `{
|
"query": {
|
"bool": {
|
"filter": [
|
{
|
"range": {
|
"picDate": {
|
"gte": "` + startTime + `",
|
"lte": "` + endTime + `"
|
}
|
}
|
},
|
{
|
"term": {
|
"targetInfo.targetType.raw": "Yolo"
|
}
|
}
|
]
|
}
|
},
|
"size": 0,
|
"aggs": {
|
"buckets_aggs": {
|
"composite": {
|
"sources": [
|
{
|
"areaId": {
|
"terms": {
|
"field": "targetInfo.areaId"
|
}
|
}
|
}
|
],
|
"size": 10000000
|
}
|
}
|
}
|
}`
|
buf, err := EsReq("POST", requestUrl, []byte(requestBody))
|
if err != nil {
|
return nil, err
|
}
|
result, err := SourceStatistics(buf)
|
if err != nil {
|
return nil, err
|
}
|
return result, nil
|
}
|
|
/**************************************customer analysis util end**************************************/
|
//根据摄像机列表和时间查询人员浏览轨迹
|
func GetPersonDataByCameraIdAndTime(cameraId []string, startTime string, endTime string, serverIp string, serverPort string, indexName string) (map[string]interface{}, error) {
|
|
var filterArr []string
|
if cameraId != nil && len(cameraId) > 0 {
|
esCameraId := strings.Replace(strings.Trim(fmt.Sprint(cameraId), "[]"), " ", "\",\"", -1)
|
filterArr = append(filterArr, `{
|
"terms": {
|
"cameraId": ["`+esCameraId+`"]
|
}
|
}`)
|
}
|
filterArr = append(filterArr, `{
|
"range": {
|
"picDate": {
|
"gte": "`+startTime+`",
|
"lte": "`+endTime+`"
|
}
|
}
|
}`)
|
filterArr = append(filterArr, ` {
|
"term": {
|
"targetInfo.targetType.raw": "Yolo"
|
}
|
}`)
|
queryStr := strings.Join(filterArr, ",")
|
|
personUrl := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
|
personBody := `{
|
"query": {
|
"bool": {
|
"filter": [
|
` + queryStr + `
|
]
|
}
|
},
|
"size": 2147483647,
|
"_source": {
|
"includes": [
|
"cameraId",
|
"cameraName",
|
"cameraAddr",
|
"targetInfo.targetScore",
|
"picDate",
|
"updateTime",
|
"picMaxUrl",
|
"targetInfo.belongsTargetId",
|
"targetInfo.targetLocation",
|
"picWH"
|
]
|
}
|
}`
|
//fmt.Println(personUrl)
|
//fmt.Println(personBody)
|
source := make(map[string]interface{})
|
queryStartTime := time.Now()
|
buf, err := EsReq("POST", personUrl, []byte(personBody))
|
if err != nil {
|
return nil, err
|
}
|
queryUseTime := time.Now().Sub(queryStartTime).Seconds() * 1000
|
sources, err := Sourcelist(buf)
|
if err != nil {
|
return nil, err
|
}
|
resData, err := PerSonAnalysis(sources)
|
source["result"] = resData
|
source["total"] = len(resData)
|
source["queryUseTime"] = queryUseTime
|
//println(sources)
|
return source, nil
|
|
}
|
|
//根据时间范围,摄像机列表,分组聚合人脸列表,返回分组数据
|
func GetFaceDataBucketsByCameraIdAndTimeReturnByGrouped(cameraId []string, personId []string, startTime string, endTime string, thresholdTime float64, serverIp string, ServerPort string, indexName string) (buckersDate map[string]interface{}, err error) {
|
var filterArr []string
|
if cameraId != nil && len(cameraId) > 0 {
|
esCameraId := strings.Replace(strings.Trim(fmt.Sprint(cameraId), "[]"), " ", "\",\"", -1)
|
filterArr = append(filterArr, `{
|
"terms": {
|
"cameraId": ["`+esCameraId+`"]
|
}
|
}`)
|
}
|
if personId != nil && len(personId) > 0 {
|
esPersonId := strings.Replace(strings.Trim(fmt.Sprint(personId), "[]"), " ", "\",\"", -1)
|
filterArr = append(filterArr, `{
|
"terms": {
|
"baseInfo.targetId": ["`+esPersonId+`"]
|
}
|
}`)
|
}
|
filterArr = append(filterArr, `{
|
"range": {
|
"picDate": {
|
"gte": "`+startTime+`",
|
"lte": "`+endTime+`"
|
}
|
}
|
}`)
|
filterArr = append(filterArr, ` {
|
"term": {
|
"targetInfo.targetType.raw": "FaceDetect"
|
}
|
}`)
|
queryStr := strings.Join(filterArr, ",")
|
|
var buckersUrl = "http://" + serverIp + ":" + ServerPort + "/" + indexName + "/_search"
|
var buckersBody = `{
|
"query": {
|
"bool": {
|
"filter": [
|
` + queryStr + `
|
]
|
}
|
},
|
"size": 0,
|
"aggs": {
|
"buckets_aggs": {
|
"composite": {
|
"sources": [
|
{
|
"baseInfo.targetId": {
|
"terms": {
|
"field": "baseInfo.targetId"
|
}
|
}
|
},
|
{
|
"cameraId": {
|
"terms": {
|
"field": "cameraId"
|
}
|
}
|
}
|
],
|
"size": 10000000
|
},
|
"aggs":{
|
"top_attention_hits":{
|
"top_hits":{
|
"size": 1000000,
|
"sort": [
|
{
|
"picDate": {
|
"order": "asc"
|
}
|
}
|
],
|
"_source":{
|
"includes":["baseInfo.targetId","cameraId","cameraName","cameraAddr","targetInfo.targetScore","targetInfo.picSmUrl","showLabels","baseInfo.tableId","baseInfo.tableName","baseInfo.bwType","baseInfo.targetName","baseInfo.compareScore","picDate","picMaxUrl","picWH"]
|
}
|
}
|
}
|
}
|
}
|
}
|
}`
|
//fmt.Println(buckersUrl)
|
//fmt.Println(buckersBody)
|
sources := make(map[string]interface{})
|
queryStartTime := time.Now()
|
buf, err := EsReq("POST", buckersUrl, []byte(buckersBody))
|
if err != nil {
|
return nil, err
|
}
|
queryUseTime := time.Now().Sub(queryStartTime).Seconds() * 1000
|
//fmt.Println(queryUseTime)
|
tmpSources, err := SourceAggregationsReturnByGrouped(buf, thresholdTime)
|
if err != nil {
|
return nil, err
|
}
|
sources["result"] = tmpSources
|
sources["total"] = len(tmpSources)
|
sources["queryUseTime"] = queryUseTime
|
//println(sources)
|
return sources, nil
|
}
|
|
//根据时间范围,摄像机列表,分组聚合人脸列表
|
func GetFaceDataBucketsByCameraIdAndTime(cameraId []string, personId []string, startTime string, endTime string, thresholdTime float64, serverIp string, ServerPort string, indexName string) (buckersDate map[string]interface{}, err error) {
|
var filterArr []string
|
if cameraId != nil && len(cameraId) > 0 {
|
esCameraId := strings.Replace(strings.Trim(fmt.Sprint(cameraId), "[]"), " ", "\",\"", -1)
|
filterArr = append(filterArr, `{
|
"terms": {
|
"cameraId": ["`+esCameraId+`"]
|
}
|
}`)
|
}
|
if personId != nil && len(personId) > 0 {
|
esPersonId := strings.Replace(strings.Trim(fmt.Sprint(personId), "[]"), " ", "\",\"", -1)
|
filterArr = append(filterArr, `{
|
"terms": {
|
"baseInfo.targetId": ["`+esPersonId+`"]
|
}
|
}`)
|
}
|
filterArr = append(filterArr, `{
|
"range": {
|
"picDate": {
|
"gte": "`+startTime+`",
|
"lte": "`+endTime+`"
|
}
|
}
|
}`)
|
filterArr = append(filterArr, ` {
|
"term": {
|
"targetInfo.targetType.raw": "FaceDetect"
|
}
|
}`)
|
queryStr := strings.Join(filterArr, ",")
|
|
var buckersUrl = "http://" + serverIp + ":" + ServerPort + "/" + indexName + "/_search"
|
var buckersBody = `{
|
"query": {
|
"bool": {
|
"filter": [
|
` + queryStr + `
|
]
|
}
|
},
|
"size": 0,
|
"aggs": {
|
"buckets_aggs": {
|
"composite": {
|
"sources": [
|
{
|
"baseInfo.targetId": {
|
"terms": {
|
"field": "baseInfo.targetId"
|
}
|
}
|
},
|
{
|
"cameraId": {
|
"terms": {
|
"field": "cameraId"
|
}
|
}
|
}
|
],
|
"size": 10000000
|
},
|
"aggs":{
|
"top_attention_hits":{
|
"top_hits":{
|
"size": 1000000,
|
"sort": [
|
{
|
"picDate": {
|
"order": "asc"
|
}
|
}
|
],
|
"_source":{
|
"includes":["baseInfo.targetId","cameraId","cameraName","cameraAddr","targetInfo.targetScore","targetInfo.picSmUrl","showLabels","baseInfo.tableId","baseInfo.tableName","baseInfo.bwType","baseInfo.targetName","baseInfo.compareScore","picDate","picMaxUrl","picWH"]
|
}
|
}
|
}
|
}
|
}
|
}
|
}`
|
//fmt.Println(buckersUrl)
|
//fmt.Println(buckersBody)
|
queryStartTime := time.Now()
|
buf, err := EsReq("POST", buckersUrl, []byte(buckersBody))
|
if err != nil {
|
return nil, err
|
}
|
queryUseTime := time.Now().Sub(queryStartTime).Seconds() * 1000
|
|
sources, err := SourceAggregations(buf, thresholdTime, queryUseTime)
|
if err != nil {
|
return nil, err
|
}
|
return sources, nil
|
}
|
|
//根据抓拍人员id更新(picurl)图片地址
|
func UpdatePicUrlById(id string, picUrl string, indexName string, serverIp string, serverPort string) (err error) {
|
updateTime := time.Now().Format("2006-01-02 15:04:05")
|
tRes, err := AIOceaninfosbyid([]string{id}, indexName, serverIp, serverPort)
|
if err != nil || len(tRes) == 0 {
|
return err
|
}
|
picMaxUrls := tRes[0].PicMaxUrl
|
sourceStr := `
|
"source": "ctx._source.picMaxUrl.add('` + picUrl + `');ctx._source.updateTime='` + updateTime + `'"
|
`
|
if len(picMaxUrls) >= 2 {
|
sourceStr = `"source": "ctx._source.picMaxUrl[1]='` + picUrl + `';ctx._source.updateTime='` + updateTime + `'"`
|
}
|
var info interface{}
|
url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query?refresh=true"
|
|
var picUrlInfo = `
|
{
|
"script": {
|
` + sourceStr + `
|
},
|
"query": {
|
"bool": {
|
"filter": [
|
{
|
"term": {
|
"id": "` + id + `"
|
}
|
}
|
]
|
}
|
}
|
}
|
`
|
//logPrint("url: ", url, videoUrlInfo)
|
//fmt.Println(url, picUrlInfo)
|
buf, err := EsReq("POST", url, []byte(picUrlInfo))
|
if err != nil {
|
logPrint("http request videoUrlInfo info is err!")
|
return err
|
}
|
json.Unmarshal(buf, &info)
|
//logPrint(info)
|
out, ok := info.(map[string]interface{})
|
if !ok {
|
logPrint("http response interface can not change map[string]interface{}")
|
return errors.New("http response interface can not change map[string]interface{}")
|
}
|
middle, ok := out["updated"].(float64)
|
if !ok {
|
logPrint("first updated change error!", out)
|
return errors.New("first updated change error!")
|
}
|
if middle == 1 {
|
return nil
|
}
|
if middle == 0 {
|
return errors.New("已经修改")
|
}
|
return nil
|
}
|
|
//根据抓拍人员id更新(videourl)摄像机地址
|
func UpdateVideourlById(id string, videoUrl string, indexName string, serverIp string, serverPort string, command int) (statu int, err error) {
|
|
var info interface{}
|
url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query?refresh=true"
|
sourceStr := "ctx._source.videoUrl='" + videoUrl + "'"
|
if command >= 0 {
|
sourceStr = "ctx._source.linkTagInfo[" + strconv.Itoa(command) + "].videoUrl='" + videoUrl + "'"
|
}
|
var videoUrlInfo = `
|
{
|
"script": {
|
"source": "` + sourceStr + `"
|
},
|
"query": {
|
"term": {
|
"id": "` + id + `"
|
}
|
}
|
}
|
`
|
//logPrint("url: ", url, videoUrlInfo)
|
buf, err := EsReq("POST", url, []byte(videoUrlInfo))
|
if err != nil {
|
logPrint("http request videoUrlInfo info is err!")
|
statu = 500
|
return statu, err
|
}
|
json.Unmarshal(buf, &info)
|
//logPrint(info)
|
out, ok := info.(map[string]interface{})
|
if !ok {
|
logPrint("http response interface can not change map[string]interface{}")
|
statu = 500
|
return statu, errors.New("http response interface can not change map[string]interface{}")
|
}
|
middle, ok := out["updated"].(float64)
|
if !ok {
|
logPrint("first updated change error!")
|
statu = 500
|
return statu, errors.New("first updated change error!")
|
}
|
if middle == 1 {
|
statu = 200
|
return statu, nil
|
}
|
if middle == 0 {
|
statu = 201
|
return statu, errors.New("已经修改")
|
}
|
return statu, nil
|
}
|
|
//获取当前节点抓拍库所有人员ID*缓存*
|
func GetAllLocalVideopersonsId(compareArgs protomsg.CompareArgs, indexName string, serverIp string, serverPort string, alarmLevelTypes string) (capturetable []string) {
|
queryStr := ""
|
queryBody := compareArgs.InputValue
|
//检索框
|
if queryBody != "" {
|
queryStr = "\"must\":[{\"multi_match\":{\"query\":\"" + queryBody + "\",\"fields\":[\"cameraAddr^1.5\",\"taskName^1.5\",\"sdkName^1.5\",\"showLabels^3.0\",\"baseInfo.tableName^1.5\",\"baseInfo.targetName^1.5\",\"baseInfo.labels^1.5\",\"alarmRules.alarmLevel^1.5\",\"linkTag^1.5\"]," +
|
"\"type\":\"cross_fields\",\"operator\":\"OR\",\"slop\":0,\"prefix_length\":0,\"max_expansions\":50,\"zero_terms_query\":\"NONE\",\"auto_generate_synonyms_phrase_query\":true,\"fuzzy_transpositions\":true,\"boost\":1}}],"
|
}
|
if compareArgs.SearchTime == nil || len(compareArgs.SearchTime) != 2 {
|
return nil
|
}
|
gteDate := compareArgs.SearchTime[0]
|
lteDate := compareArgs.SearchTime[1]
|
//判断任务ID
|
taskIdStr := ""
|
taskId := compareArgs.Tasks
|
if taskId != nil && len(taskId) > 0 {
|
esTaskId := strings.Replace(strings.Trim(fmt.Sprint(taskId), "[]"), " ", "\",\"", -1)
|
taskIdStr = "{\"terms\":{\"taskId\":[\"" + esTaskId + "\"]}},"
|
}
|
//判断摄像机ID
|
cameraIdStr := ""
|
cameraId := compareArgs.TreeNodes
|
if cameraId != nil && len(cameraId) > 0 {
|
esCameraId := strings.Replace(strings.Trim(fmt.Sprint(cameraId), "[]"), " ", "\",\"", -1)
|
cameraIdStr = "{\"terms\":{\"cameraId\":[\"" + esCameraId + "\"]}},"
|
}
|
|
//判断库表ID
|
tableId := compareArgs.Tabs
|
esTableId := ""
|
esTableIdStr := ""
|
if tableId != nil && len(tableId) > 0 {
|
esTableId = strings.Replace(strings.Trim(fmt.Sprint(tableId), "[]"), " ", "\",\"", -1)
|
esTableIdStr = "{\"terms\":{\"baseInfo.tableId\":[\"" + esTableId + "\"]}},"
|
}
|
isCollectStr := ""
|
isCollect := compareArgs.Collection
|
if isCollect != "" {
|
isCollectStr = "{\"term\":{\"isCollect\":\"" + isCollect + "\"}},"
|
}
|
|
//判断布防等级
|
alarmLevelStr := ""
|
if alarmLevelTypes != "" {
|
alarmLevelStr = "{\"terms\":{\"alarmRules.alarmLevel.raw\":[\"" + alarmLevelTypes + "\"]}},"
|
}
|
|
//使用es底层机制处理分页
|
|
analyServerFilterStr := ""
|
analyServerId := compareArgs.AnalyServerId
|
if analyServerId != "" {
|
analyServerFilterStr = "{\"term\":{\"analyServerId\":\"" + analyServerId + "\"}},"
|
}
|
|
ts := time.Now()
|
//首次请求头
|
url := "http://" + serverIp + ":" + serverPort +
|
"/" + indexName + "/_search?search_type=dfs_query_then_fetch;scroll=1m"
|
|
var lock sync.RWMutex
|
var wg sync.WaitGroup
|
|
for i := 0; i < 48; i++ {
|
//请求体
|
prama := "{" +
|
"\"slice\":{\"id\":" + strconv.Itoa(i) + ",\"max\":48}," +
|
"\"size\":\"1000\"," +
|
"\"query\":{\"bool\":{" + queryStr +
|
"\"filter\":[" +
|
"{\"term\":{\"targetInfo.targetType.raw\":\"FaceDetect\"}}," +
|
cameraIdStr +
|
alarmLevelStr +
|
taskIdStr +
|
isCollectStr +
|
esTableIdStr +
|
analyServerFilterStr +
|
"{\"range\":{\"picDate\":{\"from\":\"" + gteDate + "\",\"to\":\"" + lteDate + "\",\"include_lower\":true,\"include_upper\":true,\"boost\":1}}}]}}," +
|
"\"_source\":[\"id\"]" +
|
"}"
|
wg.Add(1)
|
go func(reqParam string) {
|
defer wg.Done()
|
|
logPrint(url)
|
logPrint(prama)
|
buf, err := EsReq("POST", url, []byte(reqParam))
|
|
if err != nil {
|
logPrint("http request videoUrlInfo info is err!")
|
logPrint(len(capturetable))
|
return
|
}
|
|
sources, err := Sourcelistforscroll(buf)
|
|
if err != nil {
|
logPrint(len(capturetable))
|
return
|
}
|
for _, source := range sources["sourcelist"].([]map[string]interface{}) {
|
lock.Lock()
|
capturetable = append(capturetable, source["id"].(string))
|
lock.Unlock()
|
}
|
|
scroll_id := sources["scroll_id"].(string)
|
|
//scroll请求头
|
scroll_url := "http://" + serverIp + ":" + serverPort + "/_search/scroll"
|
|
for {
|
var tmpList []string
|
next_scroll_id := ""
|
if next_scroll_id != "" {
|
scroll_id = next_scroll_id
|
}
|
jsonDSL := `{
|
"scroll": "1m",
|
"scroll_id" : "` + scroll_id + `"
|
}`
|
logPrint(scroll_url)
|
logPrint(jsonDSL)
|
buf, err := EsReq("POST", scroll_url, []byte(jsonDSL))
|
|
if err != nil {
|
logPrint("lenth1: ", len(capturetable))
|
return
|
}
|
nextSources, err := Sourcelistforscroll(buf)
|
|
if nextSources == nil {
|
return
|
}
|
|
nextM := nextSources["sourcelist"].([]map[string]interface{})
|
//logPrint("id",nextSources)
|
if nextM == nil || len(nextM) == 0 {
|
//logPrint("lenth: ", len(capturetable))
|
return
|
}
|
//logPrint("id")
|
for _, source := range nextM {
|
tmpList = append(tmpList, source["id"].(string))
|
}
|
//logPrint("tmpList: ", len(tmpList))
|
lock.Lock()
|
capturetable = append(capturetable, tmpList...)
|
lock.Unlock()
|
|
next_scroll_id = nextSources["scroll_id"].(string)
|
}
|
|
}(prama)
|
}
|
wg.Wait()
|
|
logPrint("lenth_all: ", len(capturetable))
|
logPrint("耗时:", time.Since(ts))
|
return capturetable
|
}
|
|
//初始化实时抓拍
|
func InitRealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm string, category string, quantity int) ([]protomsg.AIOcean, error) {
|
var aIOceanInfo []protomsg.AIOcean
|
url := "http://" + serverIp + ":" + serverPort +
|
"/" + indexName + "/_search"
|
var filterArr []string
|
if isAlarm != "all" {
|
filterArr = append(filterArr, ` {
|
"term":{
|
"isAlarm":"`+isAlarm+`"
|
}
|
}`)
|
}
|
|
if category != "all" {
|
filterArr = append(filterArr, ` {
|
"term":{
|
"targetInfo.targetType":"`+category+`"
|
}
|
}`)
|
|
}
|
|
queryStr := `"query":{
|
"bool":{
|
"filter":[
|
` + strings.Join(filterArr, ",") + `
|
]
|
}
|
},`
|
|
DSLJson := `{
|
"size":` + strconv.Itoa(quantity) + `,
|
` + queryStr + `
|
"sort":[{"picDate":{"order":"desc"}}],
|
"_source": {"includes":[],"excludes":["*.feature"]}
|
}`
|
logPrint(DSLJson)
|
buf, err := EsReq("POST", url, []byte(DSLJson))
|
if err != nil {
|
return aIOceanInfo, err
|
}
|
|
sources, err := Sourcelist(buf)
|
if err != nil {
|
return aIOceanInfo, err
|
}
|
|
aIOcean := AIOceanAnalysis(sources)
|
//logPrint(len(videoperson))
|
return aIOcean, nil
|
}
|
|
//实时抓拍
|
func RealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm bool) ([]protomsg.AIOcean, error) {
|
var aIOceanInfo []protomsg.AIOcean
|
url := "http://" + serverIp + ":" + serverPort +
|
"/" + indexName + "/_search"
|
DSLJson := `{
|
"size":20,
|
"query":{
|
"bool":{
|
"filter":[
|
{
|
"range":{
|
"picDate":{
|
"gte":"now+8h-30s",
|
"lt":"now+8h"
|
}
|
}
|
},
|
{
|
"term":{
|
"isAlarm":` + strconv.FormatBool(isAlarm) + `
|
}
|
}
|
]
|
}
|
},
|
"_source": {"includes":[],"excludes":["*.feature"]}
|
}`
|
|
buf, err := EsReq("POST", url, []byte(DSLJson))
|
if err != nil {
|
return aIOceanInfo, err
|
}
|
|
sources, err := Sourcelist(buf)
|
if err != nil {
|
return aIOceanInfo, err
|
}
|
|
aIOcean := AIOceanAnalysis(sources)
|
logPrint(len(aIOcean))
|
return aIOcean, nil
|
}
|
|
//综合统计
|
func StatisticsComprehensive(serverIp string, serverPort string, indexName string, isAlarm string) (total int, err error) {
|
url := "http://" + serverIp + ":" + serverPort +
|
"/" + indexName + "/_search"
|
isAlarmStr := ""
|
if isAlarm != "all" {
|
isAlarmStr = ` {
|
"term":{
|
"isAlarm":"` + isAlarm + `"
|
}
|
},`
|
|
}
|
DSLJson := `{
|
"size":0,
|
"query":{
|
"bool":{
|
"filter":[
|
` + isAlarmStr + `
|
{
|
"range":{
|
"picDate":{
|
"gte":"now+8h/d"
|
}
|
}
|
}
|
]
|
}
|
}
|
}`
|
//logPrint(DSLJson)
|
buf, err := EsReq("POST", url, []byte(DSLJson))
|
if err != nil {
|
return total, err
|
}
|
var info interface{}
|
json.Unmarshal(buf, &info)
|
out, ok := info.(map[string]interface{})
|
if !ok {
|
return total, errors.New("http response interface can not change map[string]interface{}")
|
}
|
middle, ok := out["hits"].(map[string]interface{})
|
if !ok {
|
return total, errors.New("first hits change error!")
|
}
|
total = int(middle["total"].(float64))
|
//logPrint(total)
|
return total, nil
|
}
|
|
//实时报警任务比率
|
func RealTimeAlarmTaskRate(serverIp string, serverPort string, indexName string) (sources []map[string]interface{}, err error) {
|
url := "http://" + serverIp + ":" + serverPort +
|
"/" + indexName + "/_search"
|
DSLJson := `{
|
"size":0,
|
"query":{
|
"bool":{
|
"filter":[{
|
"range":{
|
"picDate":{
|
"gte":"now+8h/d"
|
}
|
}
|
}]
|
}
|
},
|
"aggs":{
|
"sdkName_status":{
|
"terms":{
|
"field":"sdkName.raw"
|
}
|
}
|
}
|
}`
|
buf, err := EsReq("POST", url, []byte(DSLJson))
|
if err != nil {
|
return nil, err
|
}
|
var info interface{}
|
json.Unmarshal(buf, &info)
|
out, ok := info.(map[string]interface{})
|
if !ok {
|
return nil, errors.New("http response interface can not change map[string]interface{}")
|
}
|
middle, ok := out["aggregations"].(map[string]interface{})
|
if !ok {
|
return nil, errors.New("first hits change error!")
|
}
|
sdkName_status, ok := middle["sdkName_status"].(map[string]interface{})
|
if !ok {
|
return nil, errors.New("first hits change error!")
|
}
|
|
for _, in := range sdkName_status["buckets"].([]interface{}) {
|
var source = make(map[string]interface{}, 0)
|
tmpbuf, ok := in.(map[string]interface{})
|
if !ok {
|
logPrint("change to source error!")
|
continue
|
}
|
sdkName := tmpbuf["key"].(string)
|
count := int(tmpbuf["doc_count"].(float64))
|
source["name"] = sdkName
|
source["value"] = count
|
sources = append(sources, source)
|
}
|
//logPrint("tmpSource",sources)
|
return sources, nil
|
}
|
|
//聚合任务列表,taskId+taskName
|
func AggregateTaskList(serverIp string, serverPort string, indexName string, analyServerId string) (sources []map[string]interface{}, err error) {
|
url := "http://" + serverIp + ":" + serverPort +
|
"/" + indexName + "/_search"
|
serverFilterStr := ""
|
if analyServerId != "" {
|
serverFilterStr = `,
|
"query": {
|
"bool": {
|
"filter": [
|
{
|
"term": {
|
"analyServerId": "` + analyServerId + `"
|
}
|
}
|
]
|
}
|
}`
|
}
|
DSLJson := `{
|
"size": 0,
|
"aggs": {
|
"task_status": {
|
"composite": {
|
"sources": [
|
{
|
"taskId": {
|
"terms": {
|
"field": "taskId"
|
}
|
}
|
},
|
{
|
"taskName": {
|
"terms": {
|
"field": "taskName.raw"
|
}
|
}
|
}
|
],
|
"size":"1000000"
|
}
|
}
|
}
|
` + serverFilterStr + `
|
}`
|
buf, err := EsReq("POST", url, []byte(DSLJson))
|
if err != nil {
|
return nil, err
|
}
|
var info interface{}
|
json.Unmarshal(buf, &info)
|
out, ok := info.(map[string]interface{})
|
if !ok {
|
return nil, errors.New("http response interface can not change map[string]interface{}")
|
}
|
middle, ok := out["aggregations"].(map[string]interface{})
|
if !ok {
|
return nil, errors.New("first hits change error!")
|
}
|
task_status, ok := middle["task_status"].(map[string]interface{})
|
if !ok {
|
return nil, errors.New("first hits change error!")
|
}
|
|
for _, in := range task_status["buckets"].([]interface{}) {
|
var source = make(map[string]interface{}, 0)
|
tmpbuf, ok := in.(map[string]interface{})
|
if !ok {
|
logPrint("change to source error!")
|
continue
|
}
|
task := tmpbuf["key"].(map[string]interface{})
|
count := int(tmpbuf["doc_count"].(float64))
|
taskName := task["taskName"].(string)
|
taskId := task["taskId"].(string)
|
source["taskName"] = taskName
|
source["taskId"] = taskId
|
source["count"] = count
|
sources = append(sources, source)
|
}
|
//logPrint("tmpSource",sources)
|
return sources, nil
|
|
}
|
|
//添加即将删除信号
|
func AddDeleteSignal() {
|
|
}
|
|
/****************************************以下为sdkCompare比对缓存使用方法*********************************************/
|
//获取查询总数 *缓存*
|
func GetTotal(serverIp string, serverPort string, indexName string, shards string, targetType string) (total int) {
|
JsonDSL := `{
|
"size": 0,
|
"query": {
|
"bool": {
|
"filter": [{
|
"term": {
|
"targetInfo.targetType.raw": "` + targetType + `"
|
}
|
}]
|
}
|
}
|
}`
|
|
url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local"
|
buf, err := EsReq("POST", url, []byte(JsonDSL))
|
if err != nil {
|
return
|
}
|
var info interface{}
|
json.Unmarshal(buf, &info)
|
out, ok := info.(map[string]interface{})
|
if !ok {
|
return
|
}
|
middle, ok := out["hits"].(map[string]interface{})
|
if !ok {
|
return
|
}
|
total = int(middle["total"].(float64))
|
return total
|
|
}
|
|
//查询时间段数据 *缓存*
|
func GetPeriodInfos(serverIp string, serverPort string, startTime string, endTime string, indexName string, shards string, targetType string) ([]*protomsg.MultiFeaCache, error) {
|
var capdbinfo []*protomsg.MultiFeaCache
|
url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local"
|
var source []string
|
switch targetType {
|
case "face", "FaceDetect":
|
source = []string{"id", "targetInfo.feature", "analyServerId", "cameraId"}
|
case "track":
|
source = []string{"id", "targetInfo.feature", "analyServerId", "cameraId", "targetInfo.attachTarget.feature", "targetInfo.targetLocation", "linkTagInfo.targetInfo.feature", "linkTagInfo.targetInfo.attachTarget.feature", "linkTagInfo.cameraId", "linkTagInfo.targetInfo.targetLocation"}
|
}
|
JsonDSL := `
|
{
|
"query": {
|
"bool": {
|
"filter": [
|
{
|
"term": {
|
"targetInfo.targetType.raw": "` + targetType + `"
|
}
|
},
|
{
|
"range": {
|
"picDate": {
|
"gte": "` + startTime + `",
|
"lt": "` + endTime + `"
|
}
|
}
|
}
|
]
|
}
|
},
|
"size": 1000000,
|
"_source": ["` + strings.Replace(strings.Trim(fmt.Sprint(source), "[]"), " ", "\",\"", -1) + `"]
|
}
|
`
|
//logger.Debug(url)
|
//logger.Debug(JsonDSL)
|
//logPrint(JsonDSL)
|
buf, err := EsReq("POST", url, []byte(JsonDSL))
|
if err != nil {
|
return capdbinfo, errors.New("http request dbtablename info is err!")
|
}
|
|
// 返回 _source 数组
|
sources, err := Sourcelist(buf)
|
if err != nil {
|
return capdbinfo, err
|
}
|
//logPrint(sources)
|
// 返回所有查询的数据
|
capdbinfos := Parsesources(sources)
|
return capdbinfos, nil
|
}
|
|
// 查询底库人员信息*缓存*
|
func GetOceanFeatures(serverIp string, serverPort string, queryNums int, indexName string, shards string, targetType string) ([]*protomsg.MultiFeaCache, error) {
|
//queryIndexNum int
|
//var dbinfos []*protomsg.MultiFeaCache
|
dbinfos := make([]*protomsg.MultiFeaCache, 0)
|
//dbinfosss := make([]*protomsg.MultiFeaCache,0)
|
//dbinfoss = append(dbinfoss, dbinfosss...)
|
|
JsonDSL := ""
|
var source []string
|
switch targetType {
|
case "face", "FaceDetect":
|
source = []string{"id", "targetInfo.feature", "analyServerId"}
|
case "track":
|
source = []string{"id", "targetInfo.feature", "analyServerId", "targetInfo.attachTarget.feature", "targetInfo.targetLocation", "linkTagInfo.targetInfo.feature", "linkTagInfo.targetInfo.attachTarget.feature", "linkTagInfo.targetInfo.targetLocation"}
|
}
|
|
url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local;scroll=1m"
|
|
var lock sync.RWMutex
|
var wg sync.WaitGroup
|
|
for i := 0; i < 48; i++ {
|
//请求体
|
JsonDSL = ` {
|
"slice": {
|
"id": "` + strconv.Itoa(i) + `",
|
"max": 48
|
},
|
"size":` + strconv.Itoa(queryNums) + `,
|
"query": {
|
"bool": {
|
"filter": [
|
{
|
"term": {
|
"targetInfo.targetType.raw": "` + targetType + `"
|
}
|
}
|
]
|
}
|
},
|
"_source": ["` + strings.Replace(strings.Trim(fmt.Sprint(source), "[]"), " ", "\",\"", -1) + `"]
|
}`
|
wg.Add(1)
|
go func(reqJsonDSL string) {
|
defer wg.Done()
|
|
//fmt.Println(url)
|
//fmt.Println(prama)
|
//logPrint("url: ",url)
|
//logPrint("url: ",reqJsonDSL)
|
buf, err := EsReq("POST", url, []byte(reqJsonDSL))
|
if err != nil {
|
logPrint("EsReq: ", err)
|
return
|
}
|
|
// 返回 _source 数组
|
sources, err := Sourcelistforscroll(buf)
|
if err != nil {
|
logPrint("EsReq: ", err)
|
return
|
}
|
// 返回所有查询的数据
|
ftmpDatas := Parsesources(sources["sourcelist"].([]map[string]interface{}))
|
lock.Lock()
|
dbinfos = append(dbinfos, ftmpDatas...)
|
//logPrint("prsLen: ", len(Parsesources(sources["sourcelist"].([]map[string]interface{}))))
|
//logPrint("dbinfosLen: ", len(dbinfos))
|
lock.Unlock()
|
|
scroll_id := sources["scroll_id"].(string)
|
|
//scroll请求头
|
scroll_url := "http://" + serverIp + ":" + serverPort + "/_search/scroll"
|
for {
|
next_scroll_id := ""
|
if next_scroll_id != "" {
|
scroll_id = next_scroll_id
|
}
|
jsonDSL := `{
|
"scroll": "1m",
|
"scroll_id" : "` + scroll_id + `"
|
}`
|
//fmt.Println(scroll_url)
|
//fmt.Println(jsonDSL)
|
buf, err := EsReq("POST", scroll_url, []byte(jsonDSL))
|
|
if err != nil {
|
//fmt.Println("lenth1: ", len(dbinfos))
|
return
|
}
|
nextSources, err := Sourcelistforscroll(buf)
|
|
if nextSources == nil {
|
return
|
}
|
|
nextM := nextSources["sourcelist"].([]map[string]interface{})
|
//fmt.Println("id",nextSources)
|
if nextM == nil || len(nextM) == 0 {
|
//fmt.Println("lenth: ", len(capturetable))
|
return
|
}
|
tmpDatas := Parsesources(nextM)
|
lock.Lock()
|
dbinfos = append(dbinfos, tmpDatas...)
|
//logPrint("tmpDatasLen: ", len(tmpDatas))
|
//logPrint("AdbinfosLen: ", len(dbinfos))
|
lock.Unlock()
|
|
next_scroll_id = nextSources["scroll_id"].(string)
|
}
|
|
}(JsonDSL)
|
}
|
wg.Wait()
|
|
//fmt.Println("lenth_all: ", len(dbinfos))
|
|
return dbinfos, nil
|
}
|
|
//************************CORN TASK*******************************
|
//查询日期范围内是否还存在数据
|
func QueryAnalyServerData(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) {
|
url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
|
deleteJson := `{
|
"query":{
|
"bool":{
|
"filter":[{
|
"range":{
|
"updateTime":{
|
"gte":"` + startTime + `",
|
"lte":"` + endTime + `"
|
}
|
}
|
},
|
{
|
"term":{
|
"analyServerId":"` + analyServerId + `"
|
}
|
}
|
]
|
}
|
}
|
} `
|
buf, err := EsReq("POST", url, []byte(deleteJson))
|
if err != nil {
|
return false, errors.New("请求失败")
|
}
|
resTotal, err := SourceTotal(buf)
|
if err != nil {
|
return false, errors.New("解码失败")
|
}
|
if resTotal == -1 || resTotal == 0 {
|
result = false
|
} else {
|
result = true
|
}
|
return result, nil
|
}
|
|
//按日期范围,服务器Id删除数据
|
func DeleteAnalyServerData(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) {
|
url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_delete_by_query"
|
deleteJson := `{
|
"query":{
|
"bool":{
|
"filter":[{
|
"range":{
|
"updateTime":{
|
"gte":"` + startTime + `",
|
"lte":"` + endTime + `"
|
}
|
}
|
},
|
{
|
"term":{
|
"analyServerId":"` + analyServerId + `"
|
}
|
}
|
]
|
}
|
}
|
} `
|
buf, err := EsReq("POST", url, []byte(deleteJson))
|
if err != nil {
|
return false, errors.New("请求失败")
|
}
|
deleteRes, err := SourceDeleted(buf)
|
if err != nil {
|
return false, errors.New("解码失败")
|
}
|
if deleteRes == -1 {
|
result = false
|
} else {
|
result = true
|
}
|
return result, nil
|
}
|
|
//给所有节点追加删除任务信息
|
func AddDelTask(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) {
|
url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query"
|
addJson := `{
|
"script": {
|
"lang":"painless",
|
"inline": "ctx._source.instantTask.add(params.newtask)",
|
"params": {
|
"newtask": {
|
"instantClearId": "` + analyServerId + `",
|
"startTime": "` + startTime + `",
|
"endTime": "` + endTime + `"
|
}
|
}
|
},
|
"query": {
|
"bool": {
|
"filter": [
|
{
|
"term": {
|
"application": "loopCoverage"
|
}
|
}
|
]
|
}
|
}
|
}`
|
buf, err := EsReq("POST", url, []byte(addJson))
|
if err != nil {
|
return false, errors.New("请求失败")
|
}
|
updateRes, err := SourceUpdated(buf)
|
if err != nil {
|
return false, errors.New("解码失败")
|
}
|
if updateRes == -1 {
|
result = false
|
} else {
|
result = true
|
}
|
return result, nil
|
}
|
|
//移除已执行完的删除任务
|
func DeleteDelTask(serverIp string, serverPort string, indexName string, analyServerId string) (result bool, err error) {
|
url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query"
|
deleteJson := `{
|
"script": {
|
"lang":"painless",
|
"inline": "ctx._source.instantTask.remove(0)"
|
},
|
"query": {
|
"bool": {
|
"filter":[{
|
"term":{
|
"id":"` + analyServerId + `"
|
}
|
}]
|
}
|
}
|
}`
|
buf, err := EsReq("POST", url, []byte(deleteJson))
|
if err != nil {
|
return false, errors.New("请求失败")
|
}
|
updateRes, err := SourceUpdated(buf)
|
if err != nil {
|
return false, errors.New("解码失败")
|
}
|
if updateRes == -1 {
|
result = false
|
} else {
|
result = true
|
}
|
return result, nil
|
}
|