From ebfa610f8c66fd2827a2eec619bfb3e0e22c332f Mon Sep 17 00:00:00 2001
From: sunty <1172534965@qq.com>
Date: 星期一, 25 三月 2024 17:13:14 +0800
Subject: [PATCH] 昼伏夜出调试
---
EsClient.go | 64 +++++++++---
EsApi.go | 181 +++++++++++++++++++++++++++++++----
2 files changed, 203 insertions(+), 42 deletions(-)
diff --git a/EsApi.go b/EsApi.go
index bd1a3f1..eb96adc 100644
--- a/EsApi.go
+++ b/EsApi.go
@@ -22,6 +22,141 @@
}
}
+//***********************閲嶅簡Start**********************************//
+
+type activeHourFormat struct {
+ startTime string
+ endTime string
+ startHour int
+ endHour int
+}
+
+func formatActiveHour(activeHour string) (activeHourFormat, error) {
+ hours := strings.Split(activeHour, "-")
+
+ if len(hours) == 2 {
+ startHour := hours[0]
+ endHour := hours[1]
+
+ // 瑙f瀽寮�濮嬫椂闂寸殑灏忔椂鍜屽垎閽�
+ startParts := strings.Split(startHour, ":")
+ startHourInt, _ := strconv.Atoi(startParts[0])
+
+ // 瑙f瀽缁撴潫鏃堕棿鐨勫皬鏃跺拰鍒嗛挓
+ endParts := strings.Split(endHour, ":")
+ endHourInt, _ := strconv.Atoi(endParts[0])
+
+ // 杈撳嚭寮�濮嬫椂闂寸殑灏忔椂
+ fmt.Println("寮�濮嬫椂闂寸殑灏忔椂:", startHourInt)
+
+ // 杈撳嚭缁撴潫鏃堕棿鐨勫皬鏃� + 1
+ endHourPlusOne := (endHourInt + 1) % 24 // 鍙栦綑纭繚涓嶈秴杩�24灏忔椂
+ fmt.Println("缁撴潫鏃堕棿鐨勫皬鏃� + 1:", endHourPlusOne)
+ activeHourFormat := activeHourFormat{startTime: startHour, endTime: endHour, startHour: startHourInt, endHour: endHourPlusOne}
+ return activeHourFormat, nil
+ }
+ return activeHourFormat{}, errors.New("閿欒锛氭棤娉曡В鏋愬紑濮嬫椂闂村拰缁撴潫鏃堕棿")
+
+}
+
+func DayNightActivityQuery(communityId string, startTime string, endTime string, activeHour string, indexName string, serverIp string, serverPort string) ([]string, error) {
+ activityId := make([]string, 0)
+ esURL := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
+
+ activeHourFormat, err := formatActiveHour(activeHour)
+ if err != nil {
+ return nil, err
+ }
+
+ queryDSL := `
+ {
+ "size": 0,
+ "query": {
+ "bool": {
+ "filter": [
+ {
+ "range": {
+ "picDate": {
+ "gte": "` + startTime + `",
+ "lt": "` + endTime + `"
+ }
+ }
+ },
+ {
+ "term": {
+ "communityId": "` + communityId + `"
+ }
+ },
+ {
+ "script": {
+ "script": {
+ "source": "doc['picDate'].value.hourOfDay >= ` + strconv.Itoa(activeHourFormat.startHour) + ` || doc['picDate'].value.hourOfDay < ` + strconv.Itoa(activeHourFormat.endHour) + `",
+ "lang": "painless"
+ }
+ }
+ }
+ ],
+ "must_not": [
+ {
+ "term": {
+ "documentNumber": ""
+ }
+ }
+ ]
+ }
+ },
+ "aggs": {
+ "group_by_documentnumber": {
+ "terms": {
+ "field": "documentNumber",
+ "size": 100000
+ },
+ "aggs": {
+ "group_by_date": {
+ "date_histogram": {
+ "field": "picDate",
+ "interval": "1d", // 鎸夊ぉ鍒嗘《
+ "format": "yyyy-MM-dd"
+ },
+ "aggs": {
+ "top_hits": {
+ "top_hits": {
+ "_source": [
+ "picDate"
+ ],
+ "size": 100000,
+ "sort": [
+ {
+ "picDate": {
+ "order": "desc"
+ }
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }`
+ //fmt.Println(esURL)
+ //fmt.Println(queryDSL)
+ buf, err := EsReq("POST", esURL, []byte(queryDSL))
+ if err != nil {
+ return nil, err
+ }
+ source, err := SourceAggregationList(buf)
+ if err != nil {
+ return nil, err
+ }
+ result, _ := decodeDocumentInfos(source)
+ return result, nil
+
+ return activityId, nil
+}
+
+// ***********************閲嶅簡End************************************//
// 鏍规嵁鎶撴媿浜哄憳id鏌ヨ鎶撴媿浜哄憳淇℃伅
func AIOceaninfosbyid(id []string, indexName string, serverIp string, serverPort string) ([]protomsg.AIOcean, error) {
var aIOceanInfo []protomsg.AIOcean
@@ -92,7 +227,7 @@
return videoUrl, nil
}
-//鏍规嵁鎶撴媿搴撲汉鍛榠d鏌ヨ鐗瑰緛鍊�
+// 鏍规嵁鎶撴媿搴撲汉鍛榠d鏌ヨ鐗瑰緛鍊�
func GetVideoPersonFaceFeatureById(id string, indexName string, serverIp string, serverPort string) (string, error) {
var jsonDSL = `
{
@@ -122,7 +257,7 @@
return feature, nil
}
-//鏍规嵁鐩爣id鏌ヨ宸茶拷鍔犳潯鏁�
+// 鏍规嵁鐩爣id鏌ヨ宸茶拷鍔犳潯鏁�
func GetLinkTagInfoSize(id string, indexName string, serverIp string, serverPort string) (size int, err error) {
url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
queryDSL := `{
@@ -148,7 +283,7 @@
return size, nil
}
-//鏍规嵁鐩爣id杩藉姞璺熻釜淇℃伅
+// 鏍规嵁鐩爣id杩藉姞璺熻釜淇℃伅
func AppendTargetInfo(id string, targetInfo string, indexName string, serverIp string, serverPort string, updateTime string) (string, error) {
if targetInfo == "" {
return "", errors.New("append data is nil")
@@ -490,7 +625,7 @@
return ids, nil
}
-//缁熻鍚勪釜鍖哄煙浜烘暟
+// 缁熻鍚勪釜鍖哄煙浜烘暟
func StatisticsEveryAreaPersonsNumber(startTime string, endTime string, serverIp string, serverPort string, indexName string) ([]map[string]interface{}, error) {
var requestUrl = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
var requestBody = `{
@@ -617,7 +752,7 @@
}
-//鏍规嵁鏃堕棿鑼冨洿锛屾憚鍍忔満鍒楄〃锛屽垎缁勮仛鍚堜汉鑴稿垪琛�,杩斿洖鍒嗙粍鏁版嵁
+// 鏍规嵁鏃堕棿鑼冨洿锛屾憚鍍忔満鍒楄〃锛屽垎缁勮仛鍚堜汉鑴稿垪琛�,杩斿洖鍒嗙粍鏁版嵁
func GetFaceDataBucketsByCameraIdAndTimeReturnByGrouped(cameraId []string, personId []string, startTime string, endTime string, thresholdTime float64, serverIp string, ServerPort string, indexName string) (buckersDate map[string]interface{}, err error) {
var filterArr []string
if cameraId != nil && len(cameraId) > 0 {
@@ -723,7 +858,7 @@
return sources, nil
}
-//鏍规嵁鏃堕棿鑼冨洿锛屾憚鍍忔満鍒楄〃锛屽垎缁勮仛鍚堜汉鑴稿垪琛�
+// 鏍规嵁鏃堕棿鑼冨洿锛屾憚鍍忔満鍒楄〃锛屽垎缁勮仛鍚堜汉鑴稿垪琛�
func GetFaceDataBucketsByCameraIdAndTime(cameraId []string, personId []string, startTime string, endTime string, thresholdTime float64, serverIp string, ServerPort string, indexName string) (buckersDate map[string]interface{}, err error) {
var filterArr []string
if cameraId != nil && len(cameraId) > 0 {
@@ -824,7 +959,7 @@
return sources, nil
}
-//鏍规嵁鎶撴媿浜哄憳id鏇存柊锛坧icurl锛夊浘鐗囧湴鍧�
+// 鏍规嵁鎶撴媿浜哄憳id鏇存柊锛坧icurl锛夊浘鐗囧湴鍧�
func UpdatePicUrlById(id string, picUrl string, indexName string, serverIp string, serverPort string) (err error) {
updateTime := time.Now().Format("2006-01-02 15:04:05")
tRes, err := AIOceaninfosbyid([]string{id}, indexName, serverIp, serverPort)
@@ -887,7 +1022,7 @@
return nil
}
-//鏍规嵁鎶撴媿浜哄憳id鏇存柊锛坴ideourl锛夋憚鍍忔満鍦板潃
+// 鏍规嵁鎶撴媿浜哄憳id鏇存柊锛坴ideourl锛夋憚鍍忔満鍦板潃
func UpdateVideourlById(id string, videoUrl string, indexName string, serverIp string, serverPort string, command int) (statu int, err error) {
var info interface{}
@@ -947,7 +1082,7 @@
return statu, nil
}
-//鑾峰彇褰撳墠鑺傜偣鎶撴媿搴撴墍鏈変汉鍛業D*缂撳瓨*
+// 鑾峰彇褰撳墠鑺傜偣鎶撴媿搴撴墍鏈変汉鍛業D*缂撳瓨*
func GetAllLocalVideopersonsId(compareArgs protomsg.CompareArgs, indexName string, serverIp string, serverPort string, alarmLevelTypes string) (capturetable []string) {
queryStr := ""
queryBody := compareArgs.InputValue
@@ -1116,7 +1251,7 @@
return capturetable
}
-//鍒濆鍖栧疄鏃舵姄鎷�
+// 鍒濆鍖栧疄鏃舵姄鎷�
func InitRealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm string, category string, quantity int) ([]protomsg.AIOcean, error) {
var aIOceanInfo []protomsg.AIOcean
url := "http://" + serverIp + ":" + serverPort +
@@ -1169,7 +1304,7 @@
return aIOcean, nil
}
-//瀹炴椂鎶撴媿
+// 瀹炴椂鎶撴媿
func RealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm bool) ([]protomsg.AIOcean, error) {
var aIOceanInfo []protomsg.AIOcean
url := "http://" + serverIp + ":" + serverPort +
@@ -1213,7 +1348,7 @@
return aIOcean, nil
}
-//缁煎悎缁熻
+// 缁煎悎缁熻
func StatisticsComprehensive(serverIp string, serverPort string, indexName string, isAlarm string) (total int, err error) {
url := "http://" + serverIp + ":" + serverPort +
"/" + indexName + "/_search"
@@ -1262,7 +1397,7 @@
return total, nil
}
-//瀹炴椂鎶ヨ浠诲姟姣旂巼
+// 瀹炴椂鎶ヨ浠诲姟姣旂巼
func RealTimeAlarmTaskRate(serverIp string, serverPort string, indexName string) (sources []map[string]interface{}, err error) {
url := "http://" + serverIp + ":" + serverPort +
"/" + indexName + "/_search"
@@ -1323,7 +1458,7 @@
return sources, nil
}
-//鑱氬悎浠诲姟鍒楄〃锛宼askId+taskName
+// 鑱氬悎浠诲姟鍒楄〃锛宼askId+taskName
func AggregateTaskList(serverIp string, serverPort string, indexName string, analyServerId string, cameraIds []string) (sources []map[string]interface{}, err error) {
url := "http://" + serverIp + ":" + serverPort +
"/" + indexName + "/_search"
@@ -1419,7 +1554,7 @@
}
-//娣诲姞鍗冲皢鍒犻櫎淇″彿
+// 娣诲姞鍗冲皢鍒犻櫎淇″彿
func AddDeleteSignal() {
}
@@ -1460,7 +1595,7 @@
}
-//鏌ヨ鏃堕棿娈垫暟鎹� *缂撳瓨*
+// 鏌ヨ鏃堕棿娈垫暟鎹� *缂撳瓨*
func GetPeriodInfos(serverIp string, serverPort string, startTime string, endTime string, indexName string, shards string, targetType string) ([]*protomsg.MultiFeaCache, error) {
var capdbinfo []*protomsg.MultiFeaCache
url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local"
@@ -1638,8 +1773,8 @@
return dbinfos, nil
}
-//************************CORN TASK*******************************
-//鏌ヨ鏃ユ湡鑼冨洿鍐呮槸鍚﹁繕瀛樺湪鏁版嵁
+// ************************CORN TASK*******************************
+// 鏌ヨ鏃ユ湡鑼冨洿鍐呮槸鍚﹁繕瀛樺湪鏁版嵁
func QueryAnalyServerData(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) {
url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
deleteJson := `{
@@ -1678,8 +1813,8 @@
return result, nil
}
-//鎸夋棩鏈熻寖鍥达紝鏈嶅姟鍣↖d鍒犻櫎鏁版嵁
-func DeleteAnalyServerData(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (total int, err error, ) {
+// 鎸夋棩鏈熻寖鍥达紝鏈嶅姟鍣↖d鍒犻櫎鏁版嵁
+func DeleteAnalyServerData(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (total int, err error) {
url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_delete_by_query"
deleteJson := `{
"query":{
@@ -1714,7 +1849,7 @@
return deleteRes, nil
}
-//缁欐墍鏈夎妭鐐硅拷鍔犲垹闄や换鍔′俊鎭�
+// 缁欐墍鏈夎妭鐐硅拷鍔犲垹闄や换鍔′俊鎭�
func AddDelTask(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) {
url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query"
addJson := `{
@@ -1757,7 +1892,7 @@
return result, nil
}
-//绉婚櫎宸叉墽琛屽畬鐨勫垹闄や换鍔�
+// 绉婚櫎宸叉墽琛屽畬鐨勫垹闄や换鍔�
func DeleteDelTask(serverIp string, serverPort string, indexName string, analyServerId string) (result bool, err error) {
url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query"
deleteJson := `{
@@ -1802,7 +1937,7 @@
ShardNode string `json:"shardNode"` //鍒嗙墖鎵�鍦ㄨ妭鐐瑰悕绉�
}
-//鑾峰彇绱㈠紩鍒嗙墖淇℃伅
+// 鑾峰彇绱㈠紩鍒嗙墖淇℃伅
func GetShardsByIndex(serverIp string, serverPort string, indexName string) ([]ShardInfo, error) {
url := "http://" + serverIp + ":" + serverPort + "/_cat/shards?v"
buf, err := EsReq("GET", url, []byte(""))
diff --git a/EsClient.go b/EsClient.go
index 0395fd3..5da48ae 100644
--- a/EsClient.go
+++ b/EsClient.go
@@ -15,7 +15,6 @@
"time"
)
-
func Parsesources(sources []map[string]interface{}) (multiInfos []*protomsg.MultiFeaCache) {
var ok bool
for _, source := range sources {
@@ -104,7 +103,7 @@
return
}
-//瑙f瀽鎶撴媿搴撲汉鍛樼粨鏋�
+// 瑙f瀽鎶撴媿搴撲汉鍛樼粨鏋�
func AIOceanAnalysis(sources []map[string]interface{}) (tmpinfos []protomsg.AIOcean) {
var ok bool
for _, source := range sources {
@@ -311,7 +310,7 @@
return tmpinfos
}
-//瑙f瀽搴曞簱浜哄憳缁撴瀯
+// 瑙f瀽搴曞簱浜哄憳缁撴瀯
func Dbpersonbyid(sources []map[string]interface{}) (tmpinfos []protomsg.Dbperson) {
var ok bool
@@ -379,7 +378,7 @@
return tmpinfos
}
-//瑙f瀽搴曞簱缁撴瀯
+// 瑙f瀽搴曞簱缁撴瀯
func Dbtablebyid(sources []map[string]interface{}) (tmpinfos []protomsg.Dbtable) {
var ok bool
@@ -548,7 +547,7 @@
return allSource, nil
}
-func SourceDeduplication(buf [] byte) ([]map[string]interface{}, error) {
+func SourceDeduplication(buf []byte) ([]map[string]interface{}, error) {
var info interface{}
json.Unmarshal(buf, &info)
out, ok := info.(map[string]interface{})
@@ -577,8 +576,8 @@
return faceId, nil
}
-//瑙f瀽鑱氬悎璁℃暟缁撴瀯
-func SourceStatistics(buf [] byte) ([]map[string]interface{}, error) {
+// 瑙f瀽鑱氬悎璁℃暟缁撴瀯
+func SourceStatistics(buf []byte) ([]map[string]interface{}, error) {
var info interface{}
json.Unmarshal(buf, &info)
out, ok := info.(map[string]interface{})
@@ -604,7 +603,7 @@
return resultData, nil
}
-func SourceAggregations(buf [] byte, thresholdTime float64, queryUseTime float64) (sources map[string]interface{}, err error) {
+func SourceAggregations(buf []byte, thresholdTime float64, queryUseTime float64) (sources map[string]interface{}, err error) {
s := make(map[string]interface{})
loc, err := time.LoadLocation("Asia/Shanghai")
if err != nil {
@@ -739,7 +738,7 @@
return s, nil
}
-func SourceAggregationsReturnByGrouped(buf [] byte, thresholdTime float64) (sources []map[string]interface{}, err error) {
+func SourceAggregationsReturnByGrouped(buf []byte, thresholdTime float64) (sources []map[string]interface{}, err error) {
loc, err := time.LoadLocation("Asia/Shanghai")
if err != nil {
return nil, errors.New("鏃跺尯璁剧疆閿欒")
@@ -868,7 +867,7 @@
return sources, nil
}
-//瑙f瀽鎶撴媿搴撲汉鍛樼粨鏋�
+// 瑙f瀽鎶撴媿搴撲汉鍛樼粨鏋�
func PerSonAnalysis(preData []map[string]interface{}) (sources []map[string]interface{}, err error) {
loc, err := time.LoadLocation("Asia/Shanghai")
if err != nil {
@@ -1034,15 +1033,43 @@
return -1, errors.New("first total change error!")
}
- tmp,b := middle["total"].(map[string]interface{})
+ tmp, b := middle["total"].(map[string]interface{})
if b != true {
v := middle["total"].(float64)
t := int(v)
- return t,nil
+ return t, nil
}
value := tmp["value"].(float64)
total = int(value)
return total, nil
+}
+
+func SourceAggregationList(buf []byte) (sources []map[string]interface{}, err error) {
+ var info interface{}
+ json.Unmarshal(buf, &info)
+ out, ok := info.(map[string]interface{})
+ if !ok {
+ return nil, errors.New("http response interface can not change map[string]interface{}")
+ }
+
+ middle, ok := out["aggregations"].(map[string]interface{})
+ if !ok {
+ return nil, errors.New("first hits change error!")
+ }
+
+ documentAggregations := middle["group_by_documentnumber"].(map[string]interface{})
+ buckets := documentAggregations["buckets"].([]interface{})
+ if len(buckets) == 0 {
+ return nil, nil
+ }
+ for _, in := range buckets {
+ tmpbuf, ok := in.(map[string]interface{})
+ if !ok {
+ return nil, errors.New("")
+ }
+ sources = append(sources, tmpbuf)
+ }
+ return sources, nil
}
func EsReq(method string, url string, parama []byte) (buf []byte, err error) {
@@ -1053,7 +1080,7 @@
}
request, err := http.NewRequest(method, url, bytes.NewBuffer(parama))
request.Header.Set("Content-type", "application/json")
- request.Header.Set("Authorization",Token)
+ request.Header.Set("Authorization", Token)
if err != nil {
fmt.Println("build request fail !")
@@ -1086,15 +1113,14 @@
// 璧嬪�兼椂妫�娴嬫槸鍚﹁兘澶熻祴鍊�
//func //Isnil(key string, ok bool){
// if !ok {
-// fmt.Println(key, "is nil can not asign")
+// fmt.Println(key, "is nil can not asign")
// }
//}
type account struct {
- Username string `mapstructure: "username"`
+ Username string `mapstructure: "username"`
Userpassword string `mapstructure: "userpassword"`
}
-
var Account = &account{}
@@ -1109,9 +1135,9 @@
v.AddConfigPath("/opt/vasystem/config/")
err := v.ReadInConfig()
if err != nil {
- log.Fatal("err on parsing configuration file!",err)
+ log.Fatal("err on parsing configuration file!", err)
}
- v.UnmarshalKey("es.account",Account)
+ v.UnmarshalKey("es.account", Account)
- Token = "Basic "+base64.StdEncoding.EncodeToString([]byte(Account.Username+":"+Account.Userpassword))
+ Token = "Basic " + base64.StdEncoding.EncodeToString([]byte(Account.Username+":"+Account.Userpassword))
}
--
Gitblit v1.8.0