From f6ca7bb43270474fa876ff6ba62c6b2113b045ad Mon Sep 17 00:00:00 2001 From: sunty <1172534965@qq.com> Date: 星期五, 31 五月 2024 14:03:29 +0800 Subject: [PATCH] Optimize day and night algorithm, format date and time, batch process by date. --- EsClient.go | 152 ++++++++++++++++++++++++++++---------------------- 1 files changed, 85 insertions(+), 67 deletions(-) diff --git a/EsClient.go b/EsClient.go index a35533c..c5b47f1 100644 --- a/EsClient.go +++ b/EsClient.go @@ -3,60 +3,17 @@ import ( "basic.com/pubsub/protomsg.git" "bytes" + "encoding/base64" "encoding/json" "errors" "fmt" - "io" + "github.com/spf13/viper" "io/ioutil" + "log" "math" "net/http" - "strings" "time" ) - -func GetEsDataReq(url string, parama string, isSource bool) map[string]interface{} { - //fmt.Println("es 鏌ヨ璇锋眰璺緞" + url) // 閰嶇疆淇℃伅 鑾峰彇 - var dat map[string]interface{} - req, err := http.NewRequest("POST", url, strings.NewReader(parama)) - req.Header.Add("Content-Type", "application/json") - timeout := time.Duration(10 * time.Second) //瓒呮椂鏃堕棿50ms - client := &http.Client{Timeout: timeout} - resp, err := client.Do(req) - if err != nil { - fmt.Println(err) - return dat - } - defer resp.Body.Close() - dec := json.NewDecoder(resp.Body) - if err := dec.Decode(&dat); err == io.EOF { - fmt.Println(err.Error()) - return dat - } else if err != nil { - fmt.Println(err.Error()) - return dat - } - // 鏄惁闇�瑕� 瑙f瀽 es 杩斿洖鐨� source - if isSource { - dat = dat["hits"].(map[string]interface{}) - var data = make(map[string]interface{}, 2) - data["total"] = dat["total"] - sources := []interface{}{} - for _, value := range dat["hits"].([]interface{}) { - source := value.(map[string]interface{})["_source"].(map[string]interface{}) - //source["id"] = source["id"] - /*sdkType := source["sdkType"] - if sdkType != nil { - sdk, _ := strconv.Atoi(sdkType.(string)) - source["sdkType"] = sdkTypeToValue(sdk) - }*/ - sources = append(sources, source) - } - data["datalist"] = sources - return data - } else { - return dat - } -} func Parsesources(sources []map[string]interface{}) (multiInfos []*protomsg.MultiFeaCache) { var ok bool @@ -146,7 +103,7 @@ return } -//瑙f瀽鎶撴媿搴撲汉鍛樼粨鏋� +// 瑙f瀽鎶撴媿搴撲汉鍛樼粨鏋� func AIOceanAnalysis(sources []map[string]interface{}) (tmpinfos []protomsg.AIOcean) { var ok bool for _, source := range sources { @@ -353,7 +310,7 @@ return tmpinfos } -//瑙f瀽搴曞簱浜哄憳缁撴瀯 +// 瑙f瀽搴曞簱浜哄憳缁撴瀯 func Dbpersonbyid(sources []map[string]interface{}) (tmpinfos []protomsg.Dbperson) { var ok bool @@ -421,7 +378,7 @@ return tmpinfos } -//瑙f瀽搴曞簱缁撴瀯 +// 瑙f瀽搴曞簱缁撴瀯 func Dbtablebyid(sources []map[string]interface{}) (tmpinfos []protomsg.Dbtable) { var ok bool @@ -556,14 +513,14 @@ realStartTime, _ := time.ParseInLocation("2006-01-02 15:04:05", hitStartTime, loc) realEndTime, _ := time.ParseInLocation("2006-01-02 15:04:05", hitEndTime, loc) stayTime = math.Abs(realEndTime.Sub(realStartTime).Seconds()) - if sinTime.Sub(mTime).Seconds() == 0{ + if sinTime.Sub(mTime).Seconds() == 0 { sinTime.Add(time.Second * 1) sinTime.Format("2006-01-02 15:04:05") hitsSources[len(hitsSources)-1]["endTime"] = tmpTime stayTime = 1 - } else if stayTime == 0{ + } else if stayTime == 0 { stayTime = 1 - hitsSources[len(hitsSources)-1]["endTime"] = realEndTime.Add(time.Second*1).Format("2006-01-02 15:04:05") + hitsSources[len(hitsSources)-1]["endTime"] = realEndTime.Add(time.Second * 1).Format("2006-01-02 15:04:05") } hitsSources[len(hitsSources)-1]["stayTime"] = stayTime if point == indexLength { @@ -590,7 +547,7 @@ return allSource, nil } -func SourceDeduplication(buf [] byte) ([]map[string]interface{}, error) { +func SourceDeduplication(buf []byte) ([]map[string]interface{}, error) { var info interface{} json.Unmarshal(buf, &info) out, ok := info.(map[string]interface{}) @@ -619,8 +576,8 @@ return faceId, nil } -//瑙f瀽鑱氬悎璁℃暟缁撴瀯 -func SourceStatistics(buf [] byte) ([]map[string]interface{}, error) { +// 瑙f瀽鑱氬悎璁℃暟缁撴瀯 +func SourceStatistics(buf []byte) ([]map[string]interface{}, error) { var info interface{} json.Unmarshal(buf, &info) out, ok := info.(map[string]interface{}) @@ -646,7 +603,7 @@ return resultData, nil } -func SourceAggregations(buf [] byte, thresholdTime float64, queryUseTime float64) (sources map[string]interface{}, err error) { +func SourceAggregations(buf []byte, thresholdTime float64, queryUseTime float64) (sources map[string]interface{}, err error) { s := make(map[string]interface{}) loc, err := time.LoadLocation("Asia/Shanghai") if err != nil { @@ -730,14 +687,14 @@ realStartTime, _ := time.ParseInLocation("2006-01-02 15:04:05", hitStartTime, loc) realEndTime, _ := time.ParseInLocation("2006-01-02 15:04:05", hitEndTime, loc) stayTime = math.Abs(realEndTime.Sub(realStartTime).Seconds()) - if sinTime.Sub(mTime).Seconds() == 0{ + if sinTime.Sub(mTime).Seconds() == 0 { sinTime.Add(time.Second * 1) sinTime.Format("2006-01-02 15:04:05") hitsSources[len(hitsSources)-1]["endTime"] = tmpTime stayTime = 1 - } else if stayTime == 0{ + } else if stayTime == 0 { stayTime = 1 - hitsSources[len(hitsSources)-1]["endTime"] = realEndTime.Add(time.Second*1).Format("2006-01-02 15:04:05") + hitsSources[len(hitsSources)-1]["endTime"] = realEndTime.Add(time.Second * 1).Format("2006-01-02 15:04:05") } hitsSources[len(hitsSources)-1]["stayTime"] = stayTime if point == indexLength { @@ -781,7 +738,7 @@ return s, nil } -func SourceAggregationsReturnByGrouped(buf [] byte, thresholdTime float64) (sources []map[string]interface{}, err error) { +func SourceAggregationsReturnByGrouped(buf []byte, thresholdTime float64) (sources []map[string]interface{}, err error) { loc, err := time.LoadLocation("Asia/Shanghai") if err != nil { return nil, errors.New("鏃跺尯璁剧疆閿欒") @@ -863,14 +820,14 @@ realStartTime, _ := time.ParseInLocation("2006-01-02 15:04:05", hitStartTime, loc) realEndTime, _ := time.ParseInLocation("2006-01-02 15:04:05", hitEndTime, loc) stayTime = math.Abs(realEndTime.Sub(realStartTime).Seconds()) - if sinTime.Sub(mTime).Seconds() == 0{ + if sinTime.Sub(mTime).Seconds() == 0 { sinTime.Add(time.Second * 1) sinTime.Format("2006-01-02 15:04:05") hitsSources[len(hitsSources)-1]["endTime"] = tmpTime stayTime = 1 - } else if stayTime == 0{ + } else if stayTime == 0 { stayTime = 1 - hitsSources[len(hitsSources)-1]["endTime"] = realEndTime.Add(time.Second*1).Format("2006-01-02 15:04:05") + hitsSources[len(hitsSources)-1]["endTime"] = realEndTime.Add(time.Second * 1).Format("2006-01-02 15:04:05") } hitsSources[len(hitsSources)-1]["stayTime"] = stayTime if point == indexLength { @@ -910,7 +867,7 @@ return sources, nil } -//瑙f瀽鎶撴媿搴撲汉鍛樼粨鏋� +// 瑙f瀽鎶撴媿搴撲汉鍛樼粨鏋� func PerSonAnalysis(preData []map[string]interface{}) (sources []map[string]interface{}, err error) { loc, err := time.LoadLocation("Asia/Shanghai") if err != nil { @@ -1075,9 +1032,44 @@ if !ok { return -1, errors.New("first total change error!") } - tmp := middle["total"].(float64) - total = int(tmp) + + tmp, b := middle["total"].(map[string]interface{}) + if b != true { + v := middle["total"].(float64) + t := int(v) + return t, nil + } + value := tmp["value"].(float64) + total = int(value) return total, nil +} + +func SourceAggregationList(buf []byte) (sources []map[string]interface{}, err error) { + var info interface{} + json.Unmarshal(buf, &info) + out, ok := info.(map[string]interface{}) + if !ok { + return nil, errors.New("http response interface can not change map[string]interface{}") + } + + middle, ok := out["aggregations"].(map[string]interface{}) + if !ok { + return nil, errors.New("first hits change error!") + } + //fmt.Println("鏈鍏卞尮閰嶆潯鏁颁负: ", out["hits"].(map[string]interface{})["total"].(float64)) + documentAggregations := middle["group_by_documentnumber"].(map[string]interface{}) + buckets := documentAggregations["buckets"].([]interface{}) + if len(buckets) == 0 { + return nil, nil + } + for _, in := range buckets { + tmpbuf, ok := in.(map[string]interface{}) + if !ok { + return nil, errors.New("") + } + sources = append(sources, tmpbuf) + } + return sources, nil } func EsReq(method string, url string, parama []byte) (buf []byte, err error) { @@ -1088,6 +1080,7 @@ } request, err := http.NewRequest(method, url, bytes.NewBuffer(parama)) request.Header.Set("Content-type", "application/json") + request.Header.Set("Authorization", Token) if err != nil { fmt.Println("build request fail !") @@ -1120,6 +1113,31 @@ // 璧嬪�兼椂妫�娴嬫槸鍚﹁兘澶熻祴鍊� //func //Isnil(key string, ok bool){ // if !ok { -// fmt.Println(key, "is nil can not asign") +// fmt.Println(key, "is nil can not asign") // } //} + +type account struct { + Username string `mapstructure: "username"` + Userpassword string `mapstructure: "userpassword"` +} + +var Account = &account{} + +var Token string + +func init() { + v := viper.New() + v.SetConfigType("yaml") + v.SetConfigName("pro") + v.AddConfigPath("../config/") + v.AddConfigPath("./config/") + v.AddConfigPath("/opt/vasystem/config/") + err := v.ReadInConfig() + if err != nil { + log.Fatal("err on parsing configuration file!", err) + } + v.UnmarshalKey("es.account", Account) + + Token = "Basic " + base64.StdEncoding.EncodeToString([]byte(Account.Username+":"+Account.Userpassword)) +} -- Gitblit v1.8.0