| | |
| | | package esutil |
| | | |
| | | import ( |
| | | "basic.com/pubsub/protomsg.git" |
| | | "encoding/json" |
| | | "errors" |
| | | "fmt" |
| | | "sort" |
| | | "strconv" |
| | | "strings" |
| | | "sync" |
| | | "time" |
| | | |
| | | "basic.com/pubsub/protomsg.git" |
| | | ) |
| | | |
// logPrint is the package-level log sink. By default it writes its arguments to
// standard output, separated by spaces and followed by a newline. InitLog can
// replace it with a caller-supplied function.
var logPrint = func(i ...interface{}) {
	// Spread the variadic arguments; passing the slice itself would print
	// everything as a single bracketed value such as "[a 1]".
	fmt.Println(i...)
}
| | | |
| | | func InitLog(fn func(i ...interface{})) { |
| | | if fn != nil { |
| | | logPrint = fn |
| | | } |
| | | } |
| | | |
| | | // 根据抓拍人员id查询抓拍人员信息 |
| | | func AIOceaninfosbyid(id []string, indexName string, serverIp string, serverPort string) ([]protomsg.AIOcean, error) { |
| | |
| | | } |
| | | |
| | | aIOcean := AIOceanAnalysis(sources) |
| | | println(aIOcean) |
| | | return aIOcean, nil |
| | | } |
| | | |
| | | // 根据抓拍人员id查询视频地址 |
| | | func AIOceanVideoUrlbyid(id string, indexName string, serverIp string, serverPort string) (string, error) { |
| | | //var aIOceanInfo []protomsg.AIOcean |
| | | //videopersonsPersonId := strings.Replace(strings.Trim(fmt.Sprint(id), "[]"), " ", "\",\"", -1) |
| | | var dbinfoRequest = ` |
| | | { |
| | | "query": { |
| | | "bool": { |
| | | "filter": [ |
| | | { |
| | | "term": { |
| | | "id": "` + id + `" |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | }, |
| | | "_source": [ |
| | | "videoUrl" |
| | | ] |
| | | } |
| | | ` |
| | | buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search", []byte(dbinfoRequest)) |
| | | if err != nil { |
| | | return "", err |
| | | } |
| | | |
| | | sources, err := Sourcelist(buf) |
| | | if err != nil { |
| | | return "", err |
| | | } |
| | | videoUrl := sources[0]["videoUrl"].(string) |
| | | //aIOcean := AIOceanAnalysis(sources) |
| | | return videoUrl, nil |
| | | } |
| | | |
| | | //根据抓拍库人员id查询特征值 |
| | |
| | | return feature, nil |
| | | } |
| | | |
| | | //根据目标id查询已追加条数 |
| | | func GetLinkTagInfoSize(id string, indexName string, serverIp string, serverPort string) (size int, err error) { |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" |
| | | queryDSL := `{ |
| | | "query": { |
| | | "term":{ |
| | | "id":"` + id + `" |
| | | } |
| | | } |
| | | }` |
| | | buf, err := EsReq("POST", url, []byte(queryDSL)) |
| | | if err != nil { |
| | | return -1, err |
| | | } |
| | | source, err := Sourcelist(buf) |
| | | if err != nil { |
| | | return -1, err |
| | | } |
| | | if source[0]["linkTagInfo"] != nil { |
| | | size = len(source[0]["linkTagInfo"].([]interface{})) |
| | | } else { |
| | | return -1, errors.New("该数组不存在") |
| | | } |
| | | return size, nil |
| | | } |
| | | |
| | | //根据目标id追加跟踪信息 |
| | | func AppendTargetInfo(id string, targetInfo string, indexName string, serverIp string, serverPort string) (string, error) { |
| | | func AppendTargetInfo(id string, targetInfo string, indexName string, serverIp string, serverPort string, updateTime string) (string, error) { |
| | | if targetInfo == "" { |
| | | return "", errors.New("append data is nil") |
| | | } |
| | |
| | | }, |
| | | "script": { |
| | | "lang": "painless", |
| | | "inline": "ctx._source.linkTagInfo.add(params.newparam)", |
| | | "inline": "ctx._source.linkTagInfo.add(params.newparam);ctx._source.updateTime='` + updateTime + `'", |
| | | "params": { |
| | | "newparam": ` + targetInfo + ` |
| | | } |
| | | } |
| | | }` |
| | | fmt.Println(jsonDSL) |
| | | logPrint(jsonDSL) |
| | | buf, err := EsReq("POST", url, []byte(jsonDSL)) |
| | | if err != nil { |
| | | return "", err |
| | | } |
| | | json.Unmarshal(buf, &info) |
| | | out, ok := info.(map[string]interface{}) |
| | | fmt.Println(out) |
| | | logPrint(out) |
| | | if !ok { |
| | | return "", errors.New("http response interface can not change map[string]interface{}") |
| | | } |
| | |
| | | |
| | | } |
| | | |
| | | /**************************************customer analysis util start**************************************/ |
| | | /*******************sort []map util*******************/ |
// MapsSort orders a slice of maps by the string value stored under Key, largest
// value first. It implements sort.Interface.
type MapsSort struct {
	Key     string                   // map key whose string value is compared
	MapList []map[string]interface{} // slice that is sorted in place
}

// Len reports the number of maps in MapList.
func (m *MapsSort) Len() int {
	return len(m.MapList)
}

// Less reports whether element i must come before element j, which is the case
// when its key value is the greater string (descending order).
func (m *MapsSort) Less(i, j int) bool {
	return m.keyAt(i) > m.keyAt(j)
}

// Swap exchanges elements i and j of MapList.
func (m *MapsSort) Swap(i, j int) {
	m.MapList[i], m.MapList[j] = m.MapList[j], m.MapList[i]
}

// keyAt returns the string stored under Key in element i. A missing or
// non-string value yields "" so that sorting never panics; such elements end up
// after every element that has a non-empty key value.
func (m *MapsSort) keyAt(i int) string {
	s, _ := m.MapList[i][m.Key].(string)
	return s
}
| | | |
| | | /*******************sort []map util*******************/ |
| | | //根据时间范围聚合所有区域人信息,返回固定条数 |
| | | func GetFaceDataByTimeAndTotal(startTime string, endTime string, total int, thresholdTime int, thresholdStayTime int, serverIp string, serverPort string, indexName string) (resData []map[string]interface{}, err error) { |
| | | var requestUrl = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" |
| | | var requestBody = `{ |
| | | "query": { |
| | | "bool": { |
| | | "filter": [ |
| | | { |
| | | "range": { |
| | | "picDate": { |
| | | "gte": "` + startTime + `", |
| | | "lte": "` + endTime + `" |
| | | } |
| | | } |
| | | }, |
| | | { |
| | | "term":{ |
| | | "targetInfo.targetType.raw": "FaceDetect" |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | }, |
| | | "size": 0, |
| | | "aggs": { |
| | | "buckets_aggs": { |
| | | "composite": { |
| | | "sources": [ |
| | | { |
| | | "faceId": { |
| | | "terms": { |
| | | "field": "baseInfo.targetId" |
| | | } |
| | | } |
| | | }, |
| | | { |
| | | "areaId": { |
| | | "terms": { |
| | | "field": "targetInfo.areaId" |
| | | } |
| | | } |
| | | } |
| | | ], |
| | | "size": 10000000 |
| | | }, |
| | | "aggs": { |
| | | "top_attention_hits": { |
| | | "top_hits": { |
| | | "size": 1000000, |
| | | "sort": [ |
| | | { |
| | | "picDate": { |
| | | "order": "asc" |
| | | } |
| | | } |
| | | ], |
| | | "_source": { |
| | | "includes": [ |
| | | "baseInfo.targetId", |
| | | "targetInfo.picSmUrl", |
| | | "targetInfo.areaId", |
| | | "picDate" |
| | | ] |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | }` |
| | | buf, err := EsReq("POST", requestUrl, []byte(requestBody)) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | source, err := FaceSourceAggregations(buf, thresholdTime, thresholdStayTime) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | if len(source) == 0 { |
| | | return source, nil |
| | | } |
| | | faceSource := make([]map[string]interface{}, 0) |
| | | for index, info := range source { |
| | | if int(info["stayTime"].(float64)) > thresholdStayTime { |
| | | faceSource = append(faceSource, source[index]) |
| | | } |
| | | } |
| | | mapsSort := MapsSort{} |
| | | mapsSort.Key = "endTime" |
| | | mapsSort.MapList = faceSource |
| | | sort.Sort(&mapsSort) |
| | | if len(faceSource) > total { |
| | | return mapsSort.MapList[:total], nil |
| | | } |
| | | return mapsSort.MapList, nil |
| | | } |
| | | |
| | | func GetFaceDataByTimeAndId(startTime string, endTime string, id string, thresholdTime int, thresholdStayTime int, serverIp string, serverPort string, indexName string) (resData []map[string]interface{}, err error) { |
| | | var requestUrl = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" |
| | | var requestBody = `{ |
| | | "query": { |
| | | "bool": { |
| | | "filter": [ |
| | | { |
| | | "range": { |
| | | "picDate": { |
| | | "gte": "` + startTime + `", |
| | | "lte": "` + endTime + `" |
| | | } |
| | | } |
| | | }, |
| | | { |
| | | "term":{ |
| | | "targetInfo.targetType.raw": "FaceDetect" |
| | | } |
| | | }, |
| | | { |
| | | "term":{ |
| | | "baseInfo.targetId": "` + id + `" |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | }, |
| | | "size": 0, |
| | | "aggs": { |
| | | "buckets_aggs": { |
| | | "composite": { |
| | | "sources": [ |
| | | { |
| | | "faceId": { |
| | | "terms": { |
| | | "field": "baseInfo.targetId" |
| | | } |
| | | } |
| | | }, |
| | | { |
| | | "areaId": { |
| | | "terms": { |
| | | "field": "targetInfo.areaId" |
| | | } |
| | | } |
| | | } |
| | | ], |
| | | "size": 10000000 |
| | | }, |
| | | "aggs": { |
| | | "top_attention_hits": { |
| | | "top_hits": { |
| | | "size": 1000000, |
| | | "sort": [ |
| | | { |
| | | "picDate": { |
| | | "order": "asc" |
| | | } |
| | | } |
| | | ], |
| | | "_source": { |
| | | "includes": [ |
| | | "baseInfo.targetId", |
| | | "targetInfo.picSmUrl", |
| | | "targetInfo.areaId", |
| | | "picDate" |
| | | ] |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | }` |
| | | buf, err := EsReq("POST", requestUrl, []byte(requestBody)) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | source, err := FaceSourceAggregations(buf, thresholdTime, thresholdStayTime) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | if len(source) == 0 { |
| | | return source, nil |
| | | } |
| | | faceSource := make([]map[string]interface{}, 0) |
| | | for index, info := range source { |
| | | if int(info["stayTime"].(float64)) > thresholdStayTime { |
| | | faceSource = append(faceSource, source[index]) |
| | | } |
| | | } |
| | | mapsSort := MapsSort{} |
| | | mapsSort.Key = "startTime" |
| | | mapsSort.MapList = faceSource |
| | | sort.Sort(&mapsSort) |
| | | return mapsSort.MapList, nil |
| | | } |
| | | |
| | | func GetFaceIdDeduplication(startTime string, endTime string, serverIp string, serverPort string, indexName string) (ids []map[string]interface{}, err error) { |
| | | var requestUrl = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" |
| | | var requestBody = `{ |
| | | "query": { |
| | | "bool": { |
| | | "filter": [ |
| | | { |
| | | "range": { |
| | | "picDate": { |
| | | "gte": "` + startTime + `", |
| | | "lte": "` + endTime + `" |
| | | } |
| | | } |
| | | }, |
| | | { |
| | | "term": { |
| | | "targetInfo.targetType.raw": "FaceDetect" |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | }, |
| | | "size": 0, |
| | | "aggs": { |
| | | "buckets_aggs": { |
| | | "composite": { |
| | | "sources": [ |
| | | { |
| | | "faceId": { |
| | | "terms": { |
| | | "field": "baseInfo.targetId" |
| | | } |
| | | } |
| | | } |
| | | ], |
| | | "size": 10000000 |
| | | }, |
| | | "aggs": { |
| | | "top_attention_hits": { |
| | | "top_hits": { |
| | | "size": 1, |
| | | "sort": [ |
| | | { |
| | | "picDate": { |
| | | "order": "desc" |
| | | } |
| | | } |
| | | ], |
| | | "_source": { |
| | | "includes": [ |
| | | "picDate" |
| | | ] |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | }` |
| | | //fmt.Println(requestUrl) |
| | | //fmt.Println(requestBody) |
| | | buf, err := EsReq("POST", requestUrl, []byte(requestBody)) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | ids, err1 := SourceDeduplication(buf) |
| | | if err1 != nil { |
| | | return nil, err1 |
| | | } |
| | | if len(ids) > 1 { |
| | | mapsSort := MapsSort{} |
| | | mapsSort.Key = "lastTime" |
| | | mapsSort.MapList = ids |
| | | sort.Sort(&mapsSort) |
| | | return mapsSort.MapList, nil |
| | | } |
| | | return ids, nil |
| | | } |
| | | |
| | | //统计各个区域人数 |
| | | func StatisticsEveryAreaPersonsNumber(startTime string, endTime string, serverIp string, serverPort string, indexName string) ([]map[string]interface{}, error) { |
| | | var requestUrl = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" |
| | | var requestBody = `{ |
| | | "query": { |
| | | "bool": { |
| | | "filter": [ |
| | | { |
| | | "range": { |
| | | "picDate": { |
| | | "gte": "` + startTime + `", |
| | | "lte": "` + endTime + `" |
| | | } |
| | | } |
| | | }, |
| | | { |
| | | "term": { |
| | | "targetInfo.targetType.raw": "Yolo" |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | }, |
| | | "size": 0, |
| | | "aggs": { |
| | | "buckets_aggs": { |
| | | "composite": { |
| | | "sources": [ |
| | | { |
| | | "areaId": { |
| | | "terms": { |
| | | "field": "targetInfo.areaId" |
| | | } |
| | | } |
| | | } |
| | | ], |
| | | "size": 10000000 |
| | | } |
| | | } |
| | | } |
| | | }` |
| | | buf, err := EsReq("POST", requestUrl, []byte(requestBody)) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | result, err := SourceStatistics(buf) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | return result, nil |
| | | } |
| | | |
| | | /**************************************customer analysis util end**************************************/ |
| | | //根据摄像机列表和时间查询人员浏览轨迹 |
| | | func GetPersonDataByCameraIdAndTime(cameraId []string, startTime string, endTime string, serverIp string, serverPort string, indexName string) (map[string]interface{}, error) { |
| | | |
| | | var filterArr []string |
| | | if cameraId != nil && len(cameraId) > 0 { |
| | | esCameraId := strings.Replace(strings.Trim(fmt.Sprint(cameraId), "[]"), " ", "\",\"", -1) |
| | | filterArr = append(filterArr, `{ |
| | | "terms": { |
| | | "cameraId": ["`+esCameraId+`"] |
| | | } |
| | | }`) |
| | | } |
| | | filterArr = append(filterArr, `{ |
| | | "range": { |
| | | "picDate": { |
| | | "gte": "`+startTime+`", |
| | | "lte": "`+endTime+`" |
| | | } |
| | | } |
| | | }`) |
| | | filterArr = append(filterArr, ` { |
| | | "term": { |
| | | "targetInfo.targetType.raw": "Yolo" |
| | | } |
| | | }`) |
| | | queryStr := strings.Join(filterArr, ",") |
| | | |
| | | personUrl := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" |
| | | personBody := `{ |
| | | "query": { |
| | | "bool": { |
| | | "filter": [ |
| | | ` + queryStr + ` |
| | | ] |
| | | } |
| | | }, |
| | | "size": 2147483647, |
| | | "_source": { |
| | | "includes": [ |
| | | "cameraId", |
| | | "cameraName", |
| | | "cameraAddr", |
| | | "targetInfo.targetScore", |
| | | "picDate", |
| | | "updateTime", |
| | | "picMaxUrl", |
| | | "targetInfo.belongsTargetId", |
| | | "targetInfo.targetLocation", |
| | | "picWH" |
| | | ] |
| | | } |
| | | }` |
| | | //fmt.Println(personUrl) |
| | | //fmt.Println(personBody) |
| | | source := make(map[string]interface{}) |
| | | queryStartTime := time.Now() |
| | | buf, err := EsReq("POST", personUrl, []byte(personBody)) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | queryUseTime := time.Now().Sub(queryStartTime).Seconds() * 1000 |
| | | sources, err := Sourcelist(buf) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | resData, err := PerSonAnalysis(sources) |
| | | source["result"] = resData |
| | | source["total"] = len(resData) |
| | | source["queryUseTime"] = queryUseTime |
| | | //println(sources) |
| | | return source, nil |
| | | |
| | | } |
| | | |
| | | //根据时间范围,摄像机列表,分组聚合人脸列表,返回分组数据 |
| | | func GetFaceDataBucketsByCameraIdAndTimeReturnByGrouped(cameraId []string, personId []string, startTime string, endTime string, thresholdTime float64, serverIp string, ServerPort string, indexName string) (buckersDate map[string]interface{}, err error) { |
| | | var filterArr []string |
| | | if cameraId != nil && len(cameraId) > 0 { |
| | | esCameraId := strings.Replace(strings.Trim(fmt.Sprint(cameraId), "[]"), " ", "\",\"", -1) |
| | | filterArr = append(filterArr, `{ |
| | | "terms": { |
| | | "cameraId": ["`+esCameraId+`"] |
| | | } |
| | | }`) |
| | | } |
| | | if personId != nil && len(personId) > 0 { |
| | | esPersonId := strings.Replace(strings.Trim(fmt.Sprint(personId), "[]"), " ", "\",\"", -1) |
| | | filterArr = append(filterArr, `{ |
| | | "terms": { |
| | | "baseInfo.targetId": ["`+esPersonId+`"] |
| | | } |
| | | }`) |
| | | } |
| | | filterArr = append(filterArr, `{ |
| | | "range": { |
| | | "picDate": { |
| | | "gte": "`+startTime+`", |
| | | "lte": "`+endTime+`" |
| | | } |
| | | } |
| | | }`) |
| | | filterArr = append(filterArr, ` { |
| | | "term": { |
| | | "targetInfo.targetType.raw": "FaceDetect" |
| | | } |
| | | }`) |
| | | queryStr := strings.Join(filterArr, ",") |
| | | |
| | | var buckersUrl = "http://" + serverIp + ":" + ServerPort + "/" + indexName + "/_search" |
| | | var buckersBody = `{ |
| | | "query": { |
| | | "bool": { |
| | | "filter": [ |
| | | ` + queryStr + ` |
| | | ] |
| | | } |
| | | }, |
| | | "size": 0, |
| | | "aggs": { |
| | | "buckets_aggs": { |
| | | "composite": { |
| | | "sources": [ |
| | | { |
| | | "baseInfo.targetId": { |
| | | "terms": { |
| | | "field": "baseInfo.targetId" |
| | | } |
| | | } |
| | | }, |
| | | { |
| | | "cameraId": { |
| | | "terms": { |
| | | "field": "cameraId" |
| | | } |
| | | } |
| | | } |
| | | ], |
| | | "size": 10000000 |
| | | }, |
| | | "aggs":{ |
| | | "top_attention_hits":{ |
| | | "top_hits":{ |
| | | "size": 1000000, |
| | | "sort": [ |
| | | { |
| | | "picDate": { |
| | | "order": "asc" |
| | | } |
| | | } |
| | | ], |
| | | "_source":{ |
| | | "includes":["baseInfo.targetId","cameraId","cameraName","cameraAddr","targetInfo.targetScore","targetInfo.picSmUrl","showLabels","baseInfo.tableId","baseInfo.tableName","baseInfo.bwType","baseInfo.targetName","baseInfo.compareScore","picDate","picMaxUrl","picWH"] |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | }` |
| | | //fmt.Println(buckersUrl) |
| | | //fmt.Println(buckersBody) |
| | | sources := make(map[string]interface{}) |
| | | queryStartTime := time.Now() |
| | | buf, err := EsReq("POST", buckersUrl, []byte(buckersBody)) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | queryUseTime := time.Now().Sub(queryStartTime).Seconds() * 1000 |
| | | //fmt.Println(queryUseTime) |
| | | tmpSources, err := SourceAggregationsReturnByGrouped(buf, thresholdTime) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | sources["result"] = tmpSources |
| | | sources["total"] = len(tmpSources) |
| | | sources["queryUseTime"] = queryUseTime |
| | | //println(sources) |
| | | return sources, nil |
| | | } |
| | | |
| | | //根据时间范围,摄像机列表,分组聚合人脸列表 |
| | | func GetFaceDataBucketsByCameraIdAndTime(cameraId []string, personId []string, startTime string, endTime string, thresholdTime float64, serverIp string, ServerPort string, indexName string) (buckersDate map[string]interface{}, err error) { |
| | | var filterArr []string |
| | | if cameraId != nil && len(cameraId) > 0 { |
| | | esCameraId := strings.Replace(strings.Trim(fmt.Sprint(cameraId), "[]"), " ", "\",\"", -1) |
| | | filterArr = append(filterArr, `{ |
| | | "terms": { |
| | | "cameraId": ["`+esCameraId+`"] |
| | | } |
| | | }`) |
| | | } |
| | | if personId != nil && len(personId) > 0 { |
| | | esPersonId := strings.Replace(strings.Trim(fmt.Sprint(personId), "[]"), " ", "\",\"", -1) |
| | | filterArr = append(filterArr, `{ |
| | | "terms": { |
| | | "baseInfo.targetId": ["`+esPersonId+`"] |
| | | } |
| | | }`) |
| | | } |
| | | filterArr = append(filterArr, `{ |
| | | "range": { |
| | | "picDate": { |
| | | "gte": "`+startTime+`", |
| | | "lte": "`+endTime+`" |
| | | } |
| | | } |
| | | }`) |
| | | filterArr = append(filterArr, ` { |
| | | "term": { |
| | | "targetInfo.targetType.raw": "FaceDetect" |
| | | } |
| | | }`) |
| | | queryStr := strings.Join(filterArr, ",") |
| | | |
| | | var buckersUrl = "http://" + serverIp + ":" + ServerPort + "/" + indexName + "/_search" |
| | | var buckersBody = `{ |
| | | "query": { |
| | | "bool": { |
| | | "filter": [ |
| | | ` + queryStr + ` |
| | | ] |
| | | } |
| | | }, |
| | | "size": 0, |
| | | "aggs": { |
| | | "buckets_aggs": { |
| | | "composite": { |
| | | "sources": [ |
| | | { |
| | | "baseInfo.targetId": { |
| | | "terms": { |
| | | "field": "baseInfo.targetId" |
| | | } |
| | | } |
| | | }, |
| | | { |
| | | "cameraId": { |
| | | "terms": { |
| | | "field": "cameraId" |
| | | } |
| | | } |
| | | } |
| | | ], |
| | | "size": 10000000 |
| | | }, |
| | | "aggs":{ |
| | | "top_attention_hits":{ |
| | | "top_hits":{ |
| | | "size": 1000000, |
| | | "sort": [ |
| | | { |
| | | "picDate": { |
| | | "order": "asc" |
| | | } |
| | | } |
| | | ], |
| | | "_source":{ |
| | | "includes":["baseInfo.targetId","cameraId","cameraName","cameraAddr","targetInfo.targetScore","targetInfo.picSmUrl","showLabels","baseInfo.tableId","baseInfo.tableName","baseInfo.bwType","baseInfo.targetName","baseInfo.compareScore","picDate","picMaxUrl","picWH"] |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | }` |
| | | //fmt.Println(buckersUrl) |
| | | //fmt.Println(buckersBody) |
| | | queryStartTime := time.Now() |
| | | buf, err := EsReq("POST", buckersUrl, []byte(buckersBody)) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | queryUseTime := time.Now().Sub(queryStartTime).Seconds() * 1000 |
| | | |
| | | sources, err := SourceAggregations(buf, thresholdTime, queryUseTime) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | return sources, nil |
| | | } |
| | | |
| | | //根据抓拍人员id更新(picurl)图片地址 |
| | | func UpdatePicUrlById(id string, picUrl string, indexName string, serverIp string, serverPort string) (err error) { |
| | | updateTime := time.Now().Format("2006-01-02 15:04:05") |
| | | tRes, err := AIOceaninfosbyid([]string{id}, indexName, serverIp, serverPort) |
| | | if err != nil || len(tRes) == 0 { |
| | | return err |
| | | } |
| | | picMaxUrls := tRes[0].PicMaxUrl |
| | | sourceStr := ` |
| | | "source": "ctx._source.picMaxUrl.add('` + picUrl + `');ctx._source.updateTime='` + updateTime + `'" |
| | | ` |
| | | if len(picMaxUrls) >= 2 { |
| | | sourceStr = `"source": "ctx._source.picMaxUrl[1]='` + picUrl + `';ctx._source.updateTime='` + updateTime + `'"` |
| | | } |
| | | var info interface{} |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query?refresh=true" |
| | | |
| | | var picUrlInfo = ` |
| | | { |
| | | "script": { |
| | | ` + sourceStr + ` |
| | | }, |
| | | "query": { |
| | | "bool": { |
| | | "filter": [ |
| | | { |
| | | "term": { |
| | | "id": "` + id + `" |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | } |
| | | } |
| | | ` |
| | | //logPrint("url: ", url, videoUrlInfo) |
| | | //fmt.Println(url, picUrlInfo) |
| | | buf, err := EsReq("POST", url, []byte(picUrlInfo)) |
| | | if err != nil { |
| | | logPrint("http request videoUrlInfo info is err!") |
| | | return err |
| | | } |
| | | json.Unmarshal(buf, &info) |
| | | //logPrint(info) |
| | | out, ok := info.(map[string]interface{}) |
| | | if !ok { |
| | | logPrint("http response interface can not change map[string]interface{}") |
| | | return errors.New("http response interface can not change map[string]interface{}") |
| | | } |
| | | middle, ok := out["updated"].(float64) |
| | | if !ok { |
| | | logPrint("first updated change error!", out) |
| | | return errors.New("first updated change error!") |
| | | } |
| | | if middle == 1 { |
| | | return nil |
| | | } |
| | | if middle == 0 { |
| | | return errors.New("已经修改") |
| | | } |
| | | return nil |
| | | } |
| | | |
| | | //根据抓拍人员id更新(videourl)摄像机地址 |
| | | func UpdateVideourlById(id string, videoUrl string, indexName string, serverIp string, serverPort string, command int) (statu int,err error) { |
| | | func UpdateVideourlById(id string, videoUrl string, indexName string, serverIp string, serverPort string, command int) (statu int, err error) { |
| | | |
| | | var info interface{} |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query?refresh=true" |
| | |
| | | } |
| | | } |
| | | ` |
| | | //fmt.Println("url: ", url, videoUrlInfo) |
| | | //logPrint("url: ", url, videoUrlInfo) |
| | | buf, err := EsReq("POST", url, []byte(videoUrlInfo)) |
| | | if err != nil { |
| | | fmt.Println("http request videoUrlInfo info is err!") |
| | | logPrint("http request videoUrlInfo info is err!") |
| | | statu = 500 |
| | | return statu,err |
| | | return statu, err |
| | | } |
| | | json.Unmarshal(buf, &info) |
| | | //fmt.Println(info) |
| | | //logPrint(info) |
| | | out, ok := info.(map[string]interface{}) |
| | | if !ok { |
| | | fmt.Println("http response interface can not change map[string]interface{}") |
| | | logPrint("http response interface can not change map[string]interface{}") |
| | | statu = 500 |
| | | return statu,errors.New("http response interface can not change map[string]interface{}") |
| | | return statu, errors.New("http response interface can not change map[string]interface{}") |
| | | } |
| | | middle, ok := out["updated"].(float64) |
| | | if !ok { |
| | | fmt.Println("first updated change error!") |
| | | batches, ok1 := out["batches"].(float64) |
| | | if !ok || !ok1 { |
| | | logPrint("first updated change error!") |
| | | statu = 500 |
| | | return statu,errors.New("first updated change error!") |
| | | return statu, errors.New("first updated change error!") |
| | | } |
| | | if middle == 1 { |
| | | statu = 200 |
| | | return statu,nil |
| | | if batches == 0 { |
| | | logPrint("no such doc in database") |
| | | statu = 400 |
| | | return statu, errors.New("目标数据不存在") |
| | | } else { |
| | | if middle == 1 { |
| | | statu = 200 |
| | | return statu, nil |
| | | } |
| | | if middle == 0 { |
| | | statu = 201 |
| | | return statu, errors.New("已经修改") |
| | | } |
| | | } |
| | | if middle == 0 { |
| | | statu = 201 |
| | | return statu,errors.New("已经修改") |
| | | } |
| | | return statu,nil |
| | | return statu, nil |
| | | } |
| | | |
| | | //获取当前节点抓拍库所有人员ID*缓存* |
| | |
| | | isCollectStr := "" |
| | | isCollect := compareArgs.Collection |
| | | if isCollect != "" { |
| | | isCollectStr = "{\"term\":{\"isCollect\":\"" + isCollect + "\"}}," |
| | | //isCollectStr = "{\"term\":{\"isCollect\":\"" + isCollect + "\"}}," |
| | | if isCollect == "1" { |
| | | isCollectStr = "{\"term\":{\"isCollect\":true}}," |
| | | } else if isCollect == "0" { |
| | | isCollectStr = "{\"term\":{\"isCollect\":false}}," |
| | | } |
| | | } |
| | | |
| | | //判断布防等级 |
| | |
| | | "\"size\":\"1000\"," + |
| | | "\"query\":{\"bool\":{" + queryStr + |
| | | "\"filter\":[" + |
| | | "{\"term\":{\"targetInfo.targetType.raw\":\"face\"}}," + |
| | | "{\"term\":{\"targetInfo.targetType.raw\":\"FaceDetect\"}}," + |
| | | cameraIdStr + |
| | | alarmLevelStr + |
| | | taskIdStr + |
| | |
| | | go func(reqParam string) { |
| | | defer wg.Done() |
| | | |
| | | //fmt.Println(url) |
| | | //fmt.Println(prama) |
| | | logPrint(url) |
| | | logPrint(prama) |
| | | buf, err := EsReq("POST", url, []byte(reqParam)) |
| | | |
| | | if err != nil { |
| | | fmt.Println("http request videoUrlInfo info is err!") |
| | | fmt.Println(len(capturetable)) |
| | | logPrint("http request videoUrlInfo info is err!") |
| | | logPrint(len(capturetable)) |
| | | return |
| | | } |
| | | |
| | | sources, err := Sourcelistforscroll(buf) |
| | | |
| | | if err != nil { |
| | | fmt.Println(len(capturetable)) |
| | | logPrint(len(capturetable)) |
| | | return |
| | | } |
| | | for _, source := range sources["sourcelist"].([]map[string]interface{}) { |
| | |
| | | "scroll": "1m", |
| | | "scroll_id" : "` + scroll_id + `" |
| | | }` |
| | | //fmt.Println(scroll_url) |
| | | //fmt.Println(jsonDSL) |
| | | logPrint(scroll_url) |
| | | logPrint(jsonDSL) |
| | | buf, err := EsReq("POST", scroll_url, []byte(jsonDSL)) |
| | | |
| | | if err != nil { |
| | | fmt.Println("lenth1: ", len(capturetable)) |
| | | logPrint("lenth1: ", len(capturetable)) |
| | | return |
| | | } |
| | | nextSources, err := Sourcelistforscroll(buf) |
| | |
| | | } |
| | | |
| | | nextM := nextSources["sourcelist"].([]map[string]interface{}) |
| | | //fmt.Println("id",nextSources) |
| | | //logPrint("id",nextSources) |
| | | if nextM == nil || len(nextM) == 0 { |
| | | //fmt.Println("lenth: ", len(capturetable)) |
| | | //logPrint("lenth: ", len(capturetable)) |
| | | return |
| | | } |
| | | //fmt.Println("id") |
| | | //logPrint("id") |
| | | for _, source := range nextM { |
| | | tmpList = append(tmpList, source["id"].(string)) |
| | | } |
| | | //fmt.Println("tmpList: ", len(tmpList)) |
| | | //logPrint("tmpList: ", len(tmpList)) |
| | | lock.Lock() |
| | | capturetable = append(capturetable, tmpList...) |
| | | lock.Unlock() |
| | |
| | | } |
| | | wg.Wait() |
| | | |
| | | fmt.Println("lenth_all: ", len(capturetable)) |
| | | fmt.Println("耗时:", time.Since(ts)) |
| | | logPrint("lenth_all: ", len(capturetable)) |
| | | logPrint("耗时:", time.Since(ts)) |
| | | return capturetable |
| | | } |
| | | |
| | |
| | | if category != "all" { |
| | | filterArr = append(filterArr, ` { |
| | | "term":{ |
| | | "targetInfo.targetType":"`+category+`" |
| | | "targetInfo.targetType.raw":"`+category+`" |
| | | } |
| | | }`) |
| | | |
| | |
| | | "sort":[{"picDate":{"order":"desc"}}], |
| | | "_source": {"includes":[],"excludes":["*.feature"]} |
| | | }` |
| | | fmt.Println(DSLJson) |
| | | //logPrint(DSLJson) |
| | | buf, err := EsReq("POST", url, []byte(DSLJson)) |
| | | if err != nil { |
| | | return aIOceanInfo, err |
| | |
| | | } |
| | | |
| | | aIOcean := AIOceanAnalysis(sources) |
| | | //fmt.Println(len(videoperson)) |
| | | //logPrint(len(videoperson)) |
| | | return aIOcean, nil |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | aIOcean := AIOceanAnalysis(sources) |
| | | fmt.Println(len(aIOcean)) |
| | | logPrint(len(aIOcean)) |
| | | return aIOcean, nil |
| | | } |
| | | |
| | |
| | | } |
| | | } |
| | | }` |
| | | //fmt.Println(DSLJson) |
| | | buf, err := EsReq("POST", url, []byte(DSLJson)) |
| | | if err != nil { |
| | | return total, err |
| | |
| | | return total, errors.New("first hits change error!") |
| | | } |
| | | total = int(middle["total"].(float64)) |
| | | //fmt.Println(total) |
| | | //logPrint(total) |
| | | return total, nil |
| | | } |
| | | |
| | |
| | | } |
| | | }, |
| | | "aggs":{ |
| | | "sdkName_status":{ |
| | | "taskName_status":{ |
| | | "terms":{ |
| | | "field":"sdkName.raw" |
| | | "field":"taskName.raw" |
| | | } |
| | | } |
| | | } |
| | |
| | | if !ok { |
| | | return nil, errors.New("first hits change error!") |
| | | } |
| | | sdkName_status, ok := middle["sdkName_status"].(map[string]interface{}) |
| | | sdkName_status, ok := middle["taskName_status"].(map[string]interface{}) |
| | | if !ok { |
| | | return nil, errors.New("first hits change error!") |
| | | } |
| | | |
| | | //fmt.Println(sdkName_status) |
| | | for _, in := range sdkName_status["buckets"].([]interface{}) { |
| | | var source = make(map[string]interface{}, 0) |
| | | tmpbuf, ok := in.(map[string]interface{}) |
| | | if !ok { |
| | | fmt.Println("change to source error!") |
| | | logPrint("change to source error!") |
| | | continue |
| | | } |
| | | sdkName := tmpbuf["key"].(string) |
| | |
| | | source["value"] = count |
| | | sources = append(sources, source) |
| | | } |
| | | //fmt.Println("tmpSource",sources) |
| | | //logPrint("tmpSource",sources) |
| | | return sources, nil |
| | | } |
| | | |
| | | //聚合任务列表,taskId+taskName |
| | | func AggregateTaskList(serverIp string, serverPort string, indexName string, analyServerId string) (sources []map[string]interface{}, err error) { |
| | | func AggregateTaskList(serverIp string, serverPort string, indexName string, analyServerId string, cameraIds []string) (sources []map[string]interface{}, err error) { |
| | | url := "http://" + serverIp + ":" + serverPort + |
| | | "/" + indexName + "/_search" |
| | | serverFilterStr := "" |
| | | cameIdFilterStr := "" |
| | | if cameraIds != nil && len(cameraIds) > 0 { |
| | | cameIdsStr := strings.Replace(strings.Trim(fmt.Sprint(cameraIds), "[]"), " ", "\",\"", -1) |
| | | cameIdFilterStr = `,{ |
| | | "term": { |
| | | "cameraId": "` + cameIdsStr + `" |
| | | } |
| | | }` |
| | | } |
| | | if analyServerId != "" { |
| | | serverFilterStr = `, |
| | | "query": { |
| | |
| | | { |
| | | "term": { |
| | | "analyServerId": "` + analyServerId + `" |
| | | } |
| | | } |
| | | } |
| | | ` + cameIdFilterStr + ` |
| | | ] |
| | | } |
| | | }` |
| | |
| | | var source = make(map[string]interface{}, 0) |
| | | tmpbuf, ok := in.(map[string]interface{}) |
| | | if !ok { |
| | | fmt.Println("change to source error!") |
| | | logPrint("change to source error!") |
| | | continue |
| | | } |
| | | task := tmpbuf["key"].(map[string]interface{}) |
| | |
| | | source["count"] = count |
| | | sources = append(sources, source) |
| | | } |
| | | //fmt.Println("tmpSource",sources) |
| | | //logPrint("tmpSource",sources) |
| | | return sources, nil |
| | | |
| | | } |
| | | |
// AddDeleteSignal is intended to add an "about to be deleted" signal.
// It is currently an empty placeholder and performs no work.
func AddDeleteSignal() {
}
| | | |
| | |
| | | "bool": { |
| | | "filter": [{ |
| | | "term": { |
| | | "targetInfo.targetType.raw": "`+targetType+`" |
| | | "targetInfo.targetType.raw": "` + targetType + `" |
| | | } |
| | | }] |
| | | } |
| | |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local" |
| | | var source []string |
| | | switch targetType { |
| | | case "face": |
| | | source = []string{"id","targetInfo.feature","analyServerId","cameraId"} |
| | | case "face", "FaceDetect": |
| | | source = []string{"id", "targetInfo.feature", "analyServerId", "cameraId"} |
| | | case "track": |
| | | source = []string{"id","targetInfo.feature","analyServerId","cameraId","targetInfo.attachTarget.feature","linkTagInfo.targetInfo.feature","linkTagInfo.targetInfo.attachTarget.feature","linkTagInfo.cameraId"} |
| | | source = []string{"id", "targetInfo.feature", "analyServerId", "cameraId", "targetInfo.attachTarget.feature", "targetInfo.targetLocation", "linkTagInfo.targetInfo.feature", "linkTagInfo.targetInfo.attachTarget.feature", "linkTagInfo.cameraId", "linkTagInfo.targetInfo.targetLocation"} |
| | | } |
| | | JsonDSL := ` |
| | | { |
| | |
| | | "filter": [ |
| | | { |
| | | "term": { |
| | | "targetInfo.targetType.raw": "`+targetType+`" |
| | | "targetInfo.targetType.raw": "` + targetType + `" |
| | | } |
| | | }, |
| | | { |
| | |
| | | } |
| | | }, |
| | | "size": 1000000, |
| | | "_source": ["`+strings.Replace(strings.Trim(fmt.Sprint(source), "[]"), " ", "\",\"", -1)+`"] |
| | | "_source": ["` + strings.Replace(strings.Trim(fmt.Sprint(source), "[]"), " ", "\",\"", -1) + `"] |
| | | } |
| | | ` |
| | | //logger.Debug(url) |
| | | //logger.Debug(JsonDSL) |
| | | //fmt.Println(JsonDSL) |
| | | //logPrint(JsonDSL) |
| | | buf, err := EsReq("POST", url, []byte(JsonDSL)) |
| | | if err != nil { |
| | | return capdbinfo, errors.New("http request dbtablename info is err!") |
| | |
| | | if err != nil { |
| | | return capdbinfo, err |
| | | } |
| | | //fmt.Println(sources) |
| | | //logPrint(sources) |
| | | // 返回所有查询的数据 |
| | | capdbinfos := Parsesources(sources) |
| | | return capdbinfos, nil |
| | | } |
| | | |
| | | // 查询底库人员信息*缓存* |
| | | func GetOceanFeatures(serverIp string, serverPort string, queryIndexNum int, queryNums int, indexName string, shards string, targetType string) ([]*protomsg.MultiFeaCache, error) { |
| | | var dbinfos []*protomsg.MultiFeaCache |
| | | point := strconv.Itoa(queryIndexNum) |
| | | number := strconv.Itoa(queryNums) |
| | | func GetOceanFeatures(serverIp string, serverPort string, queryNums int, indexName string, shards string, targetType string) ([]*protomsg.MultiFeaCache, error) { |
| | | //queryIndexNum int |
| | | //var dbinfos []*protomsg.MultiFeaCache |
| | | dbinfos := make([]*protomsg.MultiFeaCache, 0) |
| | | //dbinfosss := make([]*protomsg.MultiFeaCache,0) |
| | | //dbinfoss = append(dbinfoss, dbinfosss...) |
| | | |
| | | JsonDSL := "" |
| | | var source []string |
| | | switch targetType { |
| | | case "face": |
| | | source = []string{"id","targetInfo.feature","analyServerId"} |
| | | case "face", "FaceDetect": |
| | | source = []string{"id", "targetInfo.feature", "analyServerId"} |
| | | case "track": |
| | | source = []string{"id","targetInfo.feature","analyServerId","targetInfo.attachTarget.feature","linkTagInfo.targetInfo.feature","linkTagInfo.targetInfo.attachTarget.feature"} |
| | | source = []string{"id", "targetInfo.feature", "analyServerId", "targetInfo.attachTarget.feature", "targetInfo.targetLocation", "linkTagInfo.targetInfo.feature", "linkTagInfo.targetInfo.attachTarget.feature", "linkTagInfo.targetInfo.targetLocation"} |
| | | } |
| | | |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local" |
| | | JsonDSL = ` { |
| | | "from": ` + point + `, |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local;scroll=1m" |
| | | |
| | | var lock sync.RWMutex |
| | | var wg sync.WaitGroup |
| | | |
| | | for i := 0; i < 48; i++ { |
| | | //请求体 |
| | | JsonDSL = ` { |
| | | "slice": { |
| | | "id": "` + strconv.Itoa(i) + `", |
| | | "max": 48 |
| | | }, |
| | | "size":` + strconv.Itoa(queryNums) + `, |
| | | "query": { |
| | | "bool": { |
| | | "filter": [ |
| | | { |
| | | "term": { |
| | | "targetInfo.targetType.raw": "`+targetType+`" |
| | | "targetInfo.targetType.raw": "` + targetType + `" |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | }, |
| | | "size":` + number + `, |
| | | "_source": ["`+strings.Replace(strings.Trim(fmt.Sprint(source), "[]"), " ", "\",\"", -1)+`"] |
| | | "_source": ["` + strings.Replace(strings.Trim(fmt.Sprint(source), "[]"), " ", "\",\"", -1) + `"] |
| | | }` |
| | | wg.Add(1) |
| | | go func(reqJsonDSL string) { |
| | | defer wg.Done() |
| | | |
| | | buf, err := EsReq("POST", url, []byte(JsonDSL)) |
| | | if err != nil { |
| | | return dbinfos, errors.New("http request dbtablename info is err!") |
| | | //fmt.Println(url) |
| | | //fmt.Println(prama) |
| | | //logPrint("url: ",url) |
| | | //logPrint("url: ",reqJsonDSL) |
| | | buf, err := EsReq("POST", url, []byte(reqJsonDSL)) |
| | | if err != nil { |
| | | logPrint("EsReq: ", err) |
| | | return |
| | | } |
| | | |
| | | // 返回 _source 数组 |
| | | sources, err := Sourcelistforscroll(buf) |
| | | if err != nil { |
| | | logPrint("EsReq: ", err) |
| | | return |
| | | } |
| | | // 返回所有查询的数据 |
| | | ftmpDatas := Parsesources(sources["sourcelist"].([]map[string]interface{})) |
| | | lock.Lock() |
| | | dbinfos = append(dbinfos, ftmpDatas...) |
| | | //logPrint("prsLen: ", len(Parsesources(sources["sourcelist"].([]map[string]interface{})))) |
| | | //logPrint("dbinfosLen: ", len(dbinfos)) |
| | | lock.Unlock() |
| | | |
| | | scroll_id := sources["scroll_id"].(string) |
| | | |
| | | //scroll请求头 |
| | | scroll_url := "http://" + serverIp + ":" + serverPort + "/_search/scroll" |
| | | for { |
| | | next_scroll_id := "" |
| | | if next_scroll_id != "" { |
| | | scroll_id = next_scroll_id |
| | | } |
| | | jsonDSL := `{ |
| | | "scroll": "1m", |
| | | "scroll_id" : "` + scroll_id + `" |
| | | }` |
| | | //fmt.Println(scroll_url) |
| | | //fmt.Println(jsonDSL) |
| | | buf, err := EsReq("POST", scroll_url, []byte(jsonDSL)) |
| | | |
| | | if err != nil { |
| | | //fmt.Println("lenth1: ", len(dbinfos)) |
| | | return |
| | | } |
| | | nextSources, err := Sourcelistforscroll(buf) |
| | | |
| | | if nextSources == nil { |
| | | return |
| | | } |
| | | |
| | | nextM := nextSources["sourcelist"].([]map[string]interface{}) |
| | | //fmt.Println("id",nextSources) |
| | | if nextM == nil || len(nextM) == 0 { |
| | | //fmt.Println("lenth: ", len(capturetable)) |
| | | return |
| | | } |
| | | tmpDatas := Parsesources(nextM) |
| | | lock.Lock() |
| | | dbinfos = append(dbinfos, tmpDatas...) |
| | | //logPrint("tmpDatasLen: ", len(tmpDatas)) |
| | | //logPrint("AdbinfosLen: ", len(dbinfos)) |
| | | lock.Unlock() |
| | | |
| | | next_scroll_id = nextSources["scroll_id"].(string) |
| | | } |
| | | |
| | | }(JsonDSL) |
| | | } |
| | | wg.Wait() |
| | | |
| | | // 返回 _source 数组 |
| | | sources, err := Sourcelist(buf) |
| | | if err != nil { |
| | | return dbinfos, err |
| | | //fmt.Println("lenth_all: ", len(dbinfos)) |
| | | |
| | | return dbinfos, nil |
| | | } |
| | | |
| | | //************************CRON TASK******************************* |
| | | //查询日期范围内是否还存在数据 |
| | | func QueryAnalyServerData(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) { |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" |
| | | deleteJson := `{ |
| | | "query":{ |
| | | "bool":{ |
| | | "filter":[{ |
| | | "range":{ |
| | | "updateTime":{ |
| | | "gte":"` + startTime + `", |
| | | "lte":"` + endTime + `" |
| | | } |
| | | } |
| | | }, |
| | | { |
| | | "term":{ |
| | | "analyServerId":"` + analyServerId + `" |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | } |
| | | } ` |
| | | buf, err := EsReq("POST", url, []byte(deleteJson)) |
| | | if err != nil { |
| | | return false, errors.New("请求失败") |
| | | } |
| | | resTotal, err := SourceTotal(buf) |
| | | if err != nil { |
| | | return false, errors.New("解码失败") |
| | | } |
| | | if resTotal == -1 || resTotal == 0 { |
| | | result = false |
| | | } else { |
| | | result = true |
| | | } |
| | | return result, nil |
| | | } |
| | | |
| | | // 返回所有查询的数据 |
| | | dbpersoninfos := Parsesources(sources) |
| | | return dbpersoninfos, nil |
| | | //按日期范围,服务器Id删除数据 |
| | | func DeleteAnalyServerData(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (total int, err error, ) { |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_delete_by_query" |
| | | deleteJson := `{ |
| | | "query":{ |
| | | "bool":{ |
| | | "filter":[{ |
| | | "range":{ |
| | | "updateTime":{ |
| | | "gte":"` + startTime + `", |
| | | "lte":"` + endTime + `" |
| | | } |
| | | } |
| | | }, |
| | | { |
| | | "term":{ |
| | | "analyServerId":"` + analyServerId + `" |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | } |
| | | } ` |
| | | fmt.Println(url) |
| | | fmt.Println(deleteJson) |
| | | buf, err := EsReq("POST", url, []byte(deleteJson)) |
| | | if err != nil { |
| | | return -1, errors.New("请求失败") |
| | | } |
| | | deleteRes, err := SourceDeleted(buf) |
| | | if err != nil { |
| | | return -1, errors.New("解码失败") |
| | | } |
| | | return deleteRes, nil |
| | | } |
| | | |
| | | //给所有节点追加删除任务信息 |
| | | func AddDelTask(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) { |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query" |
| | | addJson := `{ |
| | | "script": { |
| | | "lang":"painless", |
| | | "inline": "ctx._source.instantTask.add(params.newtask)", |
| | | "params": { |
| | | "newtask": { |
| | | "instantClearId": "` + analyServerId + `", |
| | | "startTime": "` + startTime + `", |
| | | "endTime": "` + endTime + `" |
| | | } |
| | | } |
| | | }, |
| | | "query": { |
| | | "bool": { |
| | | "filter": [ |
| | | { |
| | | "term": { |
| | | "application": "loopCoverage" |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | } |
| | | }` |
| | | buf, err := EsReq("POST", url, []byte(addJson)) |
| | | if err != nil { |
| | | return false, errors.New("请求失败") |
| | | } |
| | | updateRes, err := SourceUpdated(buf) |
| | | if err != nil { |
| | | return false, errors.New("解码失败") |
| | | } |
| | | if updateRes == -1 { |
| | | result = false |
| | | } else { |
| | | result = true |
| | | } |
| | | return result, nil |
| | | } |
| | | |
| | | //移除已执行完的删除任务 |
| | | func DeleteDelTask(serverIp string, serverPort string, indexName string, analyServerId string) (result bool, err error) { |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query" |
| | | deleteJson := `{ |
| | | "script": { |
| | | "lang":"painless", |
| | | "inline": "ctx._source.instantTask.remove(0)" |
| | | }, |
| | | "query": { |
| | | "bool": { |
| | | "filter":[{ |
| | | "term":{ |
| | | "id":"` + analyServerId + `" |
| | | } |
| | | }] |
| | | } |
| | | } |
| | | }` |
| | | buf, err := EsReq("POST", url, []byte(deleteJson)) |
| | | if err != nil { |
| | | return false, errors.New("请求失败") |
| | | } |
| | | updateRes, err := SourceUpdated(buf) |
| | | if err != nil { |
| | | return false, errors.New("解码失败") |
| | | } |
| | | if updateRes == -1 { |
| | | result = false |
| | | } else { |
| | | result = true |
| | | } |
| | | return result, nil |
| | | } |
| | | |
// ShardInfo describes a single shard of an index.
type ShardInfo struct {
	ShardIndex string `json:"shardIndex"` // name of the index the shard belongs to
	ShardNum   int    `json:"shardNum"`   // shard number
	ShardRole  string `json:"shardRole"`  // shard role: "primary" or "replica"
	ShardState string `json:"shardState"` // shard state (enabled: STARTED, not enabled: UNASSIGNED)
	ShardDocs  int    `json:"shardDocs"`  // number of documents stored in the shard
	ShardStore string `json:"shardStore"` // current size of the data stored in the shard
	ShardIp    string `json:"shardIp"`    // IP of the node holding the shard
	ShardNode  string `json:"shardNode"`  // name of the node holding the shard
}
| | | |
| | | //获取索引分片信息 |
| | | func GetShardsByIndex(serverIp string, serverPort string, indexName string) ([]ShardInfo, error) { |
| | | url := "http://" + serverIp + ":" + serverPort + "/_cat/shards?v" |
| | | buf, err := EsReq("GET", url, []byte("")) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | var inf = []ShardInfo{} |
| | | res := strings.Split(string(buf), "\n")[1:] |
| | | for _, r := range res { |
| | | if r != "" { |
| | | |
| | | inx := strings.Fields(r) |
| | | index := inx[0] |
| | | shard, _ := strconv.Atoi(inx[1]) |
| | | prired := inx[2] |
| | | if prired == "r" { |
| | | prired = "replica" |
| | | } |
| | | if prired == "p" { |
| | | prired = "primary" |
| | | } |
| | | state := inx[3] |
| | | docs := 0 |
| | | store := "" |
| | | ip := "" |
| | | node := "" |
| | | if state == "STARTED" { |
| | | docs, _ = strconv.Atoi(inx[4]) |
| | | store = inx[5] |
| | | ip = inx[6] |
| | | node = inx[7] |
| | | } |
| | | if index == indexName { |
| | | inf = append(inf, ShardInfo{ |
| | | ShardIndex: index, |
| | | ShardNum: shard, |
| | | ShardRole: prired, |
| | | ShardState: state, |
| | | ShardDocs: docs, |
| | | ShardStore: store, |
| | | ShardIp: ip, |
| | | ShardNode: node, |
| | | }) |
| | | |
| | | } |
| | | } |
| | | |
| | | } |
| | | return inf, nil |
| | | } |