sunty
2020-08-28 d0e6e8c6ef16afbc276fc13dece6239476f8d4e3
add out
2个文件已修改
262 ■■■■■ 已修改文件
EsApi.go 152 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
EsClient.go 110 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
EsApi.go
@@ -1,6 +1,7 @@
package esutil
import (
    "basic.com/pubsub/protomsg.git"
    "encoding/json"
    "errors"
    "fmt"
@@ -8,8 +9,6 @@
    "strings"
    "sync"
    "time"
    "basic.com/pubsub/protomsg.git"
)
var logPrint = func(i ...interface{}) {
@@ -159,11 +158,144 @@
}
/**************************************customer analysis util start**************************************/
//根据时间范围聚合所有区域人信息,返回固定条数
func GetFaceDataByTimeAnd(startTime string, total int, serverIp string, serverPort string, indexName string) (resData []map[string]interface{}, err error) {
    var requestUrl = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
    var requestBody = `{
    "query": {
        "bool": {
            "filter": [
                {
                    "range": {
                        "picDate": {
                            "gte": "` + startTime + `"
                        }
                    }
                },
                {
                    "term":{
                        "targetInfo.targetType.raw": "FaceDetect"
                    }
                }
            ]
        }
    },
    "size": 0,
    "aggs": {
        "buckets_aggs": {
            "composite": {
                "sources": [
                    {
                        "faceId": {
                            "terms": {
                                "field": "baseInfo.targetId"
                            }
                        }
                    },
                    {
                        "areaId": {
                            "terms": {
                                "field": "targetInfo.areaId"
                            }
                        }
                    }
                ],
                "size": 10000000
            },
            "aggs": {
                "top_attention_hits": {
                    "top_hits": {
                        "size": 1000000,
                        "sort": [
                            {
                                "picDate": {
                                    "order": "asc"
                                }
                            }
                        ],
                        "_source": {
                            "includes": [
                                "baseInfo.targetId",
                                "targetInfo.picSmUrl",
                                "picDate"
                            ]
                        }
                    }
                }
            }
        }
    }
}`
    buf, err := EsReq("POST", requestUrl, []byte(requestBody))
    if err != nil {
        return nil, err
    }
    source, err := Sourcelist(buf)
    if err != nil {
        return nil, err
    }
    fmt.Println(source)
    return resData, nil
}
func GetFaceIdDeduplication(startTime string, endTime string, serverIp string, serverPort string, indexName string) (ids []string, err error) {
    var requestUrl = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
    var requestBody = `{
    "query": {
        "bool": {
            "filter": [
                {
                    "range": {
                        "picDate": {
                            "gte": "` + startTime + `",
                            "lte": "` + endTime + `"
                        }
                    }
                },
                {
                    "term": {
                        "targetInfo.targetType.raw": "FaceDetect"
                    }
                }
            ]
        }
    },
    "size": 0,
    "aggs": {
        "buckets_aggs": {
            "composite": {
                "sources": [
                    {
                        "faceId": {
                            "terms": {
                                "field": "baseInfo.targetId"
                            }
                        }
                    }
                ],
                "size": 10000000
            }
        }
    }
}`
    buf, err := EsReq("POST", requestUrl, []byte(requestBody))
    if err != nil {
        return nil, err
    }
    fmt.Println(buf)
    //ids, err := SourceDeduplication(buf)
    return ids,nil
}
/**************************************customer analysis util end**************************************/
//根据摄像机列表和时间查询人员浏览轨迹
func GetPersonDataByCameraIdAndTime(cameraId []string, startTime string, endTime string, serverIp string, ServerPort string, indexName string) (map[string]interface{}, error) {
func GetPersonDataByCameraIdAndTime(cameraId []string, startTime string, endTime string, serverIp string, serverPort string, indexName string) (map[string]interface{}, error) {
    var filterArr []string
    if cameraId != nil && len(cameraId) > 0{
    if cameraId != nil && len(cameraId) > 0 {
        esCameraId := strings.Replace(strings.Trim(fmt.Sprint(cameraId), "[]"), " ", "\",\"", -1)
        filterArr = append(filterArr, `{
                            "terms": {
@@ -186,7 +318,7 @@
                }`)
    queryStr := strings.Join(filterArr, ",")
    personUrl := "http://" + serverIp + ":" + ServerPort + "/" + indexName + "/_search"
    personUrl := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
    personBody := `{
    "query": {
        "bool": {
@@ -236,7 +368,7 @@
//根据时间范围,摄像机列表,分组聚合人脸列表,返回分组数据
func GetFaceDataBucketsByCameraIdAndTimeReturnByGrouped(cameraId []string, personId []string, startTime string, endTime string, thresholdTime float64, serverIp string, ServerPort string, indexName string) (buckersDate map[string]interface{}, err error) {
    var filterArr []string
    if cameraId != nil && len(cameraId) > 0{
    if cameraId != nil && len(cameraId) > 0 {
        esCameraId := strings.Replace(strings.Trim(fmt.Sprint(cameraId), "[]"), " ", "\",\"", -1)
        filterArr = append(filterArr, `{
                            "terms": {
@@ -244,7 +376,7 @@
                        }
                }`)
    }
    if personId != nil &&len(personId) > 0{
    if personId != nil && len(personId) > 0 {
        esPersonId := strings.Replace(strings.Trim(fmt.Sprint(personId), "[]"), " ", "\",\"", -1)
        filterArr = append(filterArr, `{
            "terms": {
@@ -342,7 +474,7 @@
//根据时间范围,摄像机列表,分组聚合人脸列表
func GetFaceDataBucketsByCameraIdAndTime(cameraId []string, personId []string, startTime string, endTime string, thresholdTime float64, serverIp string, ServerPort string, indexName string) (buckersDate map[string]interface{}, err error) {
    var filterArr []string
    if cameraId != nil && len(cameraId) > 0{
    if cameraId != nil && len(cameraId) > 0 {
        esCameraId := strings.Replace(strings.Trim(fmt.Sprint(cameraId), "[]"), " ", "\",\"", -1)
        filterArr = append(filterArr, `{
                            "terms": {
@@ -350,7 +482,7 @@
                        }
                }`)
    }
    if personId != nil &&len(personId) > 0{
    if personId != nil && len(personId) > 0 {
        esPersonId := strings.Replace(strings.Trim(fmt.Sprint(personId), "[]"), " ", "\",\"", -1)
        filterArr = append(filterArr, `{
            "terms": {
@@ -492,7 +624,7 @@
    }
    middle, ok := out["updated"].(float64)
    if !ok {
        logPrint("first updated change error!")
        logPrint("first updated change error!", out)
        return errors.New("first updated change error!")
    }
    if middle == 1 {
EsClient.go
@@ -476,6 +476,112 @@
    return tmpinfos
}
// FaceSourceAggregations decodes an aggregation search response and flattens
// the per-bucket "top_attention_hits" into a list of stay records.
//
// buf must be a JSON object shaped like
// aggregations.buckets_aggs.buckets[].top_attention_hits.hits.hits[]._source,
// where each _source carries baseInfo[0].targetId, targetInfo[0].picSmUrl and
// picDate ("2006-01-02 15:04:05", interpreted in Asia/Shanghai).
//
// Within one bucket, hits are walked in the order given. The first hit opens a
// record with keys personId, startTime, startFacePicUrl, endTime (start + 1s)
// and stayTime (1.0). For every following hit except the last one:
//   - if it is at most thresholdTime seconds away from the previously seen
//     hit, the open record's endTime is moved to this hit's picDate;
//   - otherwise the open record is closed with stayTime = endTime - startTime
//     and this hit is dropped; the next hit opens a new record.
//
// The last hit of a bucket never takes the merge path, so it opens a record
// of its own unless it is skipped as malformed.
//
// NOTE(review): a record that is still open when the bucket ends keeps
// stayTime 1.0, and the hit that closes a record is discarded. This mirrors
// the previous behavior; confirm with the callers whether it is intended.
//
// The returned map holds "count" (int), "allSource" ([]map[string]interface{})
// and "queryUseTime" (the value passed in). An empty bucket list yields
// (nil, nil). Invalid JSON, a missing aggregation level or an unparsable
// picDate yields an error; a single hit with an unexpected shape is reported
// on stdout and skipped instead of panicking.
func FaceSourceAggregations(buf []byte, thresholdTime float64, queryUseTime float64) (sources map[string]interface{}, err error) {
	const timeLayout = "2006-01-02 15:04:05"

	loc, err := time.LoadLocation("Asia/Shanghai")
	if err != nil {
		return nil, errors.New("时区设置错误")
	}
	parseTime := func(value string) (time.Time, error) {
		return time.ParseInLocation(timeLayout, value, loc)
	}

	var info interface{}
	if err = json.Unmarshal(buf, &info); err != nil {
		return nil, err
	}
	out, ok := info.(map[string]interface{})
	if !ok {
		return nil, errors.New("http response interface can not change map[string]interface{}")
	}
	middle, ok := out["aggregations"].(map[string]interface{})
	if !ok {
		return nil, errors.New("first hits change error!")
	}
	bucketsAggs, ok := middle["buckets_aggs"].(map[string]interface{})
	if !ok {
		return nil, errors.New("buckets_aggs change error!")
	}
	buckets, ok := bucketsAggs["buckets"].([]interface{})
	if !ok {
		return nil, errors.New("buckets change error!")
	}
	if len(buckets) == 0 {
		return nil, nil
	}

	allSource := make([]map[string]interface{}, 0)
	for _, bucket := range buckets {
		finalHits, ok := faceAggBucketHits(bucket)
		if !ok {
			return nil, errors.New("top_attention_hits change error!")
		}

		hitsSources := make([]map[string]interface{}, 0)
		// startTime is the picDate of the previously seen hit while a record
		// is open, and "" when the next hit has to open a new record.
		startTime := ""
		indexLength := len(finalHits)
		for i, in := range finalHits {
			// 1-based position; skipped hits still count towards it.
			point := i + 1

			tmpbuf, ok := in.(map[string]interface{})
			if !ok {
				fmt.Println("change to source error!")
				continue
			}
			source, ok := tmpbuf["_source"].(map[string]interface{})
			if !ok {
				fmt.Println("change _source error!")
				continue
			}
			baseInfo, ok := faceAggFirstObject(source["baseInfo"])
			if !ok {
				fmt.Println("change baseInfo error!")
				continue
			}
			targetInfo, ok := faceAggFirstObject(source["targetInfo"])
			if !ok {
				fmt.Println("change targetInfo error!")
				continue
			}
			tmpTime, ok := source["picDate"].(string)
			if !ok {
				fmt.Println("change picDate error!")
				continue
			}
			mTime, err := parseTime(tmpTime)
			if err != nil {
				return nil, errors.New("时间解析错误")
			}

			if startTime != "" && point < indexLength {
				// startTime != "" implies at least one record was appended.
				last := hitsSources[len(hitsSources)-1]
				sinTime, err := parseTime(startTime)
				if err != nil {
					return nil, errors.New("时间解析错误")
				}
				passTime := math.Abs(mTime.Sub(sinTime).Seconds())
				if passTime <= thresholdTime {
					// Still the same visit: extend the open record.
					last["stayTime"] = 1.0
					last["endTime"] = tmpTime
					startTime = tmpTime
					continue
				}

				// Gap exceeded: close the open record with its real duration.
				hitStartTime, _ := last["startTime"].(string)
				hitEndTime, _ := last["endTime"].(string)
				realStartTime, err := parseTime(hitStartTime)
				if err != nil {
					return nil, errors.New("时间解析错误")
				}
				realEndTime, err := parseTime(hitEndTime)
				if err != nil {
					return nil, errors.New("时间解析错误")
				}
				stayTime := math.Abs(realEndTime.Sub(realStartTime).Seconds())
				if sinTime.Equal(mTime) {
					// Only reachable with a negative thresholdTime.
					last["endTime"] = tmpTime
					stayTime = 1
				}
				last["stayTime"] = stayTime
				startTime = ""
				continue
			}

			personID, ok := baseInfo["targetId"].(string)
			if !ok {
				fmt.Println("change targetId error!")
				continue
			}
			picURL, ok := targetInfo["picSmUrl"].(string)
			if !ok {
				fmt.Println("change picSmUrl error!")
				continue
			}

			startTime = tmpTime
			hitsSources = append(hitsSources, map[string]interface{}{
				"personId":        personID,
				"startTime":       tmpTime,
				"startFacePicUrl": picURL,
				"endTime":         mTime.Add(time.Second).Format(timeLayout),
				"stayTime":        1.0,
			})
		}
		allSource = append(allSource, hitsSources...)
	}

	return map[string]interface{}{
		"count":        len(allSource),
		"allSource":    allSource,
		"queryUseTime": queryUseTime,
	}, nil
}

// faceAggBucketHits returns bucket.top_attention_hits.hits.hits, or false when
// any level of that path is missing or has an unexpected type.
func faceAggBucketHits(bucket interface{}) ([]interface{}, bool) {
	obj, ok := bucket.(map[string]interface{})
	if !ok {
		return nil, false
	}
	topHits, ok := obj["top_attention_hits"].(map[string]interface{})
	if !ok {
		return nil, false
	}
	outer, ok := topHits["hits"].(map[string]interface{})
	if !ok {
		return nil, false
	}
	list, ok := outer["hits"].([]interface{})
	return list, ok
}

// faceAggFirstObject returns the first element of a JSON array when that
// element is a JSON object, or false otherwise.
func faceAggFirstObject(v interface{}) (map[string]interface{}, bool) {
	arr, ok := v.([]interface{})
	if !ok || len(arr) == 0 {
		return nil, false
	}
	obj, ok := arr[0].(map[string]interface{})
	return obj, ok
}
func SourceAggregations(buf [] byte, thresholdTime float64, queryUseTime float64) (sources map[string]interface{}, err error) {
    s := make(map[string]interface{})
    loc, err := time.LoadLocation("Asia/Shanghai")
@@ -592,10 +698,6 @@
            hitsSources = append(hitsSources, tmpHitSource)
        }
        allSource = append(allSource, hitsSources...)
        //    tmpSources["groupKey"] = groupKey
        //    tmpSources["doc_count"] = docCount
        //    tmpSources["hits_sources"] = hitsSources
        //    sources = append(sources, tmpSources)
    }
    count := len(allSource)
    //fmt.Println(count)