package esutil
|
|
import (
|
"encoding/json"
|
"errors"
|
"fmt"
|
"strconv"
|
"strings"
|
"sync"
|
"time"
|
|
"basic.com/pubsub/protomsg.git"
|
)
|
|
// 查询底库人员信息
|
func Personinfos( queryIndex int, queryNums int, indexName string, serverIp string, serverPort string, analyServerId string) ([]*protomsg.Esinfo, error){
|
var dbinfos []*protomsg.Esinfo
|
point := strconv.Itoa(queryIndex)
|
number := strconv.Itoa(queryNums)
|
JsonDSL := ""
|
if indexName == "videopersons" {
|
JsonDSL = ` {
|
"from": ` + point + `,
|
"query": {
|
"bool": {
|
"filter": [
|
{
|
"term": {
|
"analyServerId": "` + analyServerId + `"
|
}
|
}
|
]
|
}
|
},
|
"size":`+ number +`,
|
"_source": [
|
"id",
|
"faceFeature"
|
]
|
}`
|
}else {
|
JsonDSL = ` {
|
"from": ` + point + `,
|
"query": {
|
"match_all": {}
|
},
|
"size":`+ number +`,
|
"_source": [
|
"id",
|
"tableId",
|
"faceFeature"
|
]
|
}`
|
}
|
//fmt.Println("url: "+"http://"+serverIp+":"+serverPort+"/"+indexName+"/_search","body: ",JsonDSL)
|
buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search",[]byte(JsonDSL))
|
if err != nil {
|
return dbinfos ,errors.New("http request dbtablename info is err!")
|
}
|
|
// 返回 _source 数组
|
sources, err := Sourcelist(buf)
|
if err != nil {
|
return dbinfos,err
|
}
|
|
// 返回所有查询的数据
|
dbpersoninfos := Parsesources(sources)
|
return dbpersoninfos, nil
|
}
|
|
// 根据底库id查询底库信息
|
func Dbtablefosbyid (id []string, indexName string, serverIp string, serverPort string) ([]protomsg.Dbtable, error) {
|
var dbinfo []protomsg.Dbtable
|
dbtableId := strings.Replace(strings.Trim(fmt.Sprint(id), "[]"), " ", "\",\"", -1)
|
var dbinfoRequest = `
|
{
|
"query": {
|
"bool": {
|
"filter": [{
|
"terms": {
|
"id": [
|
"`+ dbtableId +`"
|
]
|
}
|
}]
|
}
|
},
|
"size":1000000
|
}
|
`
|
buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search",[]byte(dbinfoRequest))
|
if err != nil {
|
return dbinfo, err
|
}
|
|
sources, err := Sourcelist(buf)
|
if err != nil {
|
return dbinfo , err
|
}
|
|
dbtable := Dbtablebyid(sources)
|
return dbtable, nil
|
}
|
|
// 根据抓拍人员id查询抓拍人员信息
|
func AIOceaninfosbyid (id []string, indexName string, serverIp string, serverPort string) ([]protomsg.AIOcean, error) {
|
var aIOceanInfo []protomsg.AIOcean
|
videopersonsPersonId := strings.Replace(strings.Trim(fmt.Sprint(id), "[]"), " ", "\",\"", -1)
|
var dbinfoRequest = `
|
{
|
"query": {
|
"bool": {
|
"filter": [{
|
"terms": {
|
"id": [
|
"`+ videopersonsPersonId +`"
|
]
|
}
|
}]
|
}
|
},
|
"size":1000000
|
}
|
`
|
buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search",[]byte(dbinfoRequest))
|
if err != nil {
|
return aIOceanInfo, err
|
}
|
|
sources, err := Sourcelist(buf)
|
if err != nil {
|
return aIOceanInfo , err
|
}
|
|
aIOcean := AIOceanAnalysis(sources)
|
println(aIOcean)
|
return aIOcean,nil
|
}
|
|
// 根据底库人员id查询底库人员信息
|
func Dbpersoninfosbyid (id []string, indexName string, serverIp string, serverPort string) ([]protomsg.Dbperson, error) {
|
var dbinfo []protomsg.Dbperson
|
dbtablePersonId := strings.Replace(strings.Trim(fmt.Sprint(id), "[]"), " ", "\",\"", -1)
|
var dbinfoRequest = `
|
{
|
"query": {
|
"bool": {
|
"filter": [{
|
"terms": {
|
"id": [
|
"`+ dbtablePersonId +`"
|
]
|
}
|
}]
|
}
|
},
|
"size":1000000
|
}
|
`
|
buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search",[]byte(dbinfoRequest))
|
if err != nil {
|
return dbinfo, err
|
}
|
|
sources, err := Sourcelist(buf)
|
if err != nil {
|
return dbinfo , err
|
}
|
|
dbperson := Dbpersonbyid(sources)
|
println(dbperson)
|
return dbperson,nil
|
}
|
//根据抓拍库人员id查询特征值
|
func GetVideoPersonFaceFeatureById (id string, indexName string, serverIp string, serverPort string) (string, error) {
|
var jsonDSL = `
|
{
|
"query": {
|
"bool": {
|
"filter": [{
|
"term": {
|
"id":"`+ id +`"
|
}
|
}]
|
}
|
},
|
"_source":["faceFeature"]
|
}
|
`
|
buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search",[]byte(jsonDSL))
|
if err != nil {
|
return "", err
|
}
|
|
sources, err := Sourcelist(buf)
|
if err != nil {
|
return "" , err
|
}
|
faceFeature := sources[0]["faceFeature"].(string)
|
return faceFeature,nil
|
}
|
|
// 根据tableid 查询tablename
|
func Dbtablename(tableid string, indexName string, serverIp string, serverPort string) (tablename string, err error) {
|
var dbinfotable =` {
|
"query": {
|
"bool": {
|
"filter": [
|
{
|
"term": {
|
"id":"`+tableid+`"
|
}
|
}
|
]
|
}
|
},
|
"_source": [
|
"tableName"
|
],
|
"size":1000000
|
}
|
`
|
buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search",[]byte(dbinfotable))
|
if err != nil {
|
return "" ,errors.New("http request dbtablename info is err!")
|
}
|
sources, err := Sourcelist(buf)
|
if err != nil {
|
return "",err
|
}
|
for _, source := range sources {
|
if name, ok := source["tableName"].(string); ok {
|
tablename = name
|
break
|
}
|
}
|
return tablename, nil
|
}
|
|
|
//根据抓拍人员id更新(videourl)摄像机地址
|
func UpdateVideourlById(id string, videoUrl string, indexName string, serverIp string, serverPort string,command int) (statu int) {
|
|
var info interface{}
|
url := "http://"+serverIp+":"+serverPort+"/"+indexName+"/_update_by_query?refresh=true"
|
sourceStr := "ctx._source.videoUrl='" + videoUrl + "'"
|
if command>=0 {
|
sourceStr = "ctx._source.linkTagInfo["+strconv.Itoa(command)+"].videoUrl='" + videoUrl + "'"
|
}
|
var videoUrlInfo = `
|
{
|
"script": {
|
"source": "`+ sourceStr + `"
|
},
|
"query": {
|
"term": {
|
"id": "` + id + `"
|
}
|
}
|
}
|
`
|
//fmt.Println("url: ", url, videoUrlInfo)
|
buf, err := EsReq("POST", url, []byte(videoUrlInfo))
|
if err != nil {
|
fmt.Println("http request videoUrlInfo info is err!")
|
statu = 500
|
return
|
}
|
json.Unmarshal(buf, &info)
|
//fmt.Println(info)
|
out, ok := info.(map[string]interface{})
|
if !ok {
|
fmt.Println("http response interface can not change map[string]interface{}")
|
statu = 500
|
return
|
}
|
middle, ok := out["updated"].(float64)
|
if !ok {
|
fmt.Println("first updated change error!")
|
statu = 500
|
return
|
}
|
if middle == 1 {
|
statu = 200
|
return
|
}
|
if middle == 0 {
|
statu = 201
|
return
|
}
|
return statu
|
}
|
|
//获取当前节点抓拍库所有人员ID
|
func GetAllLocalVideopersonsId(compareArgs protomsg.CompareArgs, indexName string, serverIp string, serverPort string, alarmLevelTypes string) (capturetable []string) {
|
queryStr := ""
|
queryBody := compareArgs.InputValue
|
//检索框
|
if queryBody != "" {
|
queryStr = "\"must\":[{\"multi_match\":{\"query\":\"" + queryBody + "\",\"fields\":[\"alarmRules.alarmLevel^1.5\",\"ageDescription^1.5\",\"taskName^1.5\",\"baseInfo.tableName^1.5\",\"sex^2.0\",\"race^2.0\",\"content^1.0\",\"baseInfo.idCard^1.8\",\"cameraAddr^1.0\"]," +
|
"\"type\":\"cross_fields\",\"operator\":\"OR\",\"slop\":0,\"prefix_length\":0,\"max_expansions\":50,\"zero_terms_query\":\"NONE\",\"auto_generate_synonyms_phrase_query\":true,\"fuzzy_transpositions\":true,\"boost\":1}}],"
|
}
|
if compareArgs.SearchTime == nil || len(compareArgs.SearchTime)!=2 {
|
return nil
|
}
|
gteDate := compareArgs.SearchTime[0]
|
lteDate := compareArgs.SearchTime[1]
|
//判断任务ID
|
taskIdStr := ""
|
taskId := compareArgs.Tasks
|
if taskId != nil && len(taskId) > 0 {
|
esTaskId := strings.Replace(strings.Trim(fmt.Sprint(taskId), "[]"), " ", "\",\"", -1)
|
taskIdStr = "{\"terms\":{\"taskId\":[\"" + esTaskId + "\"]}},"
|
}
|
//判断摄像机ID
|
cameraIdStr := ""
|
cameraId := compareArgs.TreeNodes
|
if cameraId != nil && len(cameraId) > 0 {
|
esCameraId := strings.Replace(strings.Trim(fmt.Sprint(cameraId), "[]"), " ", "\",\"", -1)
|
cameraIdStr = "{\"terms\":{\"cameraId\":[\"" + esCameraId + "\"]}},"
|
}
|
|
//判断库表ID
|
tableId := compareArgs.Tabs
|
esTableId := ""
|
esTableIdStr := ""
|
if tableId != nil && len(tableId) > 0 {
|
esTableId = strings.Replace(strings.Trim(fmt.Sprint(tableId), "[]"), " ", "\",\"", -1)
|
esTableIdStr = "{\"terms\":{\"baseInfo.tableId\":[\"" + esTableId + "\"]}},"
|
}
|
isCollectStr := ""
|
isCollect := compareArgs.Collection
|
if isCollect != "" {
|
isCollectStr = "{\"term\":{\"isCollect\":\"" + isCollect + "\"}},"
|
}
|
|
//判断布防等级
|
alarmLevelStr := ""
|
if alarmLevelTypes !="" {
|
alarmLevelStr = "{\"terms\":{\"alarmRules.alarmLevel\":[\"" + alarmLevelTypes + "\"]}},"
|
}
|
|
//使用es底层机制处理分页
|
|
analyServerFilterStr := ""
|
analyServerId := compareArgs.AnalyServerId
|
if analyServerId != "" {
|
analyServerFilterStr = "{\"term\":{\"analyServerId\":\"" + analyServerId + "\"}},"
|
}
|
|
ts := time.Now()
|
//首次请求头
|
url := "http://" + serverIp + ":" + serverPort +
|
"/" + indexName + "/_search?search_type=dfs_query_then_fetch;scroll=1m"
|
|
var lock sync.RWMutex
|
var wg sync.WaitGroup
|
|
for i := 0; i < 48; i++ {
|
//请求体
|
prama := "{" +
|
"\"slice\":{\"id\":" + strconv.Itoa(i) + ",\"max\":48}," +
|
"\"size\":\"1000\"," +
|
"\"query\":{\"bool\":{" + queryStr +
|
"\"filter\":[" +
|
cameraIdStr +
|
alarmLevelStr +
|
taskIdStr +
|
isCollectStr +
|
esTableIdStr +
|
analyServerFilterStr +
|
"{\"range\":{\"picDate\":{\"from\":\"" + gteDate + "\",\"to\":\"" + lteDate + "\",\"include_lower\":true,\"include_upper\":true,\"boost\":1}}}]}}," +
|
"\"_source\":[\"id\"]" +
|
"}"
|
wg.Add(1)
|
go func(reqParam string) {
|
defer wg.Done()
|
|
//fmt.Println(url)
|
//fmt.Println(prama)
|
buf, err := EsReq("POST", url, []byte(reqParam))
|
|
if err != nil {
|
fmt.Println("http request videoUrlInfo info is err!")
|
fmt.Println(len(capturetable))
|
return
|
}
|
|
sources, err := Sourcelistforscroll(buf)
|
|
if err != nil {
|
fmt.Println(len(capturetable))
|
return
|
}
|
for _, source := range sources["sourcelist"].([]map[string]interface{}) {
|
lock.Lock()
|
capturetable = append(capturetable, source["id"].(string))
|
lock.Unlock()
|
}
|
|
scroll_id := sources["scroll_id"].(string)
|
|
//scroll请求头
|
scroll_url := "http://" + serverIp + ":" + serverPort + "/_search/scroll"
|
|
for {
|
var tmpList []string
|
next_scroll_id := ""
|
if next_scroll_id != "" {
|
scroll_id = next_scroll_id
|
}
|
jsonDSL := `{
|
"scroll": "1m",
|
"scroll_id" : "` + scroll_id + `"
|
}`
|
//fmt.Println(scroll_url)
|
//fmt.Println(jsonDSL)
|
buf, err := EsReq("POST", scroll_url, []byte(jsonDSL))
|
|
if err != nil {
|
fmt.Println("lenth1: ", len(capturetable))
|
return
|
}
|
nextSources, err := Sourcelistforscroll(buf)
|
|
if nextSources == nil {
|
return
|
}
|
|
nextM := nextSources["sourcelist"].([]map[string]interface{})
|
//fmt.Println("id",nextSources)
|
if nextM == nil || len(nextM) == 0 {
|
//fmt.Println("lenth: ", len(capturetable))
|
return
|
}
|
//fmt.Println("id")
|
for _, source := range nextM {
|
tmpList = append(tmpList, source["id"].(string))
|
}
|
//fmt.Println("tmpList: ", len(tmpList))
|
lock.Lock()
|
capturetable = append(capturetable, tmpList...)
|
lock.Unlock()
|
|
next_scroll_id = nextSources["scroll_id"].(string)
|
}
|
|
}(prama)
|
}
|
wg.Wait()
|
|
fmt.Println("lenth_all: ", len(capturetable))
|
fmt.Println("耗时:", time.Since(ts))
|
return capturetable
|
}
|
|
//获取底库人员ID
|
func GetDbpersonsId(compareArgs protomsg.CompareArgs,indexName string, serverIp string, serverPort string) (source map[string][]string) {
|
queryStr := ""
|
queryBody := compareArgs.InputValue
|
//检索框
|
if queryBody != "" {
|
queryStr = "\"must\":[{\"multi_match\":{\"query\":\"" + queryBody + "\",\"fields\":[\"personName^1.5\",\"age^1.5\",\"idCard^1.5\",\"phoneNum^1.5\",\"sex^2.0\",\"reserved^2.0\"]," +
|
"\"type\":\"cross_fields\",\"operator\":\"OR\",\"slop\":0,\"prefix_length\":0,\"max_expansions\":50,\"zero_terms_query\":\"NONE\",\"auto_generate_synonyms_phrase_query\":true,\"fuzzy_transpositions\":true,\"boost\":1}}],"
|
}
|
//判断库表ID
|
tableId := compareArgs.Tabs
|
esTableId := ""
|
esTableIdStr := ""
|
if tableId != nil && len(tableId) > 0 {
|
esTableId = strings.Replace(strings.Trim(fmt.Sprint(tableId), "[]"), " ", "\",\"", -1)
|
esTableIdStr = "{\"terms\":{\"tableId\":[\"" + esTableId + "\"]}}"
|
}
|
|
prama := "{" +
|
"\"size\":\"100000000\"," +
|
"\"query\":{\"bool\":{" + queryStr +
|
"\"filter\":[" +
|
esTableIdStr +
|
"]}}," +
|
"\"_source\":[\"id\",\"tableId\"]" +
|
"}"
|
|
url := "http://" + serverIp + ":" + serverPort +
|
"/" + indexName + "/_search?search_type=dfs_query_then_fetch"
|
fmt.Println(url)
|
fmt.Println(prama)
|
buf, err := EsReq("POST", url,[]byte(prama))
|
if err != nil {
|
fmt.Println("http request videoUrlInfo info is err!")
|
return
|
}
|
sources, err := Sourcelist(buf)
|
if err != nil {
|
return
|
}
|
tabsource := make(map[string][]string)
|
for _, source := range sources{
|
tableId := source["tableId"].(string)
|
id := source["id"].(string)
|
tabsource[tableId] = append(tabsource[tableId], id)
|
}
|
return tabsource
|
}
|
|
//初始化实时抓拍
|
func InitRealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm bool ,quantity int) ([]protomsg.AIOcean, error){
|
var aIOceanInfo []protomsg.AIOcean
|
url := "http://" + serverIp + ":" + serverPort +
|
"/" + indexName + "/_search"
|
queryStr := ""
|
if isAlarm == true {
|
queryStr = `"query":{
|
"match_all":{}
|
},`
|
} else {
|
queryStr = `"query":{
|
"bool":{
|
"filter":[
|
{
|
"term":{
|
"isAlarm":true
|
}
|
}
|
]
|
}
|
},`
|
}
|
DSLJson := `{
|
"size":`+strconv.Itoa(quantity)+`,
|
`+queryStr+`
|
"sort":[{"picDate":{"order":"desc"}}],
|
"_source": {"includes":[],"excludes":["*.feature"]}
|
}`
|
buf, err := EsReq("POST", url, []byte(DSLJson))
|
if err != nil {
|
return aIOceanInfo, err
|
}
|
|
sources, err := Sourcelist(buf)
|
if err != nil {
|
return aIOceanInfo, err
|
}
|
|
aIOcean := AIOceanAnalysis(sources)
|
//fmt.Println(len(videoperson))
|
return aIOcean, nil
|
}
|
|
//实时抓拍
|
func RealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm bool ) ([]protomsg.AIOcean, error){
|
var aIOceanInfo []protomsg.AIOcean
|
url := "http://" + serverIp + ":" + serverPort +
|
"/" + indexName + "/_search"
|
queryStr := ""
|
if isAlarm == true {
|
fmt.Println("continue")
|
} else {
|
queryStr = `
|
{
|
"term":{
|
"isAlarm":1
|
}
|
}
|
`
|
}
|
DSLJson := `{
|
"size":20,
|
"query":{
|
"bool":{
|
"filter":[
|
{
|
"range":{
|
"picDate":{
|
"gte":"now+8h-30s",
|
"lt":"now+8h"
|
}
|
}
|
},
|
`+queryStr+`
|
]
|
}
|
},
|
"_source": ["baseInfo", "alarmRules", "sex", "analyServerName", "sdkName", "ageDescription", "content", "id", "cameraAddr", "picMaxUrl", "picDate", "race", "videoUrl", "picSmUrl", "taskName", "personIsHub", "isAlarm", "analyServerIp", "cameraId", "isAckAlarm"]
|
}`
|
|
buf, err := EsReq("POST", url, []byte(DSLJson))
|
if err != nil {
|
return aIOceanInfo, err
|
}
|
|
sources, err := Sourcelist(buf)
|
if err != nil {
|
return aIOceanInfo, err
|
}
|
|
aIOcean := AIOceanAnalysis(sources)
|
fmt.Println(len(aIOcean))
|
return aIOcean, nil
|
}
|
|
//综合统计
|
func StatisticsComprehensive(serverIp string, serverPort string, indexName string, isAlarm bool) (total int, err error){
|
isAlarmStr := ""
|
if isAlarm == true {
|
isAlarmStr = `,{"term":{"isAlarm":true}}`
|
}
|
url := "http://" + serverIp + ":" + serverPort +
|
"/" + indexName + "/_search"
|
DSLJson := `{
|
"size":0,
|
"query":{
|
"bool":{
|
"filter":[{
|
"range":{
|
"picDate":{
|
"gte":"now+8h/d"
|
}
|
}
|
}
|
`+isAlarmStr+`
|
]
|
}
|
}
|
}`
|
//fmt.Println(DSLJson)
|
buf, err := EsReq("POST",url,[]byte(DSLJson))
|
if err != nil {
|
return total, err
|
}
|
var info interface{}
|
json.Unmarshal(buf, &info)
|
out, ok := info.(map[string]interface{})
|
if !ok {
|
return total, errors.New("http response interface can not change map[string]interface{}")
|
}
|
middle, ok := out["hits"].(map[string]interface{})
|
if !ok {
|
return total, errors.New("first hits change error!")
|
}
|
total = int(middle["total"].(float64))
|
//fmt.Println(total)
|
return total,nil
|
}
|
|
//实时报警任务比率
|
func RealTimeAlarmTaskRate(serverIp string, serverPort string, indexName string) (sources []map[string]interface{},err error){
|
url := "http://" + serverIp + ":" + serverPort +
|
"/"+indexName+"/_search"
|
DSLJson := `{
|
"size":0,
|
"query":{
|
"bool":{
|
"filter":[{
|
"range":{
|
"picDate":{
|
"gte":"now+8h/d"
|
}
|
}
|
}]
|
}
|
},
|
"aggs":{
|
"sdkId_status":{
|
"terms":{
|
"field":"taskId"
|
}
|
}
|
}
|
}`
|
buf, err := EsReq("POST",url,[]byte(DSLJson))
|
if err != nil {
|
return nil, err
|
}
|
var info interface{}
|
json.Unmarshal(buf, &info)
|
out, ok := info.(map[string]interface{})
|
if !ok {
|
return nil, errors.New("http response interface can not change map[string]interface{}")
|
}
|
middle, ok := out["aggregations"].(map[string]interface{})
|
if !ok {
|
return nil, errors.New("first hits change error!")
|
}
|
sdkName_status, ok := middle["sdkId_status"].(map[string]interface{})
|
if !ok {
|
return nil, errors.New("first hits change error!")
|
}
|
|
for _, in := range sdkName_status["buckets"].([]interface{}){
|
var source = make(map[string]interface{},0)
|
tmpbuf, ok := in.(map[string]interface{})
|
if !ok {
|
fmt.Println("change to source error!")
|
continue
|
}
|
sdkId := tmpbuf["key"].(string)
|
count := int(tmpbuf["doc_count"].(float64))
|
source["id"] = sdkId
|
source["value"] = count
|
sources = append(sources, source)
|
}
|
//fmt.Println("tmpSource",sources)
|
return sources,nil
|
}
|
|
|
//聚合任务列表,taskId+taskName
|
func AggregateTaskList(serverIp string, serverPort string, indexName string, analyServerId string) (sources []map[string]interface{},err error){
|
url := "http://" + serverIp + ":" + serverPort +
|
"/"+indexName+"/_search"
|
serverFilterStr := ""
|
if analyServerId != "" {
|
serverFilterStr = `,
|
"query": {
|
"bool": {
|
"filter": [
|
{
|
"term": {
|
"analyServerId": "`+analyServerId+`"
|
}
|
}
|
]
|
}
|
}`
|
}
|
DSLJson := `{
|
"size": 0,
|
"aggs": {
|
"task_status": {
|
"composite": {
|
"sources": [
|
{
|
"taskId": {
|
"terms": {
|
"field": "taskId"
|
}
|
}
|
},
|
{
|
"taskName": {
|
"terms": {
|
"field": "taskName.raw"
|
}
|
}
|
}
|
],
|
"size":"1000000"
|
}
|
}
|
}
|
`+serverFilterStr+`
|
}`
|
buf, err := EsReq("POST",url,[]byte(DSLJson))
|
if err != nil {
|
return nil, err
|
}
|
var info interface{}
|
json.Unmarshal(buf, &info)
|
out, ok := info.(map[string]interface{})
|
if !ok {
|
return nil, errors.New("http response interface can not change map[string]interface{}")
|
}
|
middle, ok := out["aggregations"].(map[string]interface{})
|
if !ok {
|
return nil, errors.New("first hits change error!")
|
}
|
task_status, ok := middle["task_status"].(map[string]interface{})
|
if !ok {
|
return nil, errors.New("first hits change error!")
|
}
|
|
for _, in := range task_status["buckets"].([]interface{}){
|
var source = make(map[string]interface{},0)
|
tmpbuf, ok := in.(map[string]interface{})
|
if !ok {
|
fmt.Println("change to source error!")
|
continue
|
}
|
task := tmpbuf["key"].(map[string]interface{})
|
count := int(tmpbuf["doc_count"].(float64))
|
taskName := task["taskName"].(string)
|
taskId := task["taskId"].(string)
|
source["taskName"] = taskName
|
source["taskId"] = taskId
|
source["count"] = count
|
sources = append(sources, source)
|
}
|
//fmt.Println("tmpSource",sources)
|
return sources,nil
|
|
}
|