| | |
| | | |
| | | //使用es底层机制处理分页 |
| | | |
| | | analyServerFilterStr := "" |
| | | analyServerId := compareArgs.AnalyServerId |
| | | if analyServerId == "" { |
| | | fmt.Println("no analyServerId") |
| | | return |
| | | if analyServerId != "" { |
| | | analyServerFilterStr = "{\"term\":{\"analyServerId\":\"" + analyServerId + "\"}}," |
| | | } |
| | | analyServerFilterStr := "{\"term\":{\"analyServerId\":\"" + analyServerId + "\"}}," |
| | | |
| | | |
| | | //首次请求头 |
| | | url := "http://" + serverIp + ":" + serverPort + |
| | |
| | | } |
| | | |
| | | // Initialize real-time capture. |
| | | func InitRealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm bool ) ([]protomsg.Videopersons, error){ |
| | | func InitRealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm bool ,quantity int) ([]protomsg.Videopersons, error){ |
| | | var videopersonsInfo []protomsg.Videopersons |
| | | url := "http://" + serverIp + ":" + serverPort + |
| | | "/" + indexName + "/_search" |
| | |
| | | },` |
| | | } |
| | | DSLJson := `{ |
| | | "size":20, |
| | | "size":`+strconv.Itoa(quantity)+`, |
| | | `+queryStr+` |
| | | "sort":[{"picDate":{"order":"desc"}}], |
| | | "_source": ["baseInfo", "alarmRules", "sex", "analyServerName", "sdkName", "ageDescription", "content", "id", "cameraAddr", "picMaxUrl", "picDate", "race", "videoUrl", "picSmUrl", "taskName", "personIsHub", "isAlarm", "analyServerIp", "cameraId"] |
| | | }` |
| | | |
| | | buf, err := EsReq("POST", url, []byte(DSLJson)) |
| | | if err != nil { |
| | | return videopersonsInfo, err |
| | |
| | | } |
| | | |
| | | // Comprehensive statistics. |
| | | func StatisticsComprehensive(serverIp string, serverPort string, indexName string) (total int, err error){ |
| | | func StatisticsComprehensive(serverIp string, serverPort string, indexName string, isAlarm bool) (total int, err error){ |
| | | isAlarmStr := "" |
| | | if isAlarm == true { |
| | | isAlarmStr = `,{"term":{"isAlarm":1}}` |
| | | } |
| | | url := "http://" + serverIp + ":" + serverPort + |
| | | "/" + indexName + "/_search" |
| | | DSLJson := `{ |
| | |
| | | "filter":[{ |
| | | "range":{ |
| | | "picDate":{ |
| | | "gte":"now+8H/d" |
| | | "gte":"now+8h/d" |
| | | } |
| | | } |
| | | }] |
| | | } |
| | | `+isAlarmStr+` |
| | | ] |
| | | } |
| | | } |
| | | }` |
| | | //fmt.Println(DSLJson) |
| | | buf, err := EsReq("POST",url,[]byte(DSLJson)) |
| | | if err != nil { |
| | | return total, err |
| | |
| | | //fmt.Println(total) |
| | | return total,nil |
| | | } |
| | | |
| | | // Real-time alarm task ratio. |
| | | func RealTimeAlarmTaskRate(serverIp string, serverPort string,indexName string) (sources map[string]int,err error){ |
| | | func RealTimeAlarmTaskRate(serverIp string, serverPort string, indexName string) (sources []map[string]interface{},err error){ |
| | | url := "http://" + serverIp + ":" + serverPort + |
| | | "/"+indexName+"/_search" |
| | | DSLJson := `{ |
| | |
| | | } |
| | | }` |
| | | buf, err := EsReq("POST",url,[]byte(DSLJson)) |
| | | if err != nil { |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | var info interface{} |
| | |
| | | if !ok { |
| | | return nil, errors.New("first hits change error!") |
| | | } |
| | | var source = make(map[string]int,0) |
| | | |
| | | for _, in := range sdkName_status["buckets"].([]interface{}){ |
| | | var source = make(map[string]interface{},0) |
| | | tmpbuf, ok := in.(map[string]interface{}) |
| | | if !ok { |
| | | fmt.Println("change to source error!") |
| | |
| | | } |
| | | sdkName := tmpbuf["key"].(string) |
| | | count := int(tmpbuf["doc_count"].(float64)) |
| | | source[sdkName] = count |
| | | //fmt.Println("in",in) |
| | | //sources[in["key"].(string)] = int(in["doc_count"].(float64)) |
| | | source["name"] = sdkName |
| | | source["value"] = count |
| | | sources = append(sources, source) |
| | | } |
| | | //fmt.Println("sources",source) |
| | | return source,nil |
| | | //fmt.Println("tmpSource",sources) |
| | | return sources,nil |
| | | } |
| | | |
| | | //聚合任务列表,taskId+taskName |
| | | func AggregateTaskList(serverIp string, serverPort string, indexName string) (sources []map[string]interface{},err error){ |
| | | url := "http://" + serverIp + ":" + serverPort + |
| | | "/"+indexName+"/_search" |
| | | DSLJson := `{ |
| | | "size": 0, |
| | | "aggs": { |
| | | "task_status": { |
| | | "composite": { |
| | | "sources": [ |
| | | { |
| | | "taskId": { |
| | | "terms": { |
| | | "field": "taskId" |
| | | } |
| | | } |
| | | }, |
| | | { |
| | | "taskName": { |
| | | "terms": { |
| | | "field": "taskName.raw" |
| | | } |
| | | } |
| | | } |
| | | ], |
| | | "size":"1000" |
| | | } |
| | | } |
| | | } |
| | | }` |
| | | buf, err := EsReq("POST",url,[]byte(DSLJson)) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | var info interface{} |
| | | json.Unmarshal(buf, &info) |
| | | out, ok := info.(map[string]interface{}) |
| | | if !ok { |
| | | return nil, errors.New("http response interface can not change map[string]interface{}") |
| | | } |
| | | middle, ok := out["aggregations"].(map[string]interface{}) |
| | | if !ok { |
| | | return nil, errors.New("first hits change error!") |
| | | } |
| | | task_status, ok := middle["task_status"].(map[string]interface{}) |
| | | if !ok { |
| | | return nil, errors.New("first hits change error!") |
| | | } |
| | | |
| | | for _, in := range task_status["buckets"].([]interface{}){ |
| | | var source = make(map[string]interface{},0) |
| | | tmpbuf, ok := in.(map[string]interface{}) |
| | | if !ok { |
| | | fmt.Println("change to source error!") |
| | | continue |
| | | } |
| | | task := tmpbuf["key"].(map[string]interface{}) |
| | | count := int(tmpbuf["doc_count"].(float64)) |
| | | taskName := task["taskName"].(string) |
| | | taskId := task["taskId"].(string) |
| | | source["taskName"] = taskName |
| | | source["taskId"] = taskId |
| | | source["count"] = count |
| | | sources = append(sources, source) |
| | | } |
| | | //fmt.Println("tmpSource",sources) |
| | | return sources,nil |
| | | |
| | | } |