sunty
2020-09-08 171d94e0f254b485ed5d09cef9a208b0f5672048
EsApi.go
@@ -1,15 +1,15 @@
package esutil
import (
   "basic.com/pubsub/protomsg.git"
   "encoding/json"
   "errors"
   "fmt"
   "sort"
   "strconv"
   "strings"
   "sync"
   "time"
   "basic.com/pubsub/protomsg.git"
)
var logPrint = func(i ...interface{}) {
@@ -159,11 +159,359 @@
}
/**************************************customer analysis util start**************************************/
/*******************sort []map util*******************/
type MapsSort struct {
   Key     string
   MapList []map[string]interface{}
}
func (m *MapsSort) Len() int {
   return len(m.MapList)
}
func (m *MapsSort) Less(i, j int) bool {
   return m.MapList[i][m.Key].(string) > m.MapList[j][m.Key].(string)
}
func (m *MapsSort) Swap(i, j int) {
   m.MapList[i], m.MapList[j] = m.MapList[j], m.MapList[i]
}
/*******************sort []map util*******************/
//根据时间范围聚合所有区域人信息,返回固定条数
func GetFaceDataByTimeAndTotal(startTime string, endTime string, total int, thresholdTime int, thresholdStayTime int, serverIp string, serverPort string, indexName string) (resData []map[string]interface{}, err error) {
   var requestUrl = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
   var requestBody = `{
    "query": {
        "bool": {
            "filter": [
                {
                    "range": {
                        "picDate": {
                            "gte": "` + startTime + `",
                     "lte": "` + endTime + `"
                        }
                    }
                },
                {
                    "term":{
                        "targetInfo.targetType.raw": "FaceDetect"
                    }
                }
            ]
        }
    },
    "size": 0,
    "aggs": {
        "buckets_aggs": {
            "composite": {
                "sources": [
                    {
                        "faceId": {
                            "terms": {
                                "field": "baseInfo.targetId"
                            }
                        }
                    },
                    {
                        "areaId": {
                            "terms": {
                                "field": "targetInfo.areaId"
                            }
                        }
                    }
                ],
                "size": 10000000
            },
            "aggs": {
                "top_attention_hits": {
                    "top_hits": {
                        "size": 1000000,
                        "sort": [
                            {
                                "picDate": {
                                    "order": "asc"
                                }
                            }
                        ],
                        "_source": {
                            "includes": [
                                "baseInfo.targetId",
                                "targetInfo.picSmUrl",
                        "targetInfo.areaId",
                                "picDate"
                            ]
                        }
                    }
                }
            }
        }
    }
}`
   buf, err := EsReq("POST", requestUrl, []byte(requestBody))
   if err != nil {
      return nil, err
   }
   source, err := FaceSourceAggregations(buf, thresholdTime, thresholdStayTime)
   if err != nil {
      return nil, err
   }
   if len(source) == 0{
      return source,nil
   }
   faceSource := make([]map[string]interface{}, 0)
   for index, info := range source {
      if int(info["stayTime"].(float64)) > thresholdStayTime {
         faceSource = append(faceSource, source[index])
      }
   }
   mapsSort := MapsSort{}
   mapsSort.Key = "endTime"
   mapsSort.MapList = faceSource
   sort.Sort(&mapsSort)
   if len(faceSource) > total {
      return mapsSort.MapList[:total], nil
   }
   return mapsSort.MapList, nil
}
func GetFaceDataByTimeAndId(startTime string, endTime string, id string, thresholdTime int, thresholdStayTime int, serverIp string, serverPort string, indexName string) (resData []map[string]interface{}, err error) {
   var requestUrl = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
   var requestBody = `{
    "query": {
        "bool": {
            "filter": [
                {
                    "range": {
                        "picDate": {
                            "gte": "` + startTime + `",
                      "lte": "` + endTime + `"
                        }
                    }
                },
                {
                    "term":{
                        "targetInfo.targetType.raw": "FaceDetect"
                    }
                },
            {
                    "term":{
                        "baseInfo.targetId": "` + id + `"
                    }
                }
            ]
        }
    },
    "size": 0,
    "aggs": {
        "buckets_aggs": {
            "composite": {
                "sources": [
                    {
                        "faceId": {
                            "terms": {
                                "field": "baseInfo.targetId"
                            }
                        }
                    },
                    {
                        "areaId": {
                            "terms": {
                                "field": "targetInfo.areaId"
                            }
                        }
                    }
                ],
                "size": 10000000
            },
            "aggs": {
                "top_attention_hits": {
                    "top_hits": {
                        "size": 1000000,
                        "sort": [
                            {
                                "picDate": {
                                    "order": "asc"
                                }
                            }
                        ],
                        "_source": {
                            "includes": [
                                "baseInfo.targetId",
                                "targetInfo.picSmUrl",
                        "targetInfo.areaId",
                                "picDate"
                            ]
                        }
                    }
                }
            }
        }
    }
}`
   buf, err := EsReq("POST", requestUrl, []byte(requestBody))
   if err != nil {
      return nil, err
   }
   source, err := FaceSourceAggregations(buf, thresholdTime, thresholdStayTime)
   if err != nil {
      return nil, err
   }
   if len(source) == 0{
      return source,nil
   }
   faceSource := make([]map[string]interface{}, 0)
   for index, info := range source {
      if int(info["stayTime"].(float64)) > thresholdStayTime {
         faceSource = append(faceSource, source[index])
      }
   }
   mapsSort := MapsSort{}
   mapsSort.Key = "startTime"
   mapsSort.MapList = faceSource
   sort.Sort(&mapsSort)
   return mapsSort.MapList, nil
}
func GetFaceIdDeduplication(startTime string, endTime string, serverIp string, serverPort string, indexName string) (ids []map[string]interface{}, err error) {
   var requestUrl = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
   var requestBody = `{
    "query": {
        "bool": {
            "filter": [
                {
                    "range": {
                        "picDate": {
                            "gte": "` + startTime + `",
                            "lte": "` + endTime + `"
                        }
                    }
                },
                {
                    "term": {
                        "targetInfo.targetType.raw": "FaceDetect"
                    }
                }
            ]
        }
    },
    "size": 0,
    "aggs": {
        "buckets_aggs": {
            "composite": {
                "sources": [
                    {
                        "faceId": {
                            "terms": {
                                "field": "baseInfo.targetId"
                            }
                        }
                    }
                ],
                "size": 10000000
            },
               "aggs": {
                   "top_attention_hits": {
                       "top_hits": {
                           "size": 1,
                           "sort": [
                               {
                                   "picDate": {
                                       "order": "desc"
                                   }
                               }
                           ],
                           "_source": {
                               "includes": [
                                   "picDate"
                               ]
                           }
                       }
                      }
                  }
           }
        }
    }
}`
   //fmt.Println(requestUrl)
   //fmt.Println(requestBody)
   buf, err := EsReq("POST", requestUrl, []byte(requestBody))
   if err != nil {
      return nil, err
   }
   ids, err1 := SourceDeduplication(buf)
   if err1 != nil {
      return nil, err1
   }
   if len(ids) > 1 {
      mapsSort := MapsSort{}
      mapsSort.Key = "lastTime"
      mapsSort.MapList = ids
      sort.Sort(&mapsSort)
      return mapsSort.MapList, nil
   }
   return ids, nil
}
//统计各个区域人数
func StatisticsEveryAreaPersonsNumber(startTime string, endTime string, serverIp string, serverPort string, indexName string) ([]map[string]interface{}, error) {
   var requestUrl = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
   var requestBody = `{
    "query": {
        "bool": {
            "filter": [
                {
                    "range": {
                        "picDate": {
                            "gte": "` + startTime + `",
                            "lte": "` + endTime + `"
                        }
                    }
                },
                {
                    "term": {
                        "targetInfo.targetType.raw": "Yolo"
                    }
                }
            ]
        }
    },
    "size": 0,
    "aggs": {
        "buckets_aggs": {
            "composite": {
                "sources": [
                    {
                        "areaId": {
                            "terms": {
                                "field": "targetInfo.areaId"
                            }
                        }
                    }
                ],
                "size": 10000000
            }
        }
    }
}`
   buf, err := EsReq("POST", requestUrl, []byte(requestBody))
   if err != nil {
      return nil, err
   }
   result, err := SourceStatistics(buf)
   if err != nil {
      return nil, err
   }
   return result, nil
}
/**************************************customer analysis util end**************************************/
//根据摄像机列表和时间查询人员浏览轨迹
func GetPersonDataByCameraIdAndTime(cameraId []string, startTime string, endTime string, serverIp string, ServerPort string, indexName string) (map[string]interface{}, error) {
func GetPersonDataByCameraIdAndTime(cameraId []string, startTime string, endTime string, serverIp string, serverPort string, indexName string) (map[string]interface{}, error) {
   var filterArr []string
   if cameraId != nil && len(cameraId) > 0{
   if cameraId != nil && len(cameraId) > 0 {
      esCameraId := strings.Replace(strings.Trim(fmt.Sprint(cameraId), "[]"), " ", "\",\"", -1)
      filterArr = append(filterArr, `{
                     "terms": {
@@ -186,7 +534,7 @@
                }`)
   queryStr := strings.Join(filterArr, ",")
   personUrl := "http://" + serverIp + ":" + ServerPort + "/" + indexName + "/_search"
   personUrl := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
   personBody := `{
    "query": {
        "bool": {
@@ -236,7 +584,7 @@
//根据时间范围,摄像机列表,分组聚合人脸列表,返回分组数据
func GetFaceDataBucketsByCameraIdAndTimeReturnByGrouped(cameraId []string, personId []string, startTime string, endTime string, thresholdTime float64, serverIp string, ServerPort string, indexName string) (buckersDate map[string]interface{}, err error) {
   var filterArr []string
   if cameraId != nil && len(cameraId) > 0{
   if cameraId != nil && len(cameraId) > 0 {
      esCameraId := strings.Replace(strings.Trim(fmt.Sprint(cameraId), "[]"), " ", "\",\"", -1)
      filterArr = append(filterArr, `{
                     "terms": {
@@ -244,7 +592,7 @@
                  }
            }`)
   }
   if personId != nil &&len(personId) > 0{
   if personId != nil && len(personId) > 0 {
      esPersonId := strings.Replace(strings.Trim(fmt.Sprint(personId), "[]"), " ", "\",\"", -1)
      filterArr = append(filterArr, `{
         "terms": {
@@ -296,7 +644,7 @@
                        }
                    }
                ],
                "size": 100000000
                "size": 10000000
            },
            "aggs":{
                "top_attention_hits":{
@@ -342,7 +690,7 @@
//根据时间范围,摄像机列表,分组聚合人脸列表
func GetFaceDataBucketsByCameraIdAndTime(cameraId []string, personId []string, startTime string, endTime string, thresholdTime float64, serverIp string, ServerPort string, indexName string) (buckersDate map[string]interface{}, err error) {
   var filterArr []string
   if cameraId != nil && len(cameraId) > 0{
   if cameraId != nil && len(cameraId) > 0 {
      esCameraId := strings.Replace(strings.Trim(fmt.Sprint(cameraId), "[]"), " ", "\",\"", -1)
      filterArr = append(filterArr, `{
                     "terms": {
@@ -350,7 +698,7 @@
                  }
            }`)
   }
   if personId != nil &&len(personId) > 0{
   if personId != nil && len(personId) > 0 {
      esPersonId := strings.Replace(strings.Trim(fmt.Sprint(personId), "[]"), " ", "\",\"", -1)
      filterArr = append(filterArr, `{
         "terms": {
@@ -402,7 +750,7 @@
                        }
                    }
                ],
                "size": 100000000
                "size": 10000000
            },
            "aggs":{
                "top_attention_hits":{
@@ -448,9 +796,8 @@
      return err
   }
   picMaxUrls := tRes[0].PicMaxUrl
   sourceStr := `
        "lang":"painless",
        "inline": "ctx._source.picMaxUrl.add('` + picUrl + `');ctx._source.updateTime='` + updateTime + `'"
   sourceStr := `
        "source": "ctx._source.picMaxUrl.add('` + picUrl + `');ctx._source.updateTime='` + updateTime + `'"
`
   if len(picMaxUrls) >= 2 {
      sourceStr = `"source": "ctx._source.picMaxUrl[1]='` + picUrl + `';ctx._source.updateTime='` + updateTime + `'"`
@@ -492,7 +839,7 @@
   }
   middle, ok := out["updated"].(float64)
   if !ok {
      logPrint("first updated change error!")
      logPrint("first updated change error!", out)
      return errors.New("first updated change error!")
   }
   if middle == 1 {