From 705f76d542397154da2acf5461f2888828cbd5b8 Mon Sep 17 00:00:00 2001
From: putonghao <ynxwpth@163.com>
Date: 星期三, 14 九月 2022 14:38:55 +0800
Subject: [PATCH] update cluster name to elasticsearch.yml
---
EsApi.go | 188 ++++++++++++++++++++++++++++++++++++++--------
1 files changed, 153 insertions(+), 35 deletions(-)
diff --git a/EsApi.go b/EsApi.go
index 73ee338..f690f7d 100644
--- a/EsApi.go
+++ b/EsApi.go
@@ -257,20 +257,23 @@
if err != nil {
return nil, err
}
+ if len(source) == 0 {
+ return source, nil
+ }
faceSource := make([]map[string]interface{}, 0)
for index, info := range source {
if int(info["stayTime"].(float64)) > thresholdStayTime {
faceSource = append(faceSource, source[index])
}
}
+ mapsSort := MapsSort{}
+ mapsSort.Key = "endTime"
+ mapsSort.MapList = faceSource
+ sort.Sort(&mapsSort)
if len(faceSource) > total {
- mapsSort := MapsSort{}
- mapsSort.Key = "endTime"
- mapsSort.MapList = faceSource
- sort.Sort(&mapsSort)
return mapsSort.MapList[:total], nil
}
- return faceSource, nil
+ return mapsSort.MapList, nil
}
func GetFaceDataByTimeAndId(startTime string, endTime string, id string, thresholdTime int, thresholdStayTime int, serverIp string, serverPort string, indexName string) (resData []map[string]interface{}, err error) {
@@ -355,16 +358,23 @@
if err != nil {
return nil, err
}
+ if len(source) == 0 {
+ return source, nil
+ }
faceSource := make([]map[string]interface{}, 0)
for index, info := range source {
if int(info["stayTime"].(float64)) > thresholdStayTime {
faceSource = append(faceSource, source[index])
}
}
- return faceSource, nil
+ mapsSort := MapsSort{}
+ mapsSort.Key = "startTime"
+ mapsSort.MapList = faceSource
+ sort.Sort(&mapsSort)
+ return mapsSort.MapList, nil
}
-func GetFaceIdDeduplication(startTime string, endTime string, serverIp string, serverPort string, indexName string) (ids []string, err error) {
+func GetFaceIdDeduplication(startTime string, endTime string, serverIp string, serverPort string, indexName string) (ids []map[string]interface{}, err error) {
var requestUrl = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
var requestBody = `{
"query": {
@@ -400,7 +410,27 @@
}
],
"size": 10000000
- }
+ },
+ "aggs": {
+ "top_attention_hits": {
+ "top_hits": {
+ "size": 1,
+ "sort": [
+ {
+ "picDate": {
+ "order": "desc"
+ }
+ }
+ ],
+ "_source": {
+ "includes": [
+ "picDate"
+ ]
+ }
+ }
+ }
+ }
+ }
}
}
}`
@@ -413,6 +443,13 @@
ids, err1 := SourceDeduplication(buf)
if err1 != nil {
return nil, err1
+ }
+ if len(ids) > 1 {
+ mapsSort := MapsSort{}
+ mapsSort.Key = "lastTime"
+ mapsSort.MapList = ids
+ sort.Sort(&mapsSort)
+ return mapsSort.MapList, nil
}
return ids, nil
}
@@ -851,18 +888,25 @@
return statu, errors.New("http response interface can not change map[string]interface{}")
}
middle, ok := out["updated"].(float64)
- if !ok {
+ batches,ok1 := out["batches"].(float64)
+ if !ok || !ok1{
logPrint("first updated change error!")
statu = 500
return statu, errors.New("first updated change error!")
}
- if middle == 1 {
- statu = 200
- return statu, nil
- }
- if middle == 0 {
- statu = 201
- return statu, errors.New("宸茬粡淇敼")
+ if batches == 0 {
+ logPrint("no such doc in database")
+ statu = 400
+ return statu,errors.New("鐩爣鏁版嵁涓嶅瓨鍦�")
+ } else {
+ if middle == 1 {
+ statu = 200
+ return statu, nil
+ }
+ if middle == 0 {
+ statu = 201
+ return statu, errors.New("宸茬粡淇敼")
+ }
}
return statu, nil
}
@@ -907,7 +951,12 @@
isCollectStr := ""
isCollect := compareArgs.Collection
if isCollect != "" {
- isCollectStr = "{\"term\":{\"isCollect\":\"" + isCollect + "\"}},"
+ //isCollectStr = "{\"term\":{\"isCollect\":\"" + isCollect + "\"}},"
+ if isCollect == "1" {
+ isCollectStr = "{\"term\":{\"isCollect\":true}},"
+ } else if isCollect == "0" {
+ isCollectStr = "{\"term\":{\"isCollect\":false}},"
+ }
}
//鍒ゆ柇甯冮槻绛夌骇
@@ -1048,7 +1097,7 @@
if category != "all" {
filterArr = append(filterArr, ` {
"term":{
- "targetInfo.targetType":"`+category+`"
+ "targetInfo.targetType.raw":"`+category+`"
}
}`)
@@ -1068,7 +1117,7 @@
"sort":[{"picDate":{"order":"desc"}}],
"_source": {"includes":[],"excludes":["*.feature"]}
}`
- logPrint(DSLJson)
+ //logPrint(DSLJson)
buf, err := EsReq("POST", url, []byte(DSLJson))
if err != nil {
return aIOceanInfo, err
@@ -1158,7 +1207,6 @@
}
}
}`
- //logPrint(DSLJson)
buf, err := EsReq("POST", url, []byte(DSLJson))
if err != nil {
return total, err
@@ -1196,9 +1244,9 @@
}
},
"aggs":{
- "sdkName_status":{
+ "taskName_status":{
"terms":{
- "field":"sdkName.raw"
+ "field":"taskName.raw"
}
}
}
@@ -1217,11 +1265,11 @@
if !ok {
return nil, errors.New("first hits change error!")
}
- sdkName_status, ok := middle["sdkName_status"].(map[string]interface{})
+ sdkName_status, ok := middle["taskName_status"].(map[string]interface{})
if !ok {
return nil, errors.New("first hits change error!")
}
-
+ //fmt.Println(sdkName_status)
for _, in := range sdkName_status["buckets"].([]interface{}) {
var source = make(map[string]interface{}, 0)
tmpbuf, ok := in.(map[string]interface{})
@@ -1240,10 +1288,19 @@
}
//鑱氬悎浠诲姟鍒楄〃锛宼askId+taskName
-func AggregateTaskList(serverIp string, serverPort string, indexName string, analyServerId string) (sources []map[string]interface{}, err error) {
+func AggregateTaskList(serverIp string, serverPort string, indexName string, analyServerId string, cameraIds []string) (sources []map[string]interface{}, err error) {
url := "http://" + serverIp + ":" + serverPort +
"/" + indexName + "/_search"
serverFilterStr := ""
+ cameIdFilterStr := ""
+ if cameraIds != nil && len(cameraIds) > 0 {
+ cameIdsStr := strings.Replace(strings.Trim(fmt.Sprint(cameraIds), "[]"), " ", "\",\"", -1)
+ cameIdFilterStr = `,{
+ "term": {
+ "cameraId": "` + cameIdsStr + `"
+ }
+ }`
+ }
if analyServerId != "" {
serverFilterStr = `,
"query": {
@@ -1252,8 +1309,9 @@
{
"term": {
"analyServerId": "` + analyServerId + `"
+ }
}
- }
+ ` + cameIdFilterStr + `
]
}
}`
@@ -1585,7 +1643,7 @@
}
//鎸夋棩鏈熻寖鍥达紝鏈嶅姟鍣↖d鍒犻櫎鏁版嵁
-func DeleteAnalyServerData(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) {
+func DeleteAnalyServerData(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (total int, err error,) {
url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_delete_by_query"
deleteJson := `{
"query":{
@@ -1607,20 +1665,17 @@
}
}
} `
+ fmt.Println(url)
+ fmt.Println(deleteJson)
buf, err := EsReq("POST", url, []byte(deleteJson))
if err != nil {
- return false, errors.New("璇锋眰澶辫触")
+ return -1, errors.New("璇锋眰澶辫触")
}
deleteRes, err := SourceDeleted(buf)
if err != nil {
- return false, errors.New("瑙g爜澶辫触")
+ return -1, errors.New("瑙g爜澶辫触")
}
- if deleteRes == -1 {
- result = false
- } else {
- result = true
- }
- return result, nil
+ return deleteRes,nil
}
//缁欐墍鏈夎妭鐐硅拷鍔犲垹闄や换鍔′俊鎭�
@@ -1699,3 +1754,66 @@
}
return result, nil
}
+
+type ShardInfo struct {
+ ShardIndex string `json:"shardIndex"` //鍒嗙墖鎵�灞炵储寮曞悕绉�
+ ShardNum int `json:"shardNum"` //鍒嗙墖鍙�
+ ShardRole string `json:"shardRole"` //鍒嗙墖瑙掕壊(涓诲垎鐗囷細primary 鍓湰鍒嗙墖锛歳eplica)
+ ShardState string `json:"shardState"` //鍒嗙墖鐘舵��(鍚敤锛歋TARTED 鏈惎鐢細UNASSIGNED)
+ ShardDocs int `json:"shardDocs"` //鍒嗙墖宸蹭繚瀛樻枃妗f暟
+ ShardStore string `json:"shardStore"` //鍒嗙墖褰撳墠瀛樺偍鏁版嵁澶у皬
+ ShardIp string `json:"shardIp"` //鍒嗙墖鎵�鍦ㄨ妭鐐筰p
+ ShardNode string `json:"shardNode"` //鍒嗙墖鎵�鍦ㄨ妭鐐瑰悕绉�
+}
+
+//鑾峰彇绱㈠紩鍒嗙墖淇℃伅
+func GetShardsByIndex(serverIp string, serverPort string, indexName string) ([]ShardInfo, error) {
+ url := "http://" + serverIp + ":" + serverPort + "/_cat/shards?v"
+ buf, err := EsReq("GET", url, []byte(""))
+ if err != nil {
+ return nil, err
+ }
+ var inf = []ShardInfo{}
+ res := strings.Split(string(buf), "\n")[1:]
+ for _, r := range res {
+ if r != "" {
+
+ inx := strings.Fields(r)
+ index := inx[0]
+ shard, _ := strconv.Atoi(inx[1])
+ prired := inx[2]
+ if prired == "r" {
+ prired = "replica"
+ }
+ if prired == "p" {
+ prired = "primary"
+ }
+ state := inx[3]
+ docs := 0
+ store := ""
+ ip := ""
+ node := ""
+ if state == "STARTED" {
+ docs, _ = strconv.Atoi(inx[4])
+ store = inx[5]
+ ip = inx[6]
+ node = inx[7]
+ }
+ if index == indexName {
+ inf = append(inf, ShardInfo{
+ ShardIndex: index,
+ ShardNum: shard,
+ ShardRole: prired,
+ ShardState: state,
+ ShardDocs: docs,
+ ShardStore: store,
+ ShardIp: ip,
+ ShardNode: node,
+ })
+
+ }
+ }
+
+ }
+ return inf, nil
+}
--
Gitblit v1.8.0