sunty
2020-08-28 353829e0039ebebdf3502cd198248edf7e94d8d4
add ids faceinfo
2个文件已修改
100 ■■■■ 已修改文件
EsApi.go 54 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
EsClient.go 46 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
EsApi.go
@@ -5,6 +5,7 @@
    "encoding/json"
    "errors"
    "fmt"
    "sort"
    "strconv"
    "strings"
    "sync"
@@ -159,9 +160,27 @@
}
/**************************************customer analysis util start**************************************/
/*******************sort []map util*******************/
type MapsSort struct {
    Key     string
    MapList []map[string]interface{}
}
func (m *MapsSort) Len() int {
    return len(m.MapList)
}
func (m *MapsSort) Less(i, j int) bool {
    return m.MapList[i][m.Key].(string) > m.MapList[j][m.Key].(string)
}
func (m *MapsSort) Swap(i, j int) {
    m.MapList[i], m.MapList[j] = m.MapList[j], m.MapList[i]
}
/*******************sort []map util*******************/
//根据时间范围聚合所有区域人信息,返回固定条数
func GetFaceDataByTimeAnd(startTime string, total int, serverIp string, serverPort string, indexName string) (resData []map[string]interface{}, err error) {
func GetFaceDataByTimeAnd(startTime string, total int, thresholdTime int, thresholdStayTime int, serverIp string, serverPort string, indexName string) (resData []map[string]interface{}, err error) {
    var requestUrl = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
    var requestBody = `{
    "query": {
@@ -219,6 +238,7 @@
                            "includes": [
                                "baseInfo.targetId",
                                "targetInfo.picSmUrl",
                                "targetInfo.areaId",
                                "picDate"
                            ]
                        }
@@ -232,12 +252,26 @@
    if err != nil {
        return nil, err
    }
    source, err := Sourcelist(buf)
    source, err := FaceSourceAggregations(buf, thresholdTime, thresholdStayTime)
    if err != nil {
        return nil, err
    }
    fmt.Println(source)
    return resData, nil
    fmt.Println(len(source))
    faceSource := make([]map[string]interface{}, 0)
    for index, info := range source {
        if int(info["stayTime"].(float64)) > thresholdStayTime {
            faceSource = append(faceSource, source[index])
        }
    }
    //fmt.Println(len(source))
    if len(faceSource) > total {
        mapsSort := MapsSort{}
        mapsSort.Key = "endTime"
        mapsSort.MapList = faceSource
        sort.Sort(&mapsSort)
        return mapsSort.MapList[:total], nil
    }
    return faceSource, nil
}
func GetFaceIdDeduplication(startTime string, endTime string, serverIp string, serverPort string, indexName string) (ids []string, err error) {
@@ -280,13 +314,16 @@
        }
    }
}`
    //fmt.Println(requestUrl)
    //fmt.Println(requestBody)
    buf, err := EsReq("POST", requestUrl, []byte(requestBody))
    if err != nil {
        return nil, err
    }
    fmt.Println(buf)
    //ids, err := SourceDeduplication(buf)
    ids, err1 := SourceDeduplication(buf)
    if err1 != nil {
        return nil, err1
    }
    return ids,nil
}
@@ -581,8 +618,7 @@
    }
    picMaxUrls := tRes[0].PicMaxUrl
    sourceStr := `        
        "lang":"painless",
        "inline": "ctx._source.picMaxUrl.add('` + picUrl + `');ctx._source.updateTime='` + updateTime + `'"
        "source": "ctx._source.picMaxUrl.add('` + picUrl + `');ctx._source.updateTime='` + updateTime + `'"
`
    if len(picMaxUrls) >= 2 {
        sourceStr = `"source": "ctx._source.picMaxUrl[1]='` + picUrl + `';ctx._source.updateTime='` + updateTime + `'"`
EsClient.go
@@ -476,8 +476,7 @@
    return tmpinfos
}
func FaceSourceAggregations(buf [] byte, thresholdTime float64, queryUseTime float64) (sources map[string]interface{}, err error) {
    s := make(map[string]interface{})
func FaceSourceAggregations(buf []byte, thresholdTime int,thresholdStayTime int) (sources []map[string]interface{}, err error) {
    loc, err := time.LoadLocation("Asia/Shanghai")
    if err != nil {
        return nil, errors.New("时区设置错误")
@@ -509,12 +508,12 @@
        for _, in := range finalHits {
            point = point+1
            tmpHitSource := make(map[string]interface{})
            tmpbuf, ok := in.(map[string]interface{})
            tmpBuf, ok := in.(map[string]interface{})
            if !ok {
                fmt.Println("change to source error!")
                continue
            }
            source, ok := tmpbuf["_source"].(map[string]interface{})
            source, ok := tmpBuf["_source"].(map[string]interface{})
            if !ok {
                fmt.Println("change _source error!")
                continue
@@ -535,7 +534,7 @@
                passTime := math.Abs(mTime.Sub(sinTime).Seconds())
                hitsSources[len(hitsSources)-1]["stayTime"] = stayTime
                //fmt.Println("passTime:   ", passTime)
                if passTime <= thresholdTime || point == indexLength{
                if int(passTime) <= thresholdTime || point == indexLength{
                    startTime = tmpTime
                    hitsSources[len(hitsSources)-1]["endTime"] = tmpTime
                    if point == indexLength{
@@ -543,6 +542,7 @@
                        realStartTime, _ := time.ParseInLocation("2006-01-02 15:04:05", hitStartTime, loc)
                        stayTime = math.Abs(mTime.Sub(realStartTime).Seconds())
                        hitsSources[len(hitsSources)-1]["stayTime"] = stayTime
                        startTime = ""
                    }
                    continue
                } else {
@@ -565,7 +565,11 @@
            }
            //fmt.Println("========================================================")
            startTime = tmpTime
            tmpHitSource["personId"] = baseInfo["targetId"].(string)
            tmpHitSource["faceId"] = baseInfo["targetId"].(string)
            if targetInfo["areaId"] == nil {
                continue
            }
            tmpHitSource["areaId"] = targetInfo["areaId"].(string)
            tmpHitSource["startTime"] = sTime
            tmpHitSource["startFacePicUrl"] = targetInfo["picSmUrl"].(string)
            tmpHitSource["endTime"] = eTime
@@ -574,14 +578,30 @@
        }
        allSource = append(allSource, hitsSources...)
    }
    count := len(allSource)
    //fmt.Println(count)
    s["count"] = count
    s["allSource"] = allSource
    s["queryUseTime"] = queryUseTime
    return s, nil
    return allSource, nil
}
func SourceDeduplication(buf [] byte)  ([]string,error) {
    var info interface{}
    json.Unmarshal(buf, &info)
    out, ok := info.(map[string]interface{})
    if !ok {
        return nil, errors.New("http response interface can not change map[string]interface{}")
    }
    middle, ok := out["aggregations"].(map[string]interface{})
    if !ok {
        return nil, errors.New("first hits change error!")
    }
    bucketsAggs := middle["buckets_aggs"].(map[string]interface{})
    buckets := bucketsAggs["buckets"].([]interface{})
    if len(buckets) == 0 {
        return nil, nil
    }
    faceId := make([]string,0)
    for _, in := range buckets {
        faceId = append(faceId, in.(map[string]interface{})["key"].(map[string]interface{})["faceId"].(string))
    }
    return faceId,nil
}
func SourceAggregations(buf [] byte, thresholdTime float64, queryUseTime float64) (sources map[string]interface{}, err error) {
    s := make(map[string]interface{})
    loc, err := time.LoadLocation("Asia/Shanghai")