sunty
2019-11-20 9d4aea534b369527a0470095ddaff6929c2cdf52
EsApi.go
@@ -1,110 +1,19 @@
package esutil
import (
    "encoding/json"
    "errors"
    "fmt"
    "strconv"
    "strings"
   "encoding/json"
   "errors"
   "fmt"
   "strconv"
   "strings"
   "sync"
   "time"
   "basic.com/pubsub/protomsg.git"
        )
// 查询底库人员信息
func Personinfos( queryIndex int, queryNums int, indexName string, serverIp string, serverPort string, analyServerId string) ([]*protomsg.Esinfo, error){
            var dbinfos []*protomsg.Esinfo
            point   := strconv.Itoa(queryIndex)
            number   := strconv.Itoa(queryNums)
            JsonDSL  := ""
            if indexName == "videopersons" {
                 JsonDSL =  `  {
                    "from": ` + point +  `,
                    "query": {
                        "bool": {
                            "filter": [
                                {
                                    "term": {
                                        "analyServerId": "` + analyServerId + `"
                                    }
                                }
                            ]
                        }
                    },
                    "size":`+ number +`,
                    "_source": [
                        "id",
                        "faceFeature"
                    ]
                }`
            }else {
                 JsonDSL =  `  {
                    "from": ` + point +  `,
                    "query": {
                        "match_all": {}
                    },
                     "size":`+ number +`,
                     "_source": [
                       "id",
                       "tableId",
                       "faceFeature"
                       ]
                    }`
            }
            //fmt.Println("url: "+"http://"+serverIp+":"+serverPort+"/"+indexName+"/_search","body: ",JsonDSL)
    buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search",[]byte(JsonDSL))
    if err != nil {
        return dbinfos ,errors.New("http request dbtablename info is err!")
    }
      // 返回 _source 数组
      sources, err := Sourcelist(buf)
      if err != nil {
            return dbinfos,err
      }
      // 返回所有查询的数据
     dbpersoninfos :=  Parsesources(sources)
      return dbpersoninfos, nil
}
// 根据底库id查询底库信息
func Dbtablefosbyid (id []string, indexName string, serverIp string, serverPort string) ([]protomsg.Dbtable, error) {
    var dbinfo []protomsg.Dbtable
    dbtableId := strings.Replace(strings.Trim(fmt.Sprint(id), "[]"), " ", "\",\"", -1)
        var dbinfoRequest = `
            {
               "query": {
                  "bool": {
                     "filter": [{
                        "terms": {
                           "id": [
                              "`+ dbtableId +`"
                           ]
                        }
                     }]
                  }
               },
                "size":1000000
            }
        `
    buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search",[]byte(dbinfoRequest))
    if err != nil {
        return dbinfo,  err
    }
    sources, err := Sourcelist(buf)
    if err != nil {
        return dbinfo , err
    }
    dbtable := Dbtablebyid(sources)
    return dbtable, nil
}
)
// 根据抓拍人员id查询抓拍人员信息
func AIOceaninfosbyid (id []string, indexName string, serverIp string, serverPort string) ([]protomsg.AIOcean, error) {
func AIOceaninfosbyid(id []string, indexName string, serverIp string, serverPort string) ([]protomsg.AIOcean, error) {
   var aIOceanInfo []protomsg.AIOcean
   videopersonsPersonId := strings.Replace(strings.Trim(fmt.Sprint(id), "[]"), " ", "\",\"", -1)
   var dbinfoRequest = `
@@ -114,7 +23,7 @@
                     "filter": [{
                        "terms": {
                           "id": [
                              "`+ videopersonsPersonId +`"
                              "` + videopersonsPersonId + `"
                           ]
                        }
                     }]
@@ -123,64 +32,30 @@
                "size":1000000
            }
        `
   buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search",[]byte(dbinfoRequest))
   buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search", []byte(dbinfoRequest))
   if err != nil {
      return aIOceanInfo,  err
      return aIOceanInfo, err
   }
   sources, err := Sourcelist(buf)
   if err != nil {
      return aIOceanInfo , err
      return aIOceanInfo, err
   }
   aIOcean := AIOceanAnalysis(sources)
   println(aIOcean)
   return aIOcean,nil
   return aIOcean, nil
}
// 根据底库人员id查询底库人员信息
func Dbpersoninfosbyid (id []string, indexName string, serverIp string, serverPort string) ([]protomsg.Dbperson, error) {
    var dbinfo []protomsg.Dbperson
    dbtablePersonId := strings.Replace(strings.Trim(fmt.Sprint(id), "[]"), " ", "\",\"", -1)
    var dbinfoRequest = `
            {
               "query": {
                  "bool": {
                     "filter": [{
                        "terms": {
                           "id": [
                              "`+ dbtablePersonId +`"
                           ]
                        }
                     }]
                  }
               },
                "size":1000000
            }
        `
    buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search",[]byte(dbinfoRequest))
    if err != nil {
        return dbinfo,  err
    }
    sources, err := Sourcelist(buf)
    if err != nil {
        return dbinfo , err
    }
    dbperson := Dbpersonbyid(sources)
    println(dbperson)
    return dbperson,nil
}
//根据抓拍库人员id查询特征值
func GetVideoPersonFaceFeatureById (id string, indexName string, serverIp string, serverPort string) (string, error) {
    var jsonDSL = `
func GetVideoPersonFaceFeatureById(id string, indexName string, serverIp string, serverPort string) (string, error) {
   var jsonDSL = `
            {
               "query": {
                  "bool": {
                     "filter": [{
                        "term": {
                           "id":"`+ id +`"
                           "id":"` + id + `"
                        }
                     }]
                  }
@@ -188,70 +63,32 @@
                "_source":["faceFeature"]
            }
        `
    buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search",[]byte(jsonDSL))
    if err != nil {
        return "",  err
    }
   buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search", []byte(jsonDSL))
   if err != nil {
      return "", err
   }
    sources, err := Sourcelist(buf)
    if err != nil {
        return "" , err
    }
    faceFeature := sources[0]["faceFeature"].(string)
    return faceFeature,nil
   sources, err := Sourcelist(buf)
   if err != nil {
      return "", err
   }
   faceFeature := sources[0]["faceFeature"].(string)
   return faceFeature, nil
}
// 根据tableid 查询tablename
func Dbtablename(tableid string, indexName string, serverIp string, serverPort string) (tablename string, err error) {
    var dbinfotable =` {
        "query": {
            "bool": {
                "filter": [
                {
                    "term": {
                        "id":"`+tableid+`"
                    }
                }
                ]
            }
        },
        "_source": [
            "tableName"
        ],
        "size":1000000
    }
    `
    buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search",[]byte(dbinfotable))
    if err != nil {
        return "" ,errors.New("http request dbtablename info is err!")
    }
    sources, err := Sourcelist(buf)
    if err != nil {
          return "",err
    }
    for _, source := range sources {
        if name, ok := source["tableName"].(string); ok {
            tablename = name
            break
        }
    }
    return tablename, nil
}
//根据抓拍人员id更新(videourl)摄像机地址
func UpdateVideourlById(id string, videoUrl string, indexName string, serverIp string, serverPort string,command int) (statu int) {
func UpdateVideourlById(id string, videoUrl string, indexName string, serverIp string, serverPort string, command int) (statu int) {
   var info interface{}
   url := "http://"+serverIp+":"+serverPort+"/"+indexName+"/_update_by_query?refresh=true"
   sourceStr :=  "ctx._source.videoUrl='" + videoUrl + "'"
   if command>=0  {
      sourceStr =  "ctx._source.linkTagInfo["+strconv.Itoa(command)+"].videoUrl='" + videoUrl + "'"
   url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query?refresh=true"
   sourceStr := "ctx._source.videoUrl='" + videoUrl + "'"
   if command >= 0 {
      sourceStr = "ctx._source.linkTagInfo[" + strconv.Itoa(command) + "].videoUrl='" + videoUrl + "'"
   }
   var videoUrlInfo = `
        {
          "script": {
            "source": "`+ sourceStr + `"
            "source": "` + sourceStr + `"
          },
          "query": {
            "term": {
@@ -298,10 +135,10 @@
   queryBody := compareArgs.InputValue
   //检索框
   if queryBody != "" {
      queryStr = "\"must\":[{\"multi_match\":{\"query\":\"" + queryBody + "\",\"fields\":[\"alarmRules.alarmLevel^1.5\",\"ageDescription^1.5\",\"taskName^1.5\",\"baseInfo.tableName^1.5\",\"sex^2.0\",\"race^2.0\",\"content^1.0\",\"baseInfo.idCard^1.8\",\"cameraAddr^1.0\"]," +
      queryStr = "\"must\":[{\"multi_match\":{\"query\":\"" + queryBody + "\",\"fields\":[\"cameraAddr^1.5\",\"taskName^1.5\",\"sdkName^1.5\",\"showLabels^3.0\",\"baseInfo.tableName^1.5\",\"baseInfo.targetName^1.5\",\"baseInfo.labels^1.5\",\"alarmRules.alarmLevel^1.5\",\"linkTag^1.5\"]," +
         "\"type\":\"cross_fields\",\"operator\":\"OR\",\"slop\":0,\"prefix_length\":0,\"max_expansions\":50,\"zero_terms_query\":\"NONE\",\"auto_generate_synonyms_phrase_query\":true,\"fuzzy_transpositions\":true,\"boost\":1}}],"
   }
   if compareArgs.SearchTime == nil || len(compareArgs.SearchTime)!=2 {
   if compareArgs.SearchTime == nil || len(compareArgs.SearchTime) != 2 {
      return nil
   }
   gteDate := compareArgs.SearchTime[0]
@@ -337,7 +174,7 @@
   //判断布防等级
   alarmLevelStr := ""
   if alarmLevelTypes !="" {
   if alarmLevelTypes != "" {
      alarmLevelStr = "{\"terms\":{\"alarmRules.alarmLevel\":[\"" + alarmLevelTypes + "\"]}},"
   }
@@ -364,6 +201,7 @@
         "\"size\":\"1000\"," +
         "\"query\":{\"bool\":{" + queryStr +
         "\"filter\":[" +
         "{\"term\":{\"targetInfo.targetType.raw\":\"face\"}}," +
         cameraIdStr +
         alarmLevelStr +
         taskIdStr +
@@ -455,57 +293,8 @@
   return capturetable
}
//获取底库人员ID
func GetDbpersonsId(compareArgs  protomsg.CompareArgs,indexName string, serverIp string, serverPort string) (source map[string][]string) {
    queryStr := ""
    queryBody := compareArgs.InputValue
    //检索框
    if queryBody != "" {
        queryStr = "\"must\":[{\"multi_match\":{\"query\":\"" + queryBody + "\",\"fields\":[\"personName^1.5\",\"age^1.5\",\"idCard^1.5\",\"phoneNum^1.5\",\"sex^2.0\",\"reserved^2.0\"]," +
            "\"type\":\"cross_fields\",\"operator\":\"OR\",\"slop\":0,\"prefix_length\":0,\"max_expansions\":50,\"zero_terms_query\":\"NONE\",\"auto_generate_synonyms_phrase_query\":true,\"fuzzy_transpositions\":true,\"boost\":1}}],"
    }
    //判断库表ID
    tableId := compareArgs.Tabs
    esTableId := ""
    esTableIdStr := ""
    if tableId != nil && len(tableId) > 0 {
        esTableId = strings.Replace(strings.Trim(fmt.Sprint(tableId), "[]"), " ", "\",\"", -1)
        esTableIdStr = "{\"terms\":{\"tableId\":[\"" + esTableId + "\"]}}"
    }
    prama := "{" +
        "\"size\":\"100000000\"," +
        "\"query\":{\"bool\":{" + queryStr +
        "\"filter\":[" +
        esTableIdStr +
        "]}}," +
        "\"_source\":[\"id\",\"tableId\"]" +
        "}"
    url := "http://" + serverIp + ":" + serverPort +
        "/" + indexName + "/_search?search_type=dfs_query_then_fetch"
    fmt.Println(url)
    fmt.Println(prama)
    buf, err := EsReq("POST", url,[]byte(prama))
    if err != nil {
        fmt.Println("http request videoUrlInfo info is err!")
        return
    }
    sources, err := Sourcelist(buf)
    if err != nil {
        return
    }
    tabsource := make(map[string][]string)
    for _, source := range  sources{
        tableId := source["tableId"].(string)
        id := source["id"].(string)
        tabsource[tableId] = append(tabsource[tableId], id)
    }
    return tabsource
}
//初始化实时抓拍
func InitRealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm bool   ,quantity int) ([]protomsg.AIOcean, error){
func InitRealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm bool, quantity int) ([]protomsg.AIOcean, error) {
   var aIOceanInfo []protomsg.AIOcean
   url := "http://" + serverIp + ":" + serverPort +
      "/" + indexName + "/_search"
@@ -528,8 +317,8 @@
   },`
   }
   DSLJson := `{
   "size":`+strconv.Itoa(quantity)+`,
   `+queryStr+`
   "size":` + strconv.Itoa(quantity) + `,
   ` + queryStr + `
   "sort":[{"picDate":{"order":"desc"}}],
   "_source": {"includes":[],"excludes":["*.feature"]}
   }`
@@ -549,7 +338,7 @@
}
//实时抓拍
func RealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm bool   ) ([]protomsg.AIOcean, error){
func RealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm bool) ([]protomsg.AIOcean, error) {
   var aIOceanInfo []protomsg.AIOcean
   url := "http://" + serverIp + ":" + serverPort +
      "/" + indexName + "/_search"
@@ -578,7 +367,7 @@
                  }      
               }
            },
         `+queryStr+`
         ` + queryStr + `
         ]
      }
   },
@@ -601,7 +390,7 @@
}
//综合统计
func StatisticsComprehensive(serverIp string, serverPort string, indexName string, isAlarm bool) (total int, err error){
func StatisticsComprehensive(serverIp string, serverPort string, indexName string, isAlarm bool) (total int, err error) {
   isAlarmStr := ""
   if isAlarm == true {
      isAlarmStr = `,{"term":{"isAlarm":true}}`
@@ -619,13 +408,13 @@
                  }
               }
            }
            `+isAlarmStr+`
            ` + isAlarmStr + `
            ]
         }
      }
   }`
   //fmt.Println(DSLJson)
   buf, err := EsReq("POST",url,[]byte(DSLJson))
   buf, err := EsReq("POST", url, []byte(DSLJson))
   if err != nil {
      return total, err
   }
@@ -641,13 +430,13 @@
   }
   total = int(middle["total"].(float64))
   //fmt.Println(total)
   return total,nil
   return total, nil
}
//实时报警任务比率
func RealTimeAlarmTaskRate(serverIp string, serverPort string, indexName string) (sources []map[string]interface{},err error){
func RealTimeAlarmTaskRate(serverIp string, serverPort string, indexName string) (sources []map[string]interface{}, err error) {
   url := "http://" + serverIp + ":" + serverPort +
      "/"+indexName+"/_search"
      "/" + indexName + "/_search"
   DSLJson := `{
   "size":0,
   "query":{
@@ -669,7 +458,7 @@
      }
   }
}`
   buf, err := EsReq("POST",url,[]byte(DSLJson))
   buf, err := EsReq("POST", url, []byte(DSLJson))
   if err != nil {
      return nil, err
   }
@@ -688,8 +477,8 @@
      return nil, errors.New("first hits change error!")
   }
   for _, in := range sdkName_status["buckets"].([]interface{}){
      var source = make(map[string]interface{},0)
   for _, in := range sdkName_status["buckets"].([]interface{}) {
      var source = make(map[string]interface{}, 0)
      tmpbuf, ok := in.(map[string]interface{})
      if !ok {
         fmt.Println("change to source error!")
@@ -702,14 +491,13 @@
      sources = append(sources, source)
   }
   //fmt.Println("tmpSource",sources)
   return sources,nil
   return sources, nil
}
//聚合任务列表,taskId+taskName
func AggregateTaskList(serverIp string, serverPort string, indexName string, analyServerId string) (sources []map[string]interface{},err error){
func AggregateTaskList(serverIp string, serverPort string, indexName string, analyServerId string) (sources []map[string]interface{}, err error) {
   url := "http://" + serverIp + ":" + serverPort +
      "/"+indexName+"/_search"
      "/" + indexName + "/_search"
   serverFilterStr := ""
   if analyServerId != "" {
      serverFilterStr = `,
@@ -718,7 +506,7 @@
            "filter": [
            {
            "term": {
            "analyServerId": "`+analyServerId+`"
            "analyServerId": "` + analyServerId + `"
            }
            }
         ]
@@ -750,9 +538,9 @@
            }
        }
    }
   `+serverFilterStr+`
   ` + serverFilterStr + `
}`
   buf, err := EsReq("POST",url,[]byte(DSLJson))
   buf, err := EsReq("POST", url, []byte(DSLJson))
   if err != nil {
      return nil, err
   }
@@ -771,8 +559,8 @@
      return nil, errors.New("first hits change error!")
   }
   for _, in := range task_status["buckets"].([]interface{}){
      var source = make(map[string]interface{},0)
   for _, in := range task_status["buckets"].([]interface{}) {
      var source = make(map[string]interface{}, 0)
      tmpbuf, ok := in.(map[string]interface{})
      if !ok {
         fmt.Println("change to source error!")
@@ -788,20 +576,27 @@
      sources = append(sources, source)
   }
   //fmt.Println("tmpSource",sources)
   return sources,nil
   return sources, nil
}
/****************************************以下为sdkCompare比对缓存使用方法*********************************************/
//获取查询总数
func GetTotal(serverIp string, serverPort string,indexName string,shards string) (total int) {
   JsonDSL := `{"query":{"bool":{"must":[{"match_all":{}}]}},"size":0}`
   url := "http://" + serverIp + ":" + serverPort+ "/" + indexName + "/_search"
   if indexName == "videopersons" {
      url = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local"
func GetTotal(serverIp string, serverPort string, indexName string, shards string) (total int) {
   JsonDSL := `{
   "size": 0,
   "query": {
      "bool": {
         "filter": [{
            "term": {
               "targetInfo.targetType.raw": "face"
            }
         }]
      }
   }
}`
   url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
   buf, err := EsReq("POST", url, []byte(JsonDSL))
   if err != nil {
      return
@@ -822,101 +617,85 @@
}
//查询时间段数据
func GetPeriodInfos(serverIp string, serverPort string,startTime string, endTime string, indexName string,shards string) ([]*protomsg.Esinfo, error) {
   var dbinfos []*protomsg.Esinfo
   url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
   if indexName == "videopersons" {
      url = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local"
   }
func GetPeriodInfos(serverIp string, serverPort string, startTime string, endTime string, indexName string, shards string) ([]*protomsg.Esinfo, error) {
   var capdbinfo []*protomsg.Esinfo
   url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local"
   JsonDSL := `
    {
        "query": {
            "bool": {
                "filter": [{
                    "range": {
                        "picDate": {
                            "gte": "` + startTime + `",
                            "lt": "` + endTime + `"
                        }
                    }
                }]
            }
        },
        "size": 1000000,
        "_source": [
         "id",
         "tableId",
         "faceFeature",
       "analyServerId"
        ]
    }
            {
                "query": {
                    "bool": {
                        "filter": [
                            {
                                "term": {
                                    "targetInfo.targetType.raw": "face"
                                }
                            },
                            {
                                "range": {
                                    "picDate": {
                                        "gte": "` + startTime + `",
                                        "lt": "` + endTime + `"
                                    }
                                }
                            }
                        ]
                    }
                },
                "size": 1000000,
                "_source": [
                    "id",
                    "targetInfo.feature",
                    "analyServerId"
                ]
            }
    `
   //logger.Debug(url)
   //logger.Debug(JsonDSL)
   buf, err := EsReq("POST", url, []byte(JsonDSL))
   if err != nil {
      return dbinfos, errors.New("http request dbtablename info is err!")
      return capdbinfo, errors.New("http request dbtablename info is err!")
   }
   // 返回 _source 数组
   sources, err := Sourcelist(buf)
   if err != nil {
      return dbinfos, err
      return capdbinfo, err
   }
   // 返回所有查询的数据
   dbpersoninfos := Parsesources(sources)
   return dbpersoninfos, nil
   capdbinfos := Parsesources(sources)
   return capdbinfos, nil
}
// 查询底库人员信息
func GetOceanFeatures(serverIp string, serverPort string,queryIndexNum int, queryNums int, indexName string, shards string) ([]*protomsg.Esinfo, error) {
func GetOceanFeatures(serverIp string, serverPort string, queryIndexNum int, queryNums int, indexName string, shards string) ([]*protomsg.Esinfo, error) {
   var dbinfos []*protomsg.Esinfo
   point := strconv.Itoa(queryIndexNum)
   number := strconv.Itoa(queryNums)
   JsonDSL := ""
   url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
   if indexName == "videopersons" {
      url = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local"
      JsonDSL = `  {
   url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local"
   JsonDSL = `  {
                    "from": ` + point + `,        
                    "query": {
                        "match_all": {}
                    },
                  "bool": {
                     "filter": [
                        {
                           "term": {
                              "targetInfo.targetType.raw": "face"
                        }
                           }
                     ]
                  }
               },
                     "size":` + number + `,
                     "_source": [
                       "id",
                       "tableId",
                       "faceFeature",
                  "analyServerId"
                     "id",
                     "targetInfo.feature",
                     "analyServerId"
                       ]
                    }`
   } else { //查底库有效人员
      JsonDSL = `  {
                    "from": ` + point + `,
                    "query": {
                        "bool": {
                           "filter":[{
                                "term":{
                                    "enable":1
                                }
                            },{
                                "term":{
                                    "isDelete":0
                                }
                            }]
                        }
                    },
                     "size":` + number + `,
                     "_source": [
                       "id",
                       "tableId",
                       "faceFeature"
                       ]
                    }`
   }
   buf, err := EsReq("POST", url, []byte(JsonDSL))
   if err != nil {