sunty
2024-03-25 ebfa610f8c66fd2827a2eec619bfb3e0e22c332f
EsApi.go
@@ -22,6 +22,141 @@
   }
}
//***********************重庆Start**********************************//
type activeHourFormat struct {
   startTime string
   endTime   string
   startHour int
   endHour   int
}
func formatActiveHour(activeHour string) (activeHourFormat, error) {
   hours := strings.Split(activeHour, "-")
   if len(hours) == 2 {
      startHour := hours[0]
      endHour := hours[1]
      // 解析开始时间的小时和分钟
      startParts := strings.Split(startHour, ":")
      startHourInt, _ := strconv.Atoi(startParts[0])
      // 解析结束时间的小时和分钟
      endParts := strings.Split(endHour, ":")
      endHourInt, _ := strconv.Atoi(endParts[0])
      // 输出开始时间的小时
      fmt.Println("开始时间的小时:", startHourInt)
      // 输出结束时间的小时 + 1
      endHourPlusOne := (endHourInt + 1) % 24 // 取余确保不超过24小时
      fmt.Println("结束时间的小时 + 1:", endHourPlusOne)
      activeHourFormat := activeHourFormat{startTime: startHour, endTime: endHour, startHour: startHourInt, endHour: endHourPlusOne}
      return activeHourFormat, nil
   }
   return activeHourFormat{}, errors.New("错误:无法解析开始时间和结束时间")
}
func DayNightActivityQuery(communityId string, startTime string, endTime string, activeHour string, indexName string, serverIp string, serverPort string) ([]string, error) {
   activityId := make([]string, 0)
   esURL := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
   activeHourFormat, err := formatActiveHour(activeHour)
   if err != nil {
      return nil, err
   }
   queryDSL := `
   {
       "size": 0,
       "query": {
           "bool": {
               "filter": [
                   {
                       "range": {
                           "picDate": {
                               "gte": "` + startTime + `",
                               "lt": "` + endTime + `"
                           }
                       }
                   },
                   {
                       "term": {
                           "communityId": "` + communityId + `"
                       }
                   },
                   {
                       "script": {
                           "script": {
                               "source": "doc['picDate'].value.hourOfDay >= ` + strconv.Itoa(activeHourFormat.startHour) + ` || doc['picDate'].value.hourOfDay < ` + strconv.Itoa(activeHourFormat.endHour) + `",
                               "lang": "painless"
                           }
                       }
                   }
               ],
               "must_not": [
                   {
                       "term": {
                           "documentNumber": ""
                       }
                   }
               ]
           }
       },
       "aggs": {
           "group_by_documentnumber": {
               "terms": {
                   "field": "documentNumber",
                   "size": 100000
               },
               "aggs": {
                   "group_by_date": {
                       "date_histogram": {
                           "field": "picDate",
                           "interval": "1d", // 按天分桶
                           "format": "yyyy-MM-dd"
                       },
                       "aggs": {
                           "top_hits": {
                               "top_hits": {
                                   "_source": [
                                       "picDate"
                                   ],
                                   "size": 100000,
                                   "sort": [
                                       {
                                           "picDate": {
                                               "order": "desc"
                                           }
                                       }
                                   ]
                               }
                           }
                       }
                   }
               }
           }
       }
   }`
   //fmt.Println(esURL)
   //fmt.Println(queryDSL)
   buf, err := EsReq("POST", esURL, []byte(queryDSL))
   if err != nil {
      return nil, err
   }
   source, err := SourceAggregationList(buf)
   if err != nil {
      return nil, err
   }
   result, _ := decodeDocumentInfos(source)
   return result, nil
   return activityId, nil
}
// ***********************重庆End************************************//
// 根据抓拍人员id查询抓拍人员信息
func AIOceaninfosbyid(id []string, indexName string, serverIp string, serverPort string) ([]protomsg.AIOcean, error) {
   var aIOceanInfo []protomsg.AIOcean
@@ -56,7 +191,43 @@
   return aIOcean, nil
}
//根据抓拍库人员id查询特征值
// 根据抓拍人员id查询视频地址
func AIOceanVideoUrlbyid(id string, indexName string, serverIp string, serverPort string) (string, error) {
   //var aIOceanInfo []protomsg.AIOcean
   //videopersonsPersonId := strings.Replace(strings.Trim(fmt.Sprint(id), "[]"), " ", "\",\"", -1)
   var dbinfoRequest = `
      {
          "query": {
              "bool": {
                  "filter": [
                      {
                          "term": {
                                "id": "` + id + `"
                          }
                      }
                  ]
              }
          },
          "_source": [
              "videoUrl"
          ]
      }
        `
   buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search", []byte(dbinfoRequest))
   if err != nil {
      return "", err
   }
   sources, err := Sourcelist(buf)
   if err != nil {
      return "", err
   }
   videoUrl := sources[0]["videoUrl"].(string)
   //aIOcean := AIOceanAnalysis(sources)
   return videoUrl, nil
}
// 根据抓拍库人员id查询特征值
func GetVideoPersonFaceFeatureById(id string, indexName string, serverIp string, serverPort string) (string, error) {
   var jsonDSL = `
            {
@@ -86,7 +257,7 @@
   return feature, nil
}
//根据目标id查询已追加条数
// 根据目标id查询已追加条数
func GetLinkTagInfoSize(id string, indexName string, serverIp string, serverPort string) (size int, err error) {
   url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
   queryDSL := `{
@@ -112,7 +283,7 @@
   return size, nil
}
//根据目标id追加跟踪信息
// 根据目标id追加跟踪信息
func AppendTargetInfo(id string, targetInfo string, indexName string, serverIp string, serverPort string, updateTime string) (string, error) {
   if targetInfo == "" {
      return "", errors.New("append data is nil")
@@ -454,7 +625,7 @@
   return ids, nil
}
//统计各个区域人数
// 统计各个区域人数
func StatisticsEveryAreaPersonsNumber(startTime string, endTime string, serverIp string, serverPort string, indexName string) ([]map[string]interface{}, error) {
   var requestUrl = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
   var requestBody = `{
@@ -581,7 +752,7 @@
}
//根据时间范围,摄像机列表,分组聚合人脸列表,返回分组数据
// 根据时间范围,摄像机列表,分组聚合人脸列表,返回分组数据
func GetFaceDataBucketsByCameraIdAndTimeReturnByGrouped(cameraId []string, personId []string, startTime string, endTime string, thresholdTime float64, serverIp string, ServerPort string, indexName string) (buckersDate map[string]interface{}, err error) {
   var filterArr []string
   if cameraId != nil && len(cameraId) > 0 {
@@ -687,7 +858,7 @@
   return sources, nil
}
//根据时间范围,摄像机列表,分组聚合人脸列表
// 根据时间范围,摄像机列表,分组聚合人脸列表
func GetFaceDataBucketsByCameraIdAndTime(cameraId []string, personId []string, startTime string, endTime string, thresholdTime float64, serverIp string, ServerPort string, indexName string) (buckersDate map[string]interface{}, err error) {
   var filterArr []string
   if cameraId != nil && len(cameraId) > 0 {
@@ -788,7 +959,7 @@
   return sources, nil
}
//根据抓拍人员id更新(picurl)图片地址
// 根据抓拍人员id更新(picurl)图片地址
func UpdatePicUrlById(id string, picUrl string, indexName string, serverIp string, serverPort string) (err error) {
   updateTime := time.Now().Format("2006-01-02 15:04:05")
   tRes, err := AIOceaninfosbyid([]string{id}, indexName, serverIp, serverPort)
@@ -851,7 +1022,7 @@
   return nil
}
//根据抓拍人员id更新(videourl)摄像机地址
// 根据抓拍人员id更新(videourl)摄像机地址
func UpdateVideourlById(id string, videoUrl string, indexName string, serverIp string, serverPort string, command int) (statu int, err error) {
   var info interface{}
@@ -888,8 +1059,8 @@
      return statu, errors.New("http response interface can not change map[string]interface{}")
   }
   middle, ok := out["updated"].(float64)
   batches,ok1 := out["batches"].(float64)
   if !ok || !ok1{
   batches, ok1 := out["batches"].(float64)
   if !ok || !ok1 {
      logPrint("first updated change error!")
      statu = 500
      return statu, errors.New("first updated change error!")
@@ -897,21 +1068,21 @@
   if batches == 0 {
      logPrint("no such doc in database")
      statu = 400
      return statu,errors.New("目标数据不存在")
      return statu, errors.New("目标数据不存在")
   } else {
      if middle == 1 {
          statu = 200
          return statu, nil
       }
       if middle == 0 {
          statu = 201
          return statu, errors.New("已经修改")
       }
         statu = 200
         return statu, nil
      }
      if middle == 0 {
         statu = 201
         return statu, errors.New("已经修改")
      }
   }
   return statu, nil
}
//获取当前节点抓拍库所有人员ID*缓存*
// 获取当前节点抓拍库所有人员ID*缓存*
func GetAllLocalVideopersonsId(compareArgs protomsg.CompareArgs, indexName string, serverIp string, serverPort string, alarmLevelTypes string) (capturetable []string) {
   queryStr := ""
   queryBody := compareArgs.InputValue
@@ -951,7 +1122,12 @@
   isCollectStr := ""
   isCollect := compareArgs.Collection
   if isCollect != "" {
      isCollectStr = "{\"term\":{\"isCollect\":\"" + isCollect + "\"}},"
      //isCollectStr = "{\"term\":{\"isCollect\":\"" + isCollect + "\"}},"
      if isCollect == "1" {
         isCollectStr = "{\"term\":{\"isCollect\":true}},"
      } else if isCollect == "0" {
         isCollectStr = "{\"term\":{\"isCollect\":false}},"
      }
   }
   //判断布防等级
@@ -1075,7 +1251,7 @@
   return capturetable
}
//初始化实时抓拍
// 初始化实时抓拍
func InitRealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm string, category string, quantity int) ([]protomsg.AIOcean, error) {
   var aIOceanInfo []protomsg.AIOcean
   url := "http://" + serverIp + ":" + serverPort +
@@ -1128,7 +1304,7 @@
   return aIOcean, nil
}
//实时抓拍
// 实时抓拍
func RealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm bool) ([]protomsg.AIOcean, error) {
   var aIOceanInfo []protomsg.AIOcean
   url := "http://" + serverIp + ":" + serverPort +
@@ -1172,7 +1348,7 @@
   return aIOcean, nil
}
//综合统计
// 综合统计
func StatisticsComprehensive(serverIp string, serverPort string, indexName string, isAlarm string) (total int, err error) {
   url := "http://" + serverIp + ":" + serverPort +
      "/" + indexName + "/_search"
@@ -1221,7 +1397,7 @@
   return total, nil
}
//实时报警任务比率
// 实时报警任务比率
func RealTimeAlarmTaskRate(serverIp string, serverPort string, indexName string) (sources []map[string]interface{}, err error) {
   url := "http://" + serverIp + ":" + serverPort +
      "/" + indexName + "/_search"
@@ -1282,7 +1458,7 @@
   return sources, nil
}
//聚合任务列表,taskId+taskName
// 聚合任务列表,taskId+taskName
func AggregateTaskList(serverIp string, serverPort string, indexName string, analyServerId string, cameraIds []string) (sources []map[string]interface{}, err error) {
   url := "http://" + serverIp + ":" + serverPort +
      "/" + indexName + "/_search"
@@ -1378,7 +1554,7 @@
}
//添加即将删除信号
// 添加即将删除信号
func AddDeleteSignal() {
}
@@ -1419,7 +1595,7 @@
}
//查询时间段数据 *缓存*
// 查询时间段数据 *缓存*
func GetPeriodInfos(serverIp string, serverPort string, startTime string, endTime string, indexName string, shards string, targetType string) ([]*protomsg.MultiFeaCache, error) {
   var capdbinfo []*protomsg.MultiFeaCache
   url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local"
@@ -1597,8 +1773,8 @@
   return dbinfos, nil
}
//************************CORN TASK*******************************
//查询日期范围内是否还存在数据
// ************************CORN TASK*******************************
// 查询日期范围内是否还存在数据
func QueryAnalyServerData(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) {
   url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
   deleteJson := `{
@@ -1637,8 +1813,8 @@
   return result, nil
}
//按日期范围,服务器Id删除数据
func DeleteAnalyServerData(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (total int, err error,) {
// 按日期范围,服务器Id删除数据
func DeleteAnalyServerData(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (total int, err error) {
   url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_delete_by_query"
   deleteJson := `{
   "query":{
@@ -1670,10 +1846,10 @@
   if err != nil {
      return -1, errors.New("解码失败")
   }
   return deleteRes,nil
   return deleteRes, nil
}
//给所有节点追加删除任务信息
// 给所有节点追加删除任务信息
func AddDelTask(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) {
   url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query"
   addJson := `{
@@ -1716,7 +1892,7 @@
   return result, nil
}
//移除已执行完的删除任务
// 移除已执行完的删除任务
func DeleteDelTask(serverIp string, serverPort string, indexName string, analyServerId string) (result bool, err error) {
   url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query"
   deleteJson := `{
@@ -1761,7 +1937,7 @@
   ShardNode  string `json:"shardNode"`  //分片所在节点名称
}
//获取索引分片信息
// 获取索引分片信息
func GetShardsByIndex(serverIp string, serverPort string, indexName string) ([]ShardInfo, error) {
   url := "http://" + serverIp + ":" + serverPort + "/_cat/shards?v"
   buf, err := EsReq("GET", url, []byte(""))