| | |
| | | "encoding/json" |
| | | "errors" |
| | | "fmt" |
| | | "sort" |
| | | "strconv" |
| | | "strings" |
| | | "sync" |
| | |
| | | } |
| | | |
| | | /**************************************customer analysis util start**************************************/ |
| | | /*******************sort []map util*******************/ |
// MapsSort orders a slice of maps by the string value each map stores under
// Key. Together with Len, Less and Swap it satisfies sort.Interface, so it can
// be passed to sort.Sort. The resulting order is descending (largest string
// first).
type MapsSort struct {
	// Key is the map key whose string value is compared.
	Key string
	// MapList is the slice that is reordered in place.
	MapList []map[string]interface{}
}

// Len returns the number of maps in MapList.
func (m *MapsSort) Len() int {
	return len(m.MapList)
}

// Less reports whether element i must be placed before element j, which is
// the case when i's value under Key is lexicographically greater than j's.
//
// A map that has no entry for Key, or whose entry is not a string, is compared
// as the empty string instead of panicking on a failed type assertion; such
// elements therefore end up at the tail of the sorted slice.
func (m *MapsSort) Less(i, j int) bool {
	vi, _ := m.MapList[i][m.Key].(string)
	vj, _ := m.MapList[j][m.Key].(string)
	return vi > vj
}

// Swap exchanges elements i and j of MapList.
func (m *MapsSort) Swap(i, j int) {
	m.MapList[i], m.MapList[j] = m.MapList[j], m.MapList[i]
}
| | | |
| | | /*******************sort []map util*******************/ |
| | | // Aggregate person information for all areas within the time range and return a fixed number of records. |
| | | func GetFaceDataByTimeAnd(startTime string, total int, serverIp string, serverPort string, indexName string) (resData []map[string]interface{}, err error) { |
| | | func GetFaceDataByTimeAnd(startTime string, total int, thresholdTime int, thresholdStayTime int, serverIp string, serverPort string, indexName string) (resData []map[string]interface{}, err error) { |
| | | var requestUrl = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" |
| | | var requestBody = `{ |
| | | "query": { |
| | |
| | | "includes": [ |
| | | "baseInfo.targetId", |
| | | "targetInfo.picSmUrl", |
| | | "targetInfo.areaId", |
| | | "picDate" |
| | | ] |
| | | } |
| | |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | source, err := Sourcelist(buf) |
| | | source, err := FaceSourceAggregations(buf, thresholdTime, thresholdStayTime) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | fmt.Println(source) |
| | | return resData, nil |
| | | fmt.Println(len(source)) |
| | | faceSource := make([]map[string]interface{}, 0) |
| | | for index, info := range source { |
| | | if int(info["stayTime"].(float64)) > thresholdStayTime { |
| | | faceSource = append(faceSource, source[index]) |
| | | } |
| | | } |
| | | //fmt.Println(len(source)) |
| | | if len(faceSource) > total { |
| | | mapsSort := MapsSort{} |
| | | mapsSort.Key = "endTime" |
| | | mapsSort.MapList = faceSource |
| | | sort.Sort(&mapsSort) |
| | | return mapsSort.MapList[:total], nil |
| | | } |
| | | return faceSource, nil |
| | | } |
| | | |
| | | func GetFaceIdDeduplication(startTime string, endTime string, serverIp string, serverPort string, indexName string) (ids []string, err error) { |
| | |
| | | } |
| | | } |
| | | }` |
| | | //fmt.Println(requestUrl) |
| | | //fmt.Println(requestBody) |
| | | buf, err := EsReq("POST", requestUrl, []byte(requestBody)) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | fmt.Println(buf) |
| | | //ids, err := SourceDeduplication(buf) |
| | | |
| | | return ids,nil |
| | | ids, err1 := SourceDeduplication(buf) |
| | | if err1 != nil { |
| | | return nil, err1 |
| | | } |
| | | return ids, nil |
| | | } |
| | | |
| | | /**************************************customer analysis util end**************************************/ |
| | |
| | | return err |
| | | } |
| | | picMaxUrls := tRes[0].PicMaxUrl |
| | | sourceStr := ` |
| | | "lang":"painless", |
| | | "inline": "ctx._source.picMaxUrl.add('` + picUrl + `');ctx._source.updateTime='` + updateTime + `'" |
| | | sourceStr := ` |
| | | "source": "ctx._source.picMaxUrl.add('` + picUrl + `');ctx._source.updateTime='` + updateTime + `'" |
| | | ` |
| | | if len(picMaxUrls) >= 2 { |
| | | sourceStr = `"source": "ctx._source.picMaxUrl[1]='` + picUrl + `';ctx._source.updateTime='` + updateTime + `'"` |
| | |
| | | return tmpinfos |
| | | } |
| | | |
| | | func FaceSourceAggregations(buf [] byte, thresholdTime float64, queryUseTime float64) (sources map[string]interface{}, err error) { |
| | | s := make(map[string]interface{}) |
| | | func FaceSourceAggregations(buf []byte, thresholdTime int,thresholdStayTime int) (sources []map[string]interface{}, err error) { |
| | | loc, err := time.LoadLocation("Asia/Shanghai") |
| | | if err != nil { |
| | | return nil, errors.New("时区设置错误") |
| | |
| | | for _, in := range finalHits { |
| | | point = point+1 |
| | | tmpHitSource := make(map[string]interface{}) |
| | | tmpbuf, ok := in.(map[string]interface{}) |
| | | tmpBuf, ok := in.(map[string]interface{}) |
| | | if !ok { |
| | | fmt.Println("change to source error!") |
| | | continue |
| | | } |
| | | source, ok := tmpbuf["_source"].(map[string]interface{}) |
| | | source, ok := tmpBuf["_source"].(map[string]interface{}) |
| | | if !ok { |
| | | fmt.Println("change _source error!") |
| | | continue |
| | |
| | | passTime := math.Abs(mTime.Sub(sinTime).Seconds()) |
| | | hitsSources[len(hitsSources)-1]["stayTime"] = stayTime |
| | | //fmt.Println("passTime: ", passTime) |
| | | if passTime <= thresholdTime || point == indexLength{ |
| | | if int(passTime) <= thresholdTime || point == indexLength{ |
| | | startTime = tmpTime |
| | | hitsSources[len(hitsSources)-1]["endTime"] = tmpTime |
| | | if point == indexLength{ |
| | |
| | | realStartTime, _ := time.ParseInLocation("2006-01-02 15:04:05", hitStartTime, loc) |
| | | stayTime = math.Abs(mTime.Sub(realStartTime).Seconds()) |
| | | hitsSources[len(hitsSources)-1]["stayTime"] = stayTime |
| | | startTime = "" |
| | | } |
| | | continue |
| | | } else { |
| | |
| | | } |
| | | //fmt.Println("========================================================") |
| | | startTime = tmpTime |
| | | tmpHitSource["personId"] = baseInfo["targetId"].(string) |
| | | tmpHitSource["faceId"] = baseInfo["targetId"].(string) |
| | | if targetInfo["areaId"] == nil { |
| | | continue |
| | | } |
| | | tmpHitSource["areaId"] = targetInfo["areaId"].(string) |
| | | tmpHitSource["startTime"] = sTime |
| | | tmpHitSource["startFacePicUrl"] = targetInfo["picSmUrl"].(string) |
| | | tmpHitSource["endTime"] = eTime |
| | |
| | | } |
| | | allSource = append(allSource, hitsSources...) |
| | | } |
| | | count := len(allSource) |
| | | //fmt.Println(count) |
| | | s["count"] = count |
| | | s["allSource"] = allSource |
| | | s["queryUseTime"] = queryUseTime |
| | | return s, nil |
| | | return allSource, nil |
| | | } |
| | | |
// SourceDeduplication extracts face IDs from a JSON aggregation response.
//
// buf is the raw JSON response body. The function walks
// aggregations -> buckets_aggs -> buckets and, for every bucket, collects the
// string stored at key.faceId, preserving the bucket order.
//
// It returns (nil, nil) when the bucket list is empty. It returns a non-nil
// error when buf is not valid JSON or when any level of the expected structure
// is missing or has an unexpected type; malformed responses are reported as
// errors rather than causing a panic on a failed type assertion.
func SourceDeduplication(buf []byte) ([]string, error) {
	var info interface{}
	if err := json.Unmarshal(buf, &info); err != nil {
		return nil, fmt.Errorf("unmarshal response: %w", err)
	}
	out, ok := info.(map[string]interface{})
	if !ok {
		return nil, errors.New("http response interface can not change map[string]interface{}")
	}
	middle, ok := out["aggregations"].(map[string]interface{})
	if !ok {
		return nil, errors.New("first hits change error!")
	}
	bucketsAggs, ok := middle["buckets_aggs"].(map[string]interface{})
	if !ok {
		return nil, errors.New("aggregations.buckets_aggs is missing or not an object")
	}
	buckets, ok := bucketsAggs["buckets"].([]interface{})
	if !ok {
		return nil, errors.New("aggregations.buckets_aggs.buckets is missing or not an array")
	}
	if len(buckets) == 0 {
		return nil, nil
	}

	faceIDs := make([]string, 0, len(buckets))
	for i, in := range buckets {
		// Indexing a nil map is safe, so a failed assertion at any level
		// simply surfaces as ok == false on the final faceId lookup.
		bucket, _ := in.(map[string]interface{})
		key, _ := bucket["key"].(map[string]interface{})
		faceID, ok := key["faceId"].(string)
		if !ok {
			return nil, fmt.Errorf("bucket %d: key.faceId is missing or not a string", i)
		}
		faceIDs = append(faceIDs, faceID)
	}
	return faceIDs, nil
}
| | | func SourceAggregations(buf [] byte, thresholdTime float64, queryUseTime float64) (sources map[string]interface{}, err error) { |
| | | s := make(map[string]interface{}) |
| | | loc, err := time.LoadLocation("Asia/Shanghai") |