| | |
| | | package esutil |
| | | |
| | | import ( |
| | | "encoding/json" |
| | | "errors" |
| | | "fmt" |
| | | "strconv" |
| | | "strings" |
| | | "encoding/json" |
| | | "errors" |
| | | "fmt" |
| | | "strconv" |
| | | "strings" |
| | | "sync" |
| | | "time" |
| | | |
| | | "basic.com/pubsub/protomsg.git" |
| | | ) |
| | | |
| | | // 查询底库人员信息 |
| | | func Personinfos( queryIndex int, queryNums int, indexName string, serverIp string, serverPort string, analyServerId string) ([]*protomsg.Esinfo, error){ |
| | | var dbinfos []*protomsg.Esinfo |
| | | point := strconv.Itoa(queryIndex) |
| | | number := strconv.Itoa(queryNums) |
| | | JsonDSL := "" |
| | | if indexName == "videopersons" { |
| | | JsonDSL = ` { |
| | | "from": ` + point + `, |
| | | "query": { |
| | | "bool": { |
| | | "filter": [ |
| | | { |
| | | "term": { |
| | | "analyServerId": "` + analyServerId + `" |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | }, |
| | | "size":`+ number +`, |
| | | "_source": [ |
| | | "id", |
| | | "faceFeature" |
| | | ] |
| | | }` |
| | | }else { |
| | | JsonDSL = ` { |
| | | "from": ` + point + `, |
| | | "query": { |
| | | "match_all": {} |
| | | }, |
| | | "size":`+ number +`, |
| | | "_source": [ |
| | | "id", |
| | | "tableId", |
| | | "faceFeature" |
| | | ] |
| | | }` |
| | | } |
| | | //fmt.Println("url: "+"http://"+serverIp+":"+serverPort+"/"+indexName+"/_search","body: ",JsonDSL) |
| | | buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search",[]byte(JsonDSL)) |
| | | if err != nil { |
| | | return dbinfos ,errors.New("http request dbtablename info is err!") |
| | | } |
| | | |
| | | // 返回 _source 数组 |
| | | sources, err := Sourcelist(buf) |
| | | if err != nil { |
| | | return dbinfos,err |
| | | } |
| | | |
| | | // 返回所有查询的数据 |
| | | dbpersoninfos := Parsesources(sources) |
| | | return dbpersoninfos, nil |
| | | } |
| | | |
| | | // 根据底库id查询底库信息 |
| | | func Dbtablefosbyid (id []string, indexName string, serverIp string, serverPort string) ([]protomsg.Dbtable, error) { |
| | | var dbinfo []protomsg.Dbtable |
| | | dbtableId := strings.Replace(strings.Trim(fmt.Sprint(id), "[]"), " ", "\",\"", -1) |
| | | var dbinfoRequest = ` |
| | | { |
| | | "query": { |
| | | "bool": { |
| | | "filter": [{ |
| | | "terms": { |
| | | "id": [ |
| | | "`+ dbtableId +`" |
| | | ] |
| | | } |
| | | }] |
| | | } |
| | | }, |
| | | "size":1000000 |
| | | } |
| | | ` |
| | | buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search",[]byte(dbinfoRequest)) |
| | | if err != nil { |
| | | return dbinfo, err |
| | | } |
| | | |
| | | sources, err := Sourcelist(buf) |
| | | if err != nil { |
| | | return dbinfo , err |
| | | } |
| | | |
| | | dbtable := Dbtablebyid(sources) |
| | | return dbtable, nil |
| | | } |
| | | ) |
| | | |
| | | // 根据抓拍人员id查询抓拍人员信息 |
| | | func Videopersonsinfosbyid (id []string, indexName string, serverIp string, serverPort string) ([]protomsg.Videopersons, error) { |
| | | var videopersonsInfo []protomsg.Videopersons |
| | | func AIOceaninfosbyid(id []string, indexName string, serverIp string, serverPort string) ([]protomsg.AIOcean, error) { |
| | | var aIOceanInfo []protomsg.AIOcean |
| | | videopersonsPersonId := strings.Replace(strings.Trim(fmt.Sprint(id), "[]"), " ", "\",\"", -1) |
| | | var dbinfoRequest = ` |
| | | { |
| | |
| | | "filter": [{ |
| | | "terms": { |
| | | "id": [ |
| | | "`+ videopersonsPersonId +`" |
| | | "` + videopersonsPersonId + `" |
| | | ] |
| | | } |
| | | }] |
| | |
| | | "size":1000000 |
| | | } |
| | | ` |
| | | buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search",[]byte(dbinfoRequest)) |
| | | buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search", []byte(dbinfoRequest)) |
| | | if err != nil { |
| | | return videopersonsInfo, err |
| | | return aIOceanInfo, err |
| | | } |
| | | |
| | | sources, err := Sourcelist(buf) |
| | | if err != nil { |
| | | return videopersonsInfo , err |
| | | return aIOceanInfo, err |
| | | } |
| | | |
| | | videoperson := Videopersonsbyid(sources) |
| | | println(videoperson) |
| | | return videoperson,nil |
| | | aIOcean := AIOceanAnalysis(sources) |
| | | println(aIOcean) |
| | | return aIOcean, nil |
| | | } |
| | | |
| | | // 根据底库人员id查询底库人员信息 |
| | | func Dbpersoninfosbyid (id []string, indexName string, serverIp string, serverPort string) ([]protomsg.Dbperson, error) { |
| | | var dbinfo []protomsg.Dbperson |
| | | dbtablePersonId := strings.Replace(strings.Trim(fmt.Sprint(id), "[]"), " ", "\",\"", -1) |
| | | var dbinfoRequest = ` |
| | | { |
| | | "query": { |
| | | "bool": { |
| | | "filter": [{ |
| | | "terms": { |
| | | "id": [ |
| | | "`+ dbtablePersonId +`" |
| | | ] |
| | | } |
| | | }] |
| | | } |
| | | }, |
| | | "size":1000000 |
| | | } |
| | | ` |
| | | buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search",[]byte(dbinfoRequest)) |
| | | if err != nil { |
| | | return dbinfo, err |
| | | } |
| | | |
| | | sources, err := Sourcelist(buf) |
| | | if err != nil { |
| | | return dbinfo , err |
| | | } |
| | | |
| | | dbperson := Dbpersonbyid(sources) |
| | | println(dbperson) |
| | | return dbperson,nil |
| | | } |
| | | //根据抓拍库人员id查询特征值 |
| | | func GetVideoPersonFaceFeatureById (id string, indexName string, serverIp string, serverPort string) (string, error) { |
| | | var jsonDSL = ` |
| | | func GetVideoPersonFaceFeatureById(id string, indexName string, serverIp string, serverPort string) (string, error) { |
| | | var jsonDSL = ` |
| | | { |
| | | "query": { |
| | | "bool": { |
| | | "filter": [{ |
| | | "term": { |
| | | "id":"`+ id +`" |
| | | "id":"` + id + `" |
| | | } |
| | | }] |
| | | } |
| | | }, |
| | | "_source":["faceFeature"] |
| | | "_source":["targetInfo.feature"] |
| | | } |
| | | ` |
| | | buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search",[]byte(jsonDSL)) |
| | | if err != nil { |
| | | return "", err |
| | | } |
| | | buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search", []byte(jsonDSL)) |
| | | if err != nil { |
| | | return "", err |
| | | } |
| | | |
| | | sources, err := Sourcelist(buf) |
| | | if err != nil { |
| | | return "" , err |
| | | } |
| | | faceFeature := sources[0]["faceFeature"].(string) |
| | | return faceFeature,nil |
| | | sources, err := Sourcelist(buf) |
| | | if err != nil { |
| | | return "", err |
| | | } |
| | | |
| | | feature := sources[0]["targetInfo"].([]interface{})[0].(map[string]interface{})["feature"].(string) |
| | | return feature, nil |
| | | } |
| | | |
| | | // 根据tableid 查询tablename |
| | | func Dbtablename(tableid string, indexName string, serverIp string, serverPort string) (tablename string, err error) { |
| | | var dbinfotable =` { |
| | | "query": { |
| | | "bool": { |
| | | "filter": [ |
| | | { |
| | | "term": { |
| | | "id":"`+tableid+`" |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | }, |
| | | "_source": [ |
| | | "tableName" |
| | | ], |
| | | "size":1000000 |
| | | } |
| | | ` |
| | | buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_search",[]byte(dbinfotable)) |
| | | if err != nil { |
| | | return "" ,errors.New("http request dbtablename info is err!") |
| | | } |
| | | sources, err := Sourcelist(buf) |
| | | if err != nil { |
| | | return "",err |
| | | } |
| | | for _, source := range sources { |
| | | if name, ok := source["tableName"].(string); ok { |
| | | tablename = name |
| | | break |
| | | } |
| | | } |
| | | return tablename, nil |
| | | //根据目标id查询已追加条数 |
| | | func GetLinkTagInfoSize(id string, indexName string, serverIp string, serverPort string) (size int, err error) { |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" |
| | | queryDSL := `{ |
| | | "query": { |
| | | "term":{ |
| | | "id":"` + id + `" |
| | | } |
| | | } |
| | | }` |
| | | buf, err := EsReq("POST", url, []byte(queryDSL)) |
| | | if err != nil { |
| | | return -1, err |
| | | } |
| | | source, err := Sourcelist(buf) |
| | | if err != nil { |
| | | return -1, err |
| | | } |
| | | if source[0]["linkTagInfo"] != nil { |
| | | size = len(source[0]["linkTagInfo"].([]interface{})) |
| | | } else { |
| | | return -1, errors.New("该数组不存在") |
| | | } |
| | | return size, nil |
| | | } |
| | | |
| | | //根据目标id追加跟踪信息 |
| | | func AppendTargetInfo(id string, targetInfo string, indexName string, serverIp string, serverPort string, updateTime string) (string, error) { |
| | | if targetInfo == "" { |
| | | return "", errors.New("append data is nil") |
| | | } |
| | | var info interface{} |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query?refresh=true" |
| | | jsonDSL := `{ |
| | | "query": { |
| | | "term":{ |
| | | "id":"` + id + `" |
| | | } |
| | | }, |
| | | "script": { |
| | | "lang": "painless", |
| | | "inline": "ctx._source.linkTagInfo.add(params.newparam);ctx._source.updateTime='` + updateTime + `'", |
| | | "params": { |
| | | "newparam": ` + targetInfo + ` |
| | | } |
| | | } |
| | | }` |
| | | fmt.Println(jsonDSL) |
| | | buf, err := EsReq("POST", url, []byte(jsonDSL)) |
| | | if err != nil { |
| | | return "", err |
| | | } |
| | | json.Unmarshal(buf, &info) |
| | | out, ok := info.(map[string]interface{}) |
| | | fmt.Println(out) |
| | | if !ok { |
| | | return "", errors.New("http response interface can not change map[string]interface{}") |
| | | } |
| | | middle, ok := out["updated"].(float64) |
| | | if !ok { |
| | | return "", errors.New("first updated change error!") |
| | | } |
| | | mes := "" |
| | | if middle == 1 { |
| | | mes = "追加成功" |
| | | } |
| | | if middle == 0 { |
| | | mes = "已经追加" |
| | | } |
| | | return mes, nil |
| | | |
| | | } |
| | | |
| | | //根据抓拍人员id更新(videourl)摄像机地址 |
| | | func UpdateVideourlById(id string, videoUrl string, indexName string, serverIp string, serverPort string)(statu int){ |
| | | var info interface{} |
| | | var videoUrlInfo = ` |
| | | func UpdateVideourlById(id string, videoUrl string, indexName string, serverIp string, serverPort string, command int) (statu int, err error) { |
| | | |
| | | var info interface{} |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query?refresh=true" |
| | | sourceStr := "ctx._source.videoUrl='" + videoUrl + "'" |
| | | if command >= 0 { |
| | | sourceStr = "ctx._source.linkTagInfo[" + strconv.Itoa(command) + "].videoUrl='" + videoUrl + "'" |
| | | } |
| | | var videoUrlInfo = ` |
| | | { |
| | | "script": { |
| | | "source": "ctx._source.videoUrl='` + videoUrl + `'" |
| | | "source": "` + sourceStr + `" |
| | | }, |
| | | "query": { |
| | | "term": { |
| | | "id": "` +id+ `" |
| | | "id": "` + id + `" |
| | | } |
| | | }, |
| | | "size":1000000 |
| | | } |
| | | } |
| | | ` |
| | | buf, err := EsReq("POST", "http://"+serverIp+":"+serverPort+"/"+indexName+"/_update_by_query",[]byte(videoUrlInfo)) |
| | | if err != nil { |
| | | fmt.Println("http request videoUrlInfo info is err!") |
| | | statu = 500 |
| | | return |
| | | } |
| | | json.Unmarshal(buf, &info) |
| | | out, ok := info.(map[string]interface{}) |
| | | if !ok { |
| | | fmt.Println("http response interface can not change map[string]interface{}") |
| | | statu = 500 |
| | | return |
| | | } |
| | | |
| | | middle, ok := out["updated"].(float64) |
| | | if !ok { |
| | | fmt.Println("first updated change error!") |
| | | statu = 500 |
| | | return |
| | | } |
| | | if middle == 1{ |
| | | statu = 200 |
| | | return |
| | | } |
| | | if middle == 0{ |
| | | statu = 201 |
| | | return |
| | | } |
| | | return statu |
| | | //fmt.Println("url: ", url, videoUrlInfo) |
| | | buf, err := EsReq("POST", url, []byte(videoUrlInfo)) |
| | | if err != nil { |
| | | fmt.Println("http request videoUrlInfo info is err!") |
| | | statu = 500 |
| | | return statu, err |
| | | } |
| | | json.Unmarshal(buf, &info) |
| | | //fmt.Println(info) |
| | | out, ok := info.(map[string]interface{}) |
| | | if !ok { |
| | | fmt.Println("http response interface can not change map[string]interface{}") |
| | | statu = 500 |
| | | return statu, errors.New("http response interface can not change map[string]interface{}") |
| | | } |
| | | middle, ok := out["updated"].(float64) |
| | | if !ok { |
| | | fmt.Println("first updated change error!") |
| | | statu = 500 |
| | | return statu, errors.New("first updated change error!") |
| | | } |
| | | if middle == 1 { |
| | | statu = 200 |
| | | return statu, nil |
| | | } |
| | | if middle == 0 { |
| | | statu = 201 |
| | | return statu, errors.New("已经修改") |
| | | } |
| | | return statu, nil |
| | | } |
| | | |
| | | //获取当前节点抓拍库所有人员ID |
| | | func GetAllLocalVideopersonsId(compareArgs protomsg.CompareArgs, indexName string, serverIp string, serverPort string) (capturetable []string) { |
| | | //获取当前节点抓拍库所有人员ID*缓存* |
| | | func GetAllLocalVideopersonsId(compareArgs protomsg.CompareArgs, indexName string, serverIp string, serverPort string, alarmLevelTypes string) (capturetable []string) { |
| | | queryStr := "" |
| | | queryBody := compareArgs.InputValue |
| | | //检索框 |
| | | if queryBody != "" { |
| | | queryStr = "\"must\":[{\"multi_match\":{\"query\":\"" + queryBody + "\",\"fields\":[\"alarmRules.alarmLevel^1.5\",\"ageDescription^1.5\",\"taskName^1.5\",\"baseInfo.tableName^1.5\",\"sex^2.0\",\"race^2.0\",\"content^1.0\",\"baseInfo.idCard^1.8\",\"cameraAddr^1.0\"]," + |
| | | queryStr = "\"must\":[{\"multi_match\":{\"query\":\"" + queryBody + "\",\"fields\":[\"cameraAddr^1.5\",\"taskName^1.5\",\"sdkName^1.5\",\"showLabels^3.0\",\"baseInfo.tableName^1.5\",\"baseInfo.targetName^1.5\",\"baseInfo.labels^1.5\",\"alarmRules.alarmLevel^1.5\",\"linkTag^1.5\"]," + |
| | | "\"type\":\"cross_fields\",\"operator\":\"OR\",\"slop\":0,\"prefix_length\":0,\"max_expansions\":50,\"zero_terms_query\":\"NONE\",\"auto_generate_synonyms_phrase_query\":true,\"fuzzy_transpositions\":true,\"boost\":1}}]," |
| | | } |
| | | if compareArgs.SearchTime == nil || len(compareArgs.SearchTime) != 2 { |
| | | return nil |
| | | } |
| | | gteDate := compareArgs.SearchTime[0] |
| | | lteDate := compareArgs.SearchTime[1] |
| | |
| | | isCollectStr = "{\"term\":{\"isCollect\":\"" + isCollect + "\"}}," |
| | | } |
| | | |
| | | //判断布防等级 |
| | | alarmLevelStr := "" |
| | | if alarmLevelTypes != "" { |
| | | alarmLevelStr = "{\"terms\":{\"alarmRules.alarmLevel.raw\":[\"" + alarmLevelTypes + "\"]}}," |
| | | } |
| | | |
| | | //使用es底层机制处理分页 |
| | | |
| | | analyServerFilterStr := "" |
| | | analyServerId := compareArgs.AnalyServerId |
| | | if analyServerId == "" { |
| | | fmt.Println("no analyServerId") |
| | | return |
| | | if analyServerId != "" { |
| | | analyServerFilterStr = "{\"term\":{\"analyServerId\":\"" + analyServerId + "\"}}," |
| | | } |
| | | analyServerFilterStr := "{\"term\":{\"analyServerId\":\"" + analyServerId + "\"}}," |
| | | |
| | | ts := time.Now() |
| | | //首次请求头 |
| | | url := "http://" + serverIp + ":" + serverPort + |
| | | "/" + indexName + "/_search?search_type=dfs_query_then_fetch;scroll=1m" |
| | |
| | | var lock sync.RWMutex |
| | | var wg sync.WaitGroup |
| | | |
| | | for i := 0; i < 32; i++ { |
| | | for i := 0; i < 48; i++ { |
| | | //请求体 |
| | | prama := "{" + |
| | | "\"slice\":{\"id\":" + strconv.Itoa(i) + ",\"max\":48}," + |
| | | "\"size\":\"1000\"," + |
| | | "\"query\":{\"bool\":{" + queryStr + |
| | | "\"filter\":[" + |
| | | "{\"term\":{\"targetInfo.targetType.raw\":\"face\"}}," + |
| | | cameraIdStr + |
| | | alarmLevelStr + |
| | | taskIdStr + |
| | | isCollectStr + |
| | | esTableIdStr + |
| | |
| | | "\"_source\":[\"id\"]" + |
| | | "}" |
| | | wg.Add(1) |
| | | go func() { |
| | | go func(reqParam string) { |
| | | defer wg.Done() |
| | | |
| | | //fmt.Println(url) |
| | | //fmt.Println(prama) |
| | | buf, err := EsReq("POST", url, []byte(prama)) |
| | | buf, err := EsReq("POST", url, []byte(reqParam)) |
| | | |
| | | if err != nil { |
| | | fmt.Println("http request videoUrlInfo info is err!") |
| | |
| | | return |
| | | } |
| | | for _, source := range sources["sourcelist"].([]map[string]interface{}) { |
| | | lock.Lock() |
| | | capturetable = append(capturetable, source["id"].(string)) |
| | | lock.Unlock() |
| | | } |
| | | |
| | | scroll_id := sources["scroll_id"].(string) |
| | |
| | | next_scroll_id = nextSources["scroll_id"].(string) |
| | | } |
| | | |
| | | |
| | | fmt.Println(len(capturetable)) |
| | | |
| | | }() |
| | | }(prama) |
| | | } |
| | | wg.Wait() |
| | | |
| | | //fmt.Println("lenth_all: ", len(capturetable)) |
| | | //fmt.Println("耗时:", time.Since(ts)) |
| | | fmt.Println("lenth_all: ", len(capturetable)) |
| | | fmt.Println("耗时:", time.Since(ts)) |
| | | return capturetable |
| | | } |
| | | |
| | | //获取底库人员ID |
| | | func GetDbpersonsId(compareArgs protomsg.CompareArgs,indexName string, serverIp string, serverPort string) (source map[string][]string) { |
| | | queryStr := "" |
| | | queryBody := compareArgs.InputValue |
| | | //检索框 |
| | | if queryBody != "" { |
| | | queryStr = "\"must\":[{\"multi_match\":{\"query\":\"" + queryBody + "\",\"fields\":[\"personName^1.5\",\"age^1.5\",\"idCard^1.5\",\"phoneNum^1.5\",\"sex^2.0\",\"reserved^2.0\"]," + |
| | | "\"type\":\"cross_fields\",\"operator\":\"OR\",\"slop\":0,\"prefix_length\":0,\"max_expansions\":50,\"zero_terms_query\":\"NONE\",\"auto_generate_synonyms_phrase_query\":true,\"fuzzy_transpositions\":true,\"boost\":1}}]," |
| | | } |
| | | //判断库表ID |
| | | tableId := compareArgs.Tabs |
| | | esTableId := "" |
| | | esTableIdStr := "" |
| | | if tableId != nil && len(tableId) > 0 { |
| | | esTableId = strings.Replace(strings.Trim(fmt.Sprint(tableId), "[]"), " ", "\",\"", -1) |
| | | esTableIdStr = "{\"terms\":{\"tableId\":[\"" + esTableId + "\"]}}" |
| | | } |
| | | |
| | | prama := "{" + |
| | | "\"size\":\"100000000\"," + |
| | | "\"query\":{\"bool\":{" + queryStr + |
| | | "\"filter\":[" + |
| | | esTableIdStr + |
| | | "]}}," + |
| | | "\"_source\":[\"id\",\"tableId\"]" + |
| | | "}" |
| | | |
| | | url := "http://" + serverIp + ":" + serverPort + |
| | | "/" + indexName + "/_search?search_type=dfs_query_then_fetch" |
| | | fmt.Println(url) |
| | | fmt.Println(prama) |
| | | buf, err := EsReq("POST", url,[]byte(prama)) |
| | | if err != nil { |
| | | fmt.Println("http request videoUrlInfo info is err!") |
| | | return |
| | | } |
| | | sources, err := Sourcelist(buf) |
| | | if err != nil { |
| | | return |
| | | } |
| | | tabsource := make(map[string][]string) |
| | | for _, source := range sources{ |
| | | tableId := source["tableId"].(string) |
| | | id := source["id"].(string) |
| | | tabsource[tableId] = append(tabsource[tableId], id) |
| | | } |
| | | return tabsource |
| | | } |
| | | |
| | | //初始化实时抓拍 |
| | | func InitRealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm bool ,quantity int) ([]protomsg.Videopersons, error){ |
| | | var videopersonsInfo []protomsg.Videopersons |
| | | func InitRealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm string, category string, quantity int) ([]protomsg.AIOcean, error) { |
| | | var aIOceanInfo []protomsg.AIOcean |
| | | url := "http://" + serverIp + ":" + serverPort + |
| | | "/" + indexName + "/_search" |
| | | queryStr := "" |
| | | if isAlarm == true { |
| | | queryStr = `"query":{ |
| | | "match_all":{} |
| | | },` |
| | | } else { |
| | | queryStr = `"query":{ |
| | | var filterArr []string |
| | | if isAlarm != "all" { |
| | | filterArr = append(filterArr, ` { |
| | | "term":{ |
| | | "isAlarm":"`+isAlarm+`" |
| | | } |
| | | }`) |
| | | } |
| | | |
| | | if category != "all" { |
| | | filterArr = append(filterArr, ` { |
| | | "term":{ |
| | | "targetInfo.targetType":"`+category+`" |
| | | } |
| | | }`) |
| | | |
| | | } |
| | | |
| | | queryStr := `"query":{ |
| | | "bool":{ |
| | | "filter":[ |
| | | { |
| | | "term":{ |
| | | "isAlarm":1 |
| | | } |
| | | } |
| | | ` + strings.Join(filterArr, ",") + ` |
| | | ] |
| | | } |
| | | },` |
| | | } |
| | | |
| | | DSLJson := `{ |
| | | "size":`+strconv.Itoa(quantity)+`, |
| | | `+queryStr+` |
| | | "size":` + strconv.Itoa(quantity) + `, |
| | | ` + queryStr + ` |
| | | "sort":[{"picDate":{"order":"desc"}}], |
| | | "_source": ["baseInfo", "alarmRules", "sex", "analyServerName", "sdkName", "ageDescription", "content", "id", "cameraAddr", "picMaxUrl", "picDate", "race", "videoUrl", "picSmUrl", "taskName", "personIsHub", "isAlarm", "analyServerIp", "cameraId"] |
| | | "_source": {"includes":[],"excludes":["*.feature"]} |
| | | }` |
| | | fmt.Println(DSLJson) |
| | | buf, err := EsReq("POST", url, []byte(DSLJson)) |
| | | if err != nil { |
| | | return videopersonsInfo, err |
| | | return aIOceanInfo, err |
| | | } |
| | | |
| | | sources, err := Sourcelist(buf) |
| | | if err != nil { |
| | | return videopersonsInfo, err |
| | | return aIOceanInfo, err |
| | | } |
| | | |
| | | videoperson := Videopersonsbyid(sources) |
| | | aIOcean := AIOceanAnalysis(sources) |
| | | //fmt.Println(len(videoperson)) |
| | | return videoperson, nil |
| | | return aIOcean, nil |
| | | } |
| | | |
| | | //实时抓拍 |
| | | func RealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm bool ) ([]protomsg.Videopersons, error){ |
| | | var videopersonsInfo []protomsg.Videopersons |
| | | func RealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm bool) ([]protomsg.AIOcean, error) { |
| | | var aIOceanInfo []protomsg.AIOcean |
| | | url := "http://" + serverIp + ":" + serverPort + |
| | | "/" + indexName + "/_search" |
| | | queryStr := "" |
| | | if isAlarm == true { |
| | | fmt.Println("continue") |
| | | } else { |
| | | queryStr = ` |
| | | { |
| | | "term":{ |
| | | "isAlarm":1 |
| | | } |
| | | } |
| | | ` |
| | | } |
| | | DSLJson := `{ |
| | | "size":20, |
| | | "query":{ |
| | |
| | | } |
| | | } |
| | | }, |
| | | `+queryStr+` |
| | | { |
| | | "term":{ |
| | | "isAlarm":` + strconv.FormatBool(isAlarm) + ` |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | }, |
| | | "_source": ["baseInfo", "alarmRules", "sex", "analyServerName", "sdkName", "ageDescription", "content", "id", "cameraAddr", "picMaxUrl", "picDate", "race", "videoUrl", "picSmUrl", "taskName", "personIsHub", "isAlarm", "analyServerIp", "cameraId", "isAckAlarm"] |
| | | "_source": {"includes":[],"excludes":["*.feature"]} |
| | | }` |
| | | |
| | | buf, err := EsReq("POST", url, []byte(DSLJson)) |
| | | if err != nil { |
| | | return videopersonsInfo, err |
| | | return aIOceanInfo, err |
| | | } |
| | | |
| | | sources, err := Sourcelist(buf) |
| | | if err != nil { |
| | | return videopersonsInfo, err |
| | | return aIOceanInfo, err |
| | | } |
| | | |
| | | videoperson := Videopersonsbyid(sources) |
| | | fmt.Println(len(videoperson)) |
| | | return videoperson, nil |
| | | aIOcean := AIOceanAnalysis(sources) |
| | | fmt.Println(len(aIOcean)) |
| | | return aIOcean, nil |
| | | } |
| | | |
| | | //综合统计 |
| | | func StatisticsComprehensive(serverIp string, serverPort string, indexName string) (total int, err error){ |
| | | func StatisticsComprehensive(serverIp string, serverPort string, indexName string, isAlarm string) (total int, err error) { |
| | | url := "http://" + serverIp + ":" + serverPort + |
| | | "/" + indexName + "/_search" |
| | | isAlarmStr := "" |
| | | if isAlarm != "all" { |
| | | isAlarmStr = ` { |
| | | "term":{ |
| | | "isAlarm":"` + isAlarm + `" |
| | | } |
| | | },` |
| | | |
| | | } |
| | | DSLJson := `{ |
| | | "size":0, |
| | | "query":{ |
| | | "bool":{ |
| | | "filter":[{ |
| | | "filter":[ |
| | | ` + isAlarmStr + ` |
| | | { |
| | | "range":{ |
| | | "picDate":{ |
| | | "gte":"now+8H/d" |
| | | "gte":"now+8h/d" |
| | | } |
| | | } |
| | | }] |
| | | } |
| | | ] |
| | | } |
| | | } |
| | | }` |
| | | buf, err := EsReq("POST",url,[]byte(DSLJson)) |
| | | //fmt.Println(DSLJson) |
| | | buf, err := EsReq("POST", url, []byte(DSLJson)) |
| | | if err != nil { |
| | | return total, err |
| | | } |
| | |
| | | } |
| | | total = int(middle["total"].(float64)) |
| | | //fmt.Println(total) |
| | | return total,nil |
| | | return total, nil |
| | | } |
| | | |
| | | //实时报警任务比率 |
| | | func RealTimeAlarmTaskRate(serverIp string, serverPort string, indexName string) (sources []map[string]interface{},err error){ |
| | | func RealTimeAlarmTaskRate(serverIp string, serverPort string, indexName string) (sources []map[string]interface{}, err error) { |
| | | url := "http://" + serverIp + ":" + serverPort + |
| | | "/"+indexName+"/_search" |
| | | "/" + indexName + "/_search" |
| | | DSLJson := `{ |
| | | "size":0, |
| | | "query":{ |
| | |
| | | "aggs":{ |
| | | "sdkName_status":{ |
| | | "terms":{ |
| | | "field":"taskName.raw" |
| | | "field":"sdkName.raw" |
| | | } |
| | | } |
| | | } |
| | | }` |
| | | buf, err := EsReq("POST",url,[]byte(DSLJson)) |
| | | buf, err := EsReq("POST", url, []byte(DSLJson)) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | |
| | | return nil, errors.New("first hits change error!") |
| | | } |
| | | |
| | | for _, in := range sdkName_status["buckets"].([]interface{}){ |
| | | var source = make(map[string]interface{},0) |
| | | for _, in := range sdkName_status["buckets"].([]interface{}) { |
| | | var source = make(map[string]interface{}, 0) |
| | | tmpbuf, ok := in.(map[string]interface{}) |
| | | if !ok { |
| | | fmt.Println("change to source error!") |
| | |
| | | sources = append(sources, source) |
| | | } |
| | | //fmt.Println("tmpSource",sources) |
| | | return sources,nil |
| | | return sources, nil |
| | | } |
| | | |
| | | //聚合任务列表,taskId+taskName |
| | | func AggregateTaskList(serverIp string, serverPort string, indexName string, analyServerId string) (sources []map[string]interface{}, err error) { |
| | | url := "http://" + serverIp + ":" + serverPort + |
| | | "/" + indexName + "/_search" |
| | | serverFilterStr := "" |
| | | if analyServerId != "" { |
| | | serverFilterStr = `, |
| | | "query": { |
| | | "bool": { |
| | | "filter": [ |
| | | { |
| | | "term": { |
| | | "analyServerId": "` + analyServerId + `" |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | }` |
| | | } |
| | | DSLJson := `{ |
| | | "size": 0, |
| | | "aggs": { |
| | | "task_status": { |
| | | "composite": { |
| | | "sources": [ |
| | | { |
| | | "taskId": { |
| | | "terms": { |
| | | "field": "taskId" |
| | | } |
| | | } |
| | | }, |
| | | { |
| | | "taskName": { |
| | | "terms": { |
| | | "field": "taskName.raw" |
| | | } |
| | | } |
| | | } |
| | | ], |
| | | "size":"1000000" |
| | | } |
| | | } |
| | | } |
| | | ` + serverFilterStr + ` |
| | | }` |
| | | buf, err := EsReq("POST", url, []byte(DSLJson)) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | var info interface{} |
| | | json.Unmarshal(buf, &info) |
| | | out, ok := info.(map[string]interface{}) |
| | | if !ok { |
| | | return nil, errors.New("http response interface can not change map[string]interface{}") |
| | | } |
| | | middle, ok := out["aggregations"].(map[string]interface{}) |
| | | if !ok { |
| | | return nil, errors.New("first hits change error!") |
| | | } |
| | | task_status, ok := middle["task_status"].(map[string]interface{}) |
| | | if !ok { |
| | | return nil, errors.New("first hits change error!") |
| | | } |
| | | |
| | | for _, in := range task_status["buckets"].([]interface{}) { |
| | | var source = make(map[string]interface{}, 0) |
| | | tmpbuf, ok := in.(map[string]interface{}) |
| | | if !ok { |
| | | fmt.Println("change to source error!") |
| | | continue |
| | | } |
| | | task := tmpbuf["key"].(map[string]interface{}) |
| | | count := int(tmpbuf["doc_count"].(float64)) |
| | | taskName := task["taskName"].(string) |
| | | taskId := task["taskId"].(string) |
| | | source["taskName"] = taskName |
| | | source["taskId"] = taskId |
| | | source["count"] = count |
| | | sources = append(sources, source) |
| | | } |
| | | //fmt.Println("tmpSource",sources) |
| | | return sources, nil |
| | | |
| | | } |
| | | |
| | | // AddDeleteSignal is the placeholder for adding an "about to be deleted" signal. |
| | | // It is currently an empty stub: it takes no arguments, returns nothing, and its body has no statements. |
| | | func AddDeleteSignal() { |
| | | |
| | | } |
| | | |
| | | /****************************************以下为sdkCompare比对缓存使用方法*********************************************/ |
| | | //获取查询总数 *缓存* |
| | | func GetTotal(serverIp string, serverPort string, indexName string, shards string, targetType string) (total int) { |
| | | JsonDSL := `{ |
| | | "size": 0, |
| | | "query": { |
| | | "bool": { |
| | | "filter": [{ |
| | | "term": { |
| | | "targetInfo.targetType.raw": "` + targetType + `" |
| | | } |
| | | }] |
| | | } |
| | | } |
| | | }` |
| | | |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local" |
| | | buf, err := EsReq("POST", url, []byte(JsonDSL)) |
| | | if err != nil { |
| | | return |
| | | } |
| | | var info interface{} |
| | | json.Unmarshal(buf, &info) |
| | | out, ok := info.(map[string]interface{}) |
| | | if !ok { |
| | | return |
| | | } |
| | | middle, ok := out["hits"].(map[string]interface{}) |
| | | if !ok { |
| | | return |
| | | } |
| | | total = int(middle["total"].(float64)) |
| | | return total |
| | | |
| | | } |
| | | |
| | | //查询时间段数据 *缓存* |
| | | func GetPeriodInfos(serverIp string, serverPort string, startTime string, endTime string, indexName string, shards string, targetType string) ([]*protomsg.MultiFeaCache, error) { |
| | | var capdbinfo []*protomsg.MultiFeaCache |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local" |
| | | var source []string |
| | | switch targetType { |
| | | case "face": |
| | | source = []string{"id", "targetInfo.feature", "analyServerId", "cameraId"} |
| | | case "track": |
| | | source = []string{"id", "targetInfo.feature", "analyServerId", "cameraId", "targetInfo.attachTarget.feature", "targetInfo.targetLocation", "linkTagInfo.targetInfo.feature", "linkTagInfo.targetInfo.attachTarget.feature", "linkTagInfo.cameraId", "linkTagInfo.targetInfo.targetLocation"} |
| | | } |
| | | JsonDSL := ` |
| | | { |
| | | "query": { |
| | | "bool": { |
| | | "filter": [ |
| | | { |
| | | "term": { |
| | | "targetInfo.targetType.raw": "` + targetType + `" |
| | | } |
| | | }, |
| | | { |
| | | "range": { |
| | | "picDate": { |
| | | "gte": "` + startTime + `", |
| | | "lt": "` + endTime + `" |
| | | } |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | }, |
| | | "size": 1000000, |
| | | "_source": ["` + strings.Replace(strings.Trim(fmt.Sprint(source), "[]"), " ", "\",\"", -1) + `"] |
| | | } |
| | | ` |
| | | //logger.Debug(url) |
| | | //logger.Debug(JsonDSL) |
| | | //fmt.Println(JsonDSL) |
| | | buf, err := EsReq("POST", url, []byte(JsonDSL)) |
| | | if err != nil { |
| | | return capdbinfo, errors.New("http request dbtablename info is err!") |
| | | } |
| | | |
| | | // 返回 _source 数组 |
| | | sources, err := Sourcelist(buf) |
| | | if err != nil { |
| | | return capdbinfo, err |
| | | } |
| | | //fmt.Println(sources) |
| | | // 返回所有查询的数据 |
| | | capdbinfos := Parsesources(sources) |
| | | return capdbinfos, nil |
| | | } |
| | | |
| | | // 查询底库人员信息*缓存* |
| | | func GetOceanFeatures(serverIp string, serverPort string, queryIndexNum int, queryNums int, indexName string, shards string, targetType string) ([]*protomsg.MultiFeaCache, error) { |
| | | var dbinfos []*protomsg.MultiFeaCache |
| | | point := strconv.Itoa(queryIndexNum) |
| | | number := strconv.Itoa(queryNums) |
| | | JsonDSL := "" |
| | | var source []string |
| | | switch targetType { |
| | | case "face": |
| | | source = []string{"id", "targetInfo.feature", "analyServerId"} |
| | | case "track": |
| | | source = []string{"id", "targetInfo.feature", "analyServerId", "targetInfo.attachTarget.feature", "targetInfo.targetLocation", "linkTagInfo.targetInfo.feature", "linkTagInfo.targetInfo.attachTarget.feature", "linkTagInfo.targetInfo.targetLocation"} |
| | | } |
| | | |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local" |
| | | JsonDSL = ` { |
| | | "from": ` + point + `, |
| | | "query": { |
| | | "bool": { |
| | | "filter": [ |
| | | { |
| | | "term": { |
| | | "targetInfo.targetType.raw": "` + targetType + `" |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | }, |
| | | "size":` + number + `, |
| | | "_source": ["` + strings.Replace(strings.Trim(fmt.Sprint(source), "[]"), " ", "\",\"", -1) + `"] |
| | | }` |
| | | |
| | | buf, err := EsReq("POST", url, []byte(JsonDSL)) |
| | | if err != nil { |
| | | return dbinfos, errors.New("http request dbtablename info is err!") |
| | | } |
| | | |
| | | // 返回 _source 数组 |
| | | sources, err := Sourcelist(buf) |
| | | if err != nil { |
| | | return dbinfos, err |
| | | } |
| | | |
| | | // 返回所有查询的数据 |
| | | dbpersoninfos := Parsesources(sources) |
| | | return dbpersoninfos, nil |
| | | } |
| | | |
| | | //************************CRON TASK******************************* |
| | | //查询日期范围内是否还存在数据 |
| | | func QueryAnalyServerData(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) { |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" |
| | | deleteJson := `{ |
| | | "query":{ |
| | | "bool":{ |
| | | "filter":[{ |
| | | "range":{ |
| | | "updateTime":{ |
| | | "gte":"` + startTime + `", |
| | | "lte":"` + endTime + `" |
| | | } |
| | | } |
| | | }, |
| | | { |
| | | "term":{ |
| | | "analyServerId":"` + analyServerId + `" |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | } |
| | | } ` |
| | | buf, err := EsReq("POST", url, []byte(deleteJson)) |
| | | if err != nil { |
| | | return false, errors.New("请求失败") |
| | | } |
| | | resTotal, err := SourceTotal(buf) |
| | | if err != nil { |
| | | return false, errors.New("解码失败") |
| | | } |
| | | if resTotal == -1 || resTotal == 0{ |
| | | result = false |
| | | } else { |
| | | result = true |
| | | } |
| | | return result, nil |
| | | } |
| | | |
| | | |
| | | //按日期范围,服务器Id删除数据 |
| | | func DeleteAnalyServerData(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) { |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_delete_by_query" |
| | | deleteJson := `{ |
| | | "query":{ |
| | | "bool":{ |
| | | "filter":[{ |
| | | "range":{ |
| | | "updateTime":{ |
| | | "gte":"` + startTime + `", |
| | | "lte":"` + endTime + `" |
| | | } |
| | | } |
| | | }, |
| | | { |
| | | "term":{ |
| | | "analyServerId":"` + analyServerId + `" |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | } |
| | | } ` |
| | | buf, err := EsReq("POST", url, []byte(deleteJson)) |
| | | if err != nil { |
| | | return false, errors.New("请求失败") |
| | | } |
| | | deleteRes, err := SourceDeleted(buf) |
| | | if err != nil { |
| | | return false, errors.New("解码失败") |
| | | } |
| | | if deleteRes == -1 { |
| | | result = false |
| | | } else { |
| | | result = true |
| | | } |
| | | return result, nil |
| | | } |
| | | |
| | | //给所有节点追加删除任务信息 |
| | | func AddDelTask(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) { |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query" |
| | | addJson := `{ |
| | | "script": { |
| | | "lang":"painless", |
| | | "inline": "ctx._source.instantTask.add(params.newtask)", |
| | | "params": { |
| | | "newtask": { |
| | | "instantClearId": "` + analyServerId + `", |
| | | "startTime": "` + startTime + `", |
| | | "endTime": "` + endTime + `" |
| | | } |
| | | } |
| | | }, |
| | | "query": { |
| | | "match_all": {} |
| | | } |
| | | }` |
| | | buf, err := EsReq("POST", url, []byte(addJson)) |
| | | if err != nil { |
| | | return false, errors.New("请求失败") |
| | | } |
| | | updateRes, err := SourceUpdated(buf) |
| | | if err != nil { |
| | | return false, errors.New("解码失败") |
| | | } |
| | | if updateRes == -1 { |
| | | result = false |
| | | } else { |
| | | result = true |
| | | } |
| | | return result, nil |
| | | } |
| | | |
| | | //移除已执行完的删除任务 |
| | | func DeleteDelTask(serverIp string, serverPort string, indexName string, analyServerId string) (result bool, err error) { |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query" |
| | | deleteJson := `{ |
| | | "script": { |
| | | "lang":"painless", |
| | | "inline": "ctx._source.instantTask.remove(0)" |
| | | }, |
| | | "query": { |
| | | "bool": { |
| | | "filter":[{ |
| | | "term":{ |
| | | "id":"` + analyServerId + `" |
| | | } |
| | | }] |
| | | } |
| | | } |
| | | }` |
| | | buf, err := EsReq("POST", url, []byte(deleteJson)) |
| | | if err != nil { |
| | | return false, errors.New("请求失败") |
| | | } |
| | | updateRes, err := SourceUpdated(buf) |
| | | if err != nil { |
| | | return false, errors.New("解码失败") |
| | | } |
| | | if updateRes == -1 { |
| | | result = false |
| | | } else { |
| | | result = true |
| | | } |
| | | return result, nil |
| | | } |