package esutil import ( "encoding/json" "errors" "fmt" "strconv" "strings" "sync" "time" "basic.com/pubsub/protomsg.git" ) // 查询底库人员信息 func Personinfos( queryIndex int, queryNums int, indexName string, serverIp string, serverPort string, analyServerId string) ([]*protomsg.Esinfo, error){ var dbinfos []*protomsg.Esinfo point := strconv.Itoa(queryIndex) number := strconv.Itoa(queryNums) JsonDSL := "" if indexName == "videopersons" { JsonDSL = ` { "from": ` + point + `, "query": { "bool": { "filter": [ { "term": { "analyServerId": "` + analyServerId + `" } } ] } }, "size":`+ number +`, "_source": [ "id", "faceFeature" ] }` }else { JsonDSL = ` { "from": ` + point + `, "query": { "match_all": {} }, "size":`+ number +`, "_source": [ "id", "tableId", "faceFeature" ] }` } //fmt.Println("url: "+"http://"+serverIp+":"+serverPort+"/"+indexName+"/_search","body: ",JsonDSL) buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search",[]byte(JsonDSL)) if err != nil { return dbinfos ,errors.New("http request dbtablename info is err!") } // 返回 _source 数组 sources, err := Sourcelist(buf) if err != nil { return dbinfos,err } // 返回所有查询的数据 dbpersoninfos := Parsesources(sources) return dbpersoninfos, nil } // 根据底库id查询底库信息 func Dbtablefosbyid (id []string, indexName string, serverIp string, serverPort string) ([]protomsg.Dbtable, error) { var dbinfo []protomsg.Dbtable dbtableId := strings.Replace(strings.Trim(fmt.Sprint(id), "[]"), " ", "\",\"", -1) var dbinfoRequest = ` { "query": { "bool": { "filter": [{ "terms": { "id": [ "`+ dbtableId +`" ] } }] } }, "size":1000000 } ` buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search",[]byte(dbinfoRequest)) if err != nil { return dbinfo, err } sources, err := Sourcelist(buf) if err != nil { return dbinfo , err } dbtable := Dbtablebyid(sources) return dbtable, nil } // 根据抓拍人员id查询抓拍人员信息 func AIOceaninfosbyid (id []string, indexName string, serverIp string, serverPort string) ([]protomsg.AIOcean, error) { var aIOceanInfo []protomsg.AIOcean videopersonsPersonId := strings.Replace(strings.Trim(fmt.Sprint(id), "[]"), " ", "\",\"", -1) var dbinfoRequest = ` { "query": { "bool": { "filter": [{ "terms": { "id": [ "`+ videopersonsPersonId +`" ] } }] } }, "size":1000000 } ` buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search",[]byte(dbinfoRequest)) if err != nil { return aIOceanInfo, err } sources, err := Sourcelist(buf) if err != nil { return aIOceanInfo , err } aIOcean := AIOceanAnalysis(sources) println(aIOcean) return aIOcean,nil } // 根据底库人员id查询底库人员信息 func Dbpersoninfosbyid (id []string, indexName string, serverIp string, serverPort string) ([]protomsg.Dbperson, error) { var dbinfo []protomsg.Dbperson dbtablePersonId := strings.Replace(strings.Trim(fmt.Sprint(id), "[]"), " ", "\",\"", -1) var dbinfoRequest = ` { "query": { "bool": { "filter": [{ "terms": { "id": [ "`+ dbtablePersonId +`" ] } }] } }, "size":1000000 } ` buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search",[]byte(dbinfoRequest)) if err != nil { return dbinfo, err } sources, err := Sourcelist(buf) if err != nil { return dbinfo , err } dbperson := Dbpersonbyid(sources) println(dbperson) return dbperson,nil } //根据抓拍库人员id查询特征值 func GetVideoPersonFaceFeatureById (id string, indexName string, serverIp string, serverPort string) (string, error) { var jsonDSL = ` { "query": { "bool": { "filter": [{ "term": { "id":"`+ id +`" } }] } }, "_source":["faceFeature"] } ` buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search",[]byte(jsonDSL)) if err != nil { return "", err } sources, err := Sourcelist(buf) if err != nil { return "" , err } faceFeature := sources[0]["faceFeature"].(string) return faceFeature,nil } // 根据tableid 查询tablename func Dbtablename(tableid string, indexName string, serverIp string, serverPort string) (tablename string, err error) { var dbinfotable =` { "query": { "bool": { "filter": [ { "term": { "id":"`+tableid+`" } } ] } }, "_source": [ "tableName" ], "size":1000000 } ` buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search",[]byte(dbinfotable)) if err != nil { return "" ,errors.New("http request dbtablename info is err!") } sources, err := Sourcelist(buf) if err != nil { return "",err } for _, source := range sources { if name, ok := source["tableName"].(string); ok { tablename = name break } } return tablename, nil } //根据抓拍人员id更新(videourl)摄像机地址 func UpdateVideourlById(id string, videoUrl string, indexName string, serverIp string, serverPort string,command int) (statu int) { var info interface{} url := "http://"+serverIp+":"+serverPort+"/"+indexName+"/_update_by_query?refresh=true" sourceStr := "ctx._source.videoUrl='" + videoUrl + "'" if command>=0 { sourceStr = "ctx._source.linkTagInfo["+strconv.Itoa(command)+"].videoUrl='" + videoUrl + "'" } var videoUrlInfo = ` { "script": { "source": "`+ sourceStr + `" }, "query": { "term": { "id": "` + id + `" } } } ` //fmt.Println("url: ", url, videoUrlInfo) buf, err := EsReq("POST", url, []byte(videoUrlInfo)) if err != nil { fmt.Println("http request videoUrlInfo info is err!") statu = 500 return } json.Unmarshal(buf, &info) //fmt.Println(info) out, ok := info.(map[string]interface{}) if !ok { fmt.Println("http response interface can not change map[string]interface{}") statu = 500 return } middle, ok := out["updated"].(float64) if !ok { fmt.Println("first updated change error!") statu = 500 return } if middle == 1 { statu = 200 return } if middle == 0 { statu = 201 return } return statu } //获取当前节点抓拍库所有人员ID func GetAllLocalVideopersonsId(compareArgs protomsg.CompareArgs, indexName string, serverIp string, serverPort string, alarmLevelTypes string) (capturetable []string) { queryStr := "" queryBody := compareArgs.InputValue //检索框 if queryBody != "" { queryStr = "\"must\":[{\"multi_match\":{\"query\":\"" + queryBody + "\",\"fields\":[\"alarmRules.alarmLevel^1.5\",\"ageDescription^1.5\",\"taskName^1.5\",\"baseInfo.tableName^1.5\",\"sex^2.0\",\"race^2.0\",\"content^1.0\",\"baseInfo.idCard^1.8\",\"cameraAddr^1.0\"]," + "\"type\":\"cross_fields\",\"operator\":\"OR\",\"slop\":0,\"prefix_length\":0,\"max_expansions\":50,\"zero_terms_query\":\"NONE\",\"auto_generate_synonyms_phrase_query\":true,\"fuzzy_transpositions\":true,\"boost\":1}}]," } if compareArgs.SearchTime == nil || len(compareArgs.SearchTime)!=2 { return nil } gteDate := compareArgs.SearchTime[0] lteDate := compareArgs.SearchTime[1] //判断任务ID taskIdStr := "" taskId := compareArgs.Tasks if taskId != nil && len(taskId) > 0 { esTaskId := strings.Replace(strings.Trim(fmt.Sprint(taskId), "[]"), " ", "\",\"", -1) taskIdStr = "{\"terms\":{\"taskId\":[\"" + esTaskId + "\"]}}," } //判断摄像机ID cameraIdStr := "" cameraId := compareArgs.TreeNodes if cameraId != nil && len(cameraId) > 0 { esCameraId := strings.Replace(strings.Trim(fmt.Sprint(cameraId), "[]"), " ", "\",\"", -1) cameraIdStr = "{\"terms\":{\"cameraId\":[\"" + esCameraId + "\"]}}," } //判断库表ID tableId := compareArgs.Tabs esTableId := "" esTableIdStr := "" if tableId != nil && len(tableId) > 0 { esTableId = strings.Replace(strings.Trim(fmt.Sprint(tableId), "[]"), " ", "\",\"", -1) esTableIdStr = "{\"terms\":{\"baseInfo.tableId\":[\"" + esTableId + "\"]}}," } isCollectStr := "" isCollect := compareArgs.Collection if isCollect != "" { isCollectStr = "{\"term\":{\"isCollect\":\"" + isCollect + "\"}}," } //判断布防等级 alarmLevelStr := "" if alarmLevelTypes !="" { alarmLevelStr = "{\"terms\":{\"alarmRules.alarmLevel\":[\"" + alarmLevelTypes + "\"]}}," } //使用es底层机制处理分页 analyServerFilterStr := "" analyServerId := compareArgs.AnalyServerId if analyServerId != "" { analyServerFilterStr = "{\"term\":{\"analyServerId\":\"" + analyServerId + "\"}}," } ts := time.Now() //首次请求头 url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?search_type=dfs_query_then_fetch;scroll=1m" var lock sync.RWMutex var wg sync.WaitGroup for i := 0; i < 48; i++ { //请求体 prama := "{" + "\"slice\":{\"id\":" + strconv.Itoa(i) + ",\"max\":48}," + "\"size\":\"1000\"," + "\"query\":{\"bool\":{" + queryStr + "\"filter\":[" + cameraIdStr + alarmLevelStr + taskIdStr + isCollectStr + esTableIdStr + analyServerFilterStr + "{\"range\":{\"picDate\":{\"from\":\"" + gteDate + "\",\"to\":\"" + lteDate + "\",\"include_lower\":true,\"include_upper\":true,\"boost\":1}}}]}}," + "\"_source\":[\"id\"]" + "}" wg.Add(1) go func(reqParam string) { defer wg.Done() //fmt.Println(url) //fmt.Println(prama) buf, err := EsReq("POST", url, []byte(reqParam)) if err != nil { fmt.Println("http request videoUrlInfo info is err!") fmt.Println(len(capturetable)) return } sources, err := Sourcelistforscroll(buf) if err != nil { fmt.Println(len(capturetable)) return } for _, source := range sources["sourcelist"].([]map[string]interface{}) { lock.Lock() capturetable = append(capturetable, source["id"].(string)) lock.Unlock() } scroll_id := sources["scroll_id"].(string) //scroll请求头 scroll_url := "http://" + serverIp + ":" + serverPort + "/_search/scroll" for { var tmpList []string next_scroll_id := "" if next_scroll_id != "" { scroll_id = next_scroll_id } jsonDSL := `{ "scroll": "1m", "scroll_id" : "` + scroll_id + `" }` //fmt.Println(scroll_url) //fmt.Println(jsonDSL) buf, err := EsReq("POST", scroll_url, []byte(jsonDSL)) if err != nil { fmt.Println("lenth1: ", len(capturetable)) return } nextSources, err := Sourcelistforscroll(buf) if nextSources == nil { return } nextM := nextSources["sourcelist"].([]map[string]interface{}) //fmt.Println("id",nextSources) if nextM == nil || len(nextM) == 0 { //fmt.Println("lenth: ", len(capturetable)) return } //fmt.Println("id") for _, source := range nextM { tmpList = append(tmpList, source["id"].(string)) } //fmt.Println("tmpList: ", len(tmpList)) lock.Lock() capturetable = append(capturetable, tmpList...) lock.Unlock() next_scroll_id = nextSources["scroll_id"].(string) } }(prama) } wg.Wait() fmt.Println("lenth_all: ", len(capturetable)) fmt.Println("耗时:", time.Since(ts)) return capturetable } //获取底库人员ID func GetDbpersonsId(compareArgs protomsg.CompareArgs,indexName string, serverIp string, serverPort string) (source map[string][]string) { queryStr := "" queryBody := compareArgs.InputValue //检索框 if queryBody != "" { queryStr = "\"must\":[{\"multi_match\":{\"query\":\"" + queryBody + "\",\"fields\":[\"personName^1.5\",\"age^1.5\",\"idCard^1.5\",\"phoneNum^1.5\",\"sex^2.0\",\"reserved^2.0\"]," + "\"type\":\"cross_fields\",\"operator\":\"OR\",\"slop\":0,\"prefix_length\":0,\"max_expansions\":50,\"zero_terms_query\":\"NONE\",\"auto_generate_synonyms_phrase_query\":true,\"fuzzy_transpositions\":true,\"boost\":1}}]," } //判断库表ID tableId := compareArgs.Tabs esTableId := "" esTableIdStr := "" if tableId != nil && len(tableId) > 0 { esTableId = strings.Replace(strings.Trim(fmt.Sprint(tableId), "[]"), " ", "\",\"", -1) esTableIdStr = "{\"terms\":{\"tableId\":[\"" + esTableId + "\"]}}" } prama := "{" + "\"size\":\"100000000\"," + "\"query\":{\"bool\":{" + queryStr + "\"filter\":[" + esTableIdStr + "]}}," + "\"_source\":[\"id\",\"tableId\"]" + "}" url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?search_type=dfs_query_then_fetch" fmt.Println(url) fmt.Println(prama) buf, err := EsReq("POST", url,[]byte(prama)) if err != nil { fmt.Println("http request videoUrlInfo info is err!") return } sources, err := Sourcelist(buf) if err != nil { return } tabsource := make(map[string][]string) for _, source := range sources{ tableId := source["tableId"].(string) id := source["id"].(string) tabsource[tableId] = append(tabsource[tableId], id) } return tabsource } //初始化实时抓拍 func InitRealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm bool ,quantity int) ([]protomsg.AIOcean, error){ var aIOceanInfo []protomsg.AIOcean url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" queryStr := "" if isAlarm == true { queryStr = `"query":{ "match_all":{} },` } else { queryStr = `"query":{ "bool":{ "filter":[ { "term":{ "isAlarm":true } } ] } },` } DSLJson := `{ "size":`+strconv.Itoa(quantity)+`, `+queryStr+` "sort":[{"picDate":{"order":"desc"}}], "_source": {"includes":[],"excludes":["*.feature"]} }` buf, err := EsReq("POST", url, []byte(DSLJson)) if err != nil { return aIOceanInfo, err } sources, err := Sourcelist(buf) if err != nil { return aIOceanInfo, err } aIOcean := AIOceanAnalysis(sources) //fmt.Println(len(videoperson)) return aIOcean, nil } //实时抓拍 func RealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm bool ) ([]protomsg.AIOcean, error){ var aIOceanInfo []protomsg.AIOcean url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" queryStr := "" if isAlarm == true { fmt.Println("continue") } else { queryStr = ` { "term":{ "isAlarm":1 } } ` } DSLJson := `{ "size":20, "query":{ "bool":{ "filter":[ { "range":{ "picDate":{ "gte":"now+8h-30s", "lt":"now+8h" } } }, `+queryStr+` ] } }, "_source": ["baseInfo", "alarmRules", "sex", "analyServerName", "sdkName", "ageDescription", "content", "id", "cameraAddr", "picMaxUrl", "picDate", "race", "videoUrl", "picSmUrl", "taskName", "personIsHub", "isAlarm", "analyServerIp", "cameraId", "isAckAlarm"] }` buf, err := EsReq("POST", url, []byte(DSLJson)) if err != nil { return aIOceanInfo, err } sources, err := Sourcelist(buf) if err != nil { return aIOceanInfo, err } aIOcean := AIOceanAnalysis(sources) fmt.Println(len(aIOcean)) return aIOcean, nil } //综合统计 func StatisticsComprehensive(serverIp string, serverPort string, indexName string, isAlarm bool) (total int, err error){ isAlarmStr := "" if isAlarm == true { isAlarmStr = `,{"term":{"isAlarm":true}}` } url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" DSLJson := `{ "size":0, "query":{ "bool":{ "filter":[{ "range":{ "picDate":{ "gte":"now+8h/d" } } } `+isAlarmStr+` ] } } }` //fmt.Println(DSLJson) buf, err := EsReq("POST",url,[]byte(DSLJson)) if err != nil { return total, err } var info interface{} json.Unmarshal(buf, &info) out, ok := info.(map[string]interface{}) if !ok { return total, errors.New("http response interface can not change map[string]interface{}") } middle, ok := out["hits"].(map[string]interface{}) if !ok { return total, errors.New("first hits change error!") } total = int(middle["total"].(float64)) //fmt.Println(total) return total,nil } //实时报警任务比率 func RealTimeAlarmTaskRate(serverIp string, serverPort string, indexName string) (sources []map[string]interface{},err error){ url := "http://" + serverIp + ":" + serverPort + "/"+indexName+"/_search" DSLJson := `{ "size":0, "query":{ "bool":{ "filter":[{ "range":{ "picDate":{ "gte":"now+8h/d" } } }] } }, "aggs":{ "sdkId_status":{ "terms":{ "field":"taskId" } } } }` buf, err := EsReq("POST",url,[]byte(DSLJson)) if err != nil { return nil, err } var info interface{} json.Unmarshal(buf, &info) out, ok := info.(map[string]interface{}) if !ok { return nil, errors.New("http response interface can not change map[string]interface{}") } middle, ok := out["aggregations"].(map[string]interface{}) if !ok { return nil, errors.New("first hits change error!") } sdkName_status, ok := middle["sdkId_status"].(map[string]interface{}) if !ok { return nil, errors.New("first hits change error!") } for _, in := range sdkName_status["buckets"].([]interface{}){ var source = make(map[string]interface{},0) tmpbuf, ok := in.(map[string]interface{}) if !ok { fmt.Println("change to source error!") continue } sdkId := tmpbuf["key"].(string) count := int(tmpbuf["doc_count"].(float64)) source["id"] = sdkId source["value"] = count sources = append(sources, source) } //fmt.Println("tmpSource",sources) return sources,nil } //聚合任务列表,taskId+taskName func AggregateTaskList(serverIp string, serverPort string, indexName string, analyServerId string) (sources []map[string]interface{},err error){ url := "http://" + serverIp + ":" + serverPort + "/"+indexName+"/_search" serverFilterStr := "" if analyServerId != "" { serverFilterStr = `, "query": { "bool": { "filter": [ { "term": { "analyServerId": "`+analyServerId+`" } } ] } }` } DSLJson := `{ "size": 0, "aggs": { "task_status": { "composite": { "sources": [ { "taskId": { "terms": { "field": "taskId" } } }, { "taskName": { "terms": { "field": "taskName.raw" } } } ], "size":"1000000" } } } `+serverFilterStr+` }` buf, err := EsReq("POST",url,[]byte(DSLJson)) if err != nil { return nil, err } var info interface{} json.Unmarshal(buf, &info) out, ok := info.(map[string]interface{}) if !ok { return nil, errors.New("http response interface can not change map[string]interface{}") } middle, ok := out["aggregations"].(map[string]interface{}) if !ok { return nil, errors.New("first hits change error!") } task_status, ok := middle["task_status"].(map[string]interface{}) if !ok { return nil, errors.New("first hits change error!") } for _, in := range task_status["buckets"].([]interface{}){ var source = make(map[string]interface{},0) tmpbuf, ok := in.(map[string]interface{}) if !ok { fmt.Println("change to source error!") continue } task := tmpbuf["key"].(map[string]interface{}) count := int(tmpbuf["doc_count"].(float64)) taskName := task["taskName"].(string) taskId := task["taskId"].(string) source["taskName"] = taskName source["taskId"] = taskId source["count"] = count sources = append(sources, source) } //fmt.Println("tmpSource",sources) return sources,nil }