sunty
2019-09-25 9969f8072b23fd8d03cb38445a81ae881bd01ee2
Optimize the get-personId function
2个文件已修改
262 ■■■■ 已修改文件
EsApi.go 223 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
EsClient.go 39 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
EsApi.go
@@ -285,87 +285,158 @@
}
// GetAllLocalVideopersonsId collects the "id" field of every document in
// indexName that matches the filters carried by compareArgs and returns them
// in capturetable (original comment: "get the IDs of all persons in the
// capture library of the current node").
//
// Filters are built from compareArgs.InputValue (free-text search),
// SearchTime[0]/SearchTime[1] (picDate range), Tasks, TreeNodes, Tabs,
// Collection and AnalyServerId. The request is sent to
// http://serverIp:serverPort/indexName/_search through EsReq.
//
// NOTE(review): this span comes from a rendered diff; the removed and the
// added version of the function are interleaved without +/- markers (the
// signature, the filter construction and the request body each appear
// twice), so the text below is not compilable as shown.
func GetAllLocalVideopersonsId(compareArgs  protomsg.CompareArgs,indexName string, serverIp string, serverPort string) (capturetable []string) {
    queryStr := ""
    queryBody := compareArgs.InputValue
    // Search box: free-text multi_match over several weighted fields.
    // NOTE(review): queryBody is concatenated into the JSON body without
    // escaping; a value containing '"' or '\' changes the query structure.
    if queryBody != "" {
        queryStr = "\"must\":[{\"multi_match\":{\"query\":\"" + queryBody + "\",\"fields\":[\"alarmRules.alarmLevel^1.5\",\"ageDescription^1.5\",\"taskName^1.5\",\"baseInfo.tableName^1.5\",\"sex^2.0\",\"race^2.0\",\"content^1.0\",\"baseInfo.idCard^1.8\",\"cameraAddr^1.0\"]," +
            "\"type\":\"cross_fields\",\"operator\":\"OR\",\"slop\":0,\"prefix_length\":0,\"max_expansions\":50,\"zero_terms_query\":\"NONE\",\"auto_generate_synonyms_phrase_query\":true,\"fuzzy_transpositions\":true,\"boost\":1}}],"
    }
    // NOTE(review): assumes SearchTime holds at least two elements; the
    // indexing below panics otherwise.
    gteDate := compareArgs.SearchTime[0]
    lteDate := compareArgs.SearchTime[1]
    // Task ID filter: the slice is printed with fmt.Sprint and its
    // space-separated elements are re-joined as a quoted JSON list.
    taskIdStr := ""
    taskId := compareArgs.Tasks
    if taskId != nil && len(taskId) > 0 {
        esTaskId := strings.Replace(strings.Trim(fmt.Sprint(taskId), "[]"), " ", "\",\"", -1)
        taskIdStr = "{\"terms\":{\"taskId\":[\"" + esTaskId + "\"]}},"
    }
    // Camera ID filter, built the same way from TreeNodes.
    cameraIdStr := ""
    cameraId := compareArgs.TreeNodes
    if cameraId != nil && len(cameraId) > 0 {
        esCameraId := strings.Replace(strings.Trim(fmt.Sprint(cameraId), "[]"), " ", "\",\"", -1)
        cameraIdStr = "{\"terms\":{\"cameraId\":[\"" + esCameraId + "\"]}},"
    }
func GetAllLocalVideopersonsId(compareArgs protomsg.CompareArgs, indexName string, serverIp string, serverPort string) (capturetable []string) {
    // ts is only referenced by the commented-out timing print at the end.
    ts := time.Now()
    queryStr := ""
    queryBody := compareArgs.InputValue
    // Search box: free-text multi_match over several weighted fields.
    if queryBody != "" {
        queryStr = "\"must\":[{\"multi_match\":{\"query\":\"" + queryBody + "\",\"fields\":[\"alarmRules.alarmLevel^1.5\",\"ageDescription^1.5\",\"taskName^1.5\",\"baseInfo.tableName^1.5\",\"sex^2.0\",\"race^2.0\",\"content^1.0\",\"baseInfo.idCard^1.8\",\"cameraAddr^1.0\"]," +
            "\"type\":\"cross_fields\",\"operator\":\"OR\",\"slop\":0,\"prefix_length\":0,\"max_expansions\":50,\"zero_terms_query\":\"NONE\",\"auto_generate_synonyms_phrase_query\":true,\"fuzzy_transpositions\":true,\"boost\":1}}],"
    }
    gteDate := compareArgs.SearchTime[0]
    lteDate := compareArgs.SearchTime[1]
    // Task ID filter.
    taskIdStr := ""
    taskId := compareArgs.Tasks
    if taskId != nil && len(taskId) > 0 {
        esTaskId := strings.Replace(strings.Trim(fmt.Sprint(taskId), "[]"), " ", "\",\"", -1)
        taskIdStr = "{\"terms\":{\"taskId\":[\"" + esTaskId + "\"]}},"
    }
    // Camera ID filter.
    cameraIdStr := ""
    cameraId := compareArgs.TreeNodes
    if cameraId != nil && len(cameraId) > 0 {
        esCameraId := strings.Replace(strings.Trim(fmt.Sprint(cameraId), "[]"), " ", "\",\"", -1)
        cameraIdStr = "{\"terms\":{\"cameraId\":[\"" + esCameraId + "\"]}},"
    }
    // Library table ID filter (baseInfo.tableId), built from Tabs.
    tableId := compareArgs.Tabs
    esTableId := ""
    esTableIdStr := ""
    if tableId != nil && len(tableId) > 0 {
        esTableId = strings.Replace(strings.Trim(fmt.Sprint(tableId), "[]"), " ", "\",\"", -1)
        esTableIdStr = "{\"terms\":{\"baseInfo.tableId\":[\"" + esTableId + "\"]}},"
    }
    // Optional exact-match filter on isCollect.
    isCollectStr := ""
    isCollect := compareArgs.Collection
    if isCollect != "" {
        isCollectStr = "{\"term\":{\"isCollect\":\"" + isCollect + "\"}},"
    }
    // Library table ID filter (baseInfo.tableId), built from Tabs.
    tableId := compareArgs.Tabs
    esTableId := ""
    esTableIdStr := ""
    if tableId != nil && len(tableId) > 0 {
        esTableId = strings.Replace(strings.Trim(fmt.Sprint(tableId), "[]"), " ", "\",\"", -1)
        esTableIdStr = "{\"terms\":{\"baseInfo.tableId\":[\"" + esTableId + "\"]}},"
    }
    // Optional exact-match filter on isCollect.
    isCollectStr := ""
    isCollect := compareArgs.Collection
    if isCollect != "" {
        isCollectStr = "{\"term\":{\"isCollect\":\"" + isCollect + "\"}},"
    }
    // Paging is handled with Elasticsearch's own mechanism.
    // Paging is handled with Elasticsearch's own mechanism.
    // AnalyServerId is mandatory: without it nothing is queried and the
    // (empty) named result is returned.
    analyServerId := compareArgs.AnalyServerId
    if analyServerId == "" {
        fmt.Println("no analyServerId")
        return
    }
    analyServerFilterStr := "{\"term\":{\"analyServerId\":\"" + analyServerId + "\"}},"
    // URL of the initial request (search with scroll=1m).
    url := "http://" + serverIp + ":" + serverPort +
        "/" + indexName + "/_search?search_type=dfs_query_then_fetch;scroll=1m"
    // lock is taken around the append of follow-up pages to capturetable;
    // wg is waited on after all goroutines have been started.
    var lock sync.RWMutex
    var wg sync.WaitGroup
    // One goroutine per slice id.
    // NOTE(review): the loop issues slice ids 0..31 while the request body
    // declares "max":48, so slices 32..47 are never requested — confirm
    // whether the two numbers are meant to be equal.
    for i := 0; i < 32; i++ {
        // Request body.
        prama := "{" +
            "\"slice\":{\"id\":" + strconv.Itoa(i) + ",\"max\":48}," +
            "\"size\":\"1000\"," +
            "\"query\":{\"bool\":{" + queryStr +
            "\"filter\":[" +
            cameraIdStr +
            taskIdStr +
            isCollectStr +
            esTableIdStr +
            analyServerFilterStr +
            "{\"range\":{\"picDate\":{\"from\":\"" + gteDate + "\",\"to\":\"" + lteDate + "\",\"include_lower\":true,\"include_upper\":true,\"boost\":1}}}]}}," +
            "\"_source\":[\"id\"]" +
            "}"
        wg.Add(1)
        go func() {
            defer wg.Done()
            //fmt.Println(url)
            //fmt.Println(prama)
            buf, err := EsReq("POST", url, []byte(prama))
            if err != nil {
                fmt.Println("http request videoUrlInfo info is err!")
                fmt.Println(len(capturetable))
                return
            }
            sources, err := Sourcelistforscroll(buf)
            if err != nil {
                fmt.Println(len(capturetable))
                return
            }
            // NOTE(review): this append to the shared capturetable is done
            // without holding lock, unlike the append of follow-up pages
            // further down — looks like a data race between goroutines.
            for _, source := range sources["sourcelist"].([]map[string]interface{}) {
                capturetable = append(capturetable, source["id"].(string))
            }
            scroll_id := sources["scroll_id"].(string)
            // URL of the follow-up scroll requests.
            scroll_url := "http://" + serverIp + ":" + serverPort + "/_search/scroll"
            // Fetch follow-up pages until a request fails, cannot be parsed,
            // or returns no hits.
            for {
                var tmpList []string
                // NOTE(review): next_scroll_id is re-declared as "" on every
                // iteration, so the condition below is never true and the
                // value assigned at the end of the loop body is never
                // copied into scroll_id.
                next_scroll_id := ""
                if next_scroll_id != "" {
                    scroll_id = next_scroll_id
                }
                jsonDSL := `{
            "scroll": "1m",
            "scroll_id" : "` + scroll_id + `"
        }`
                //fmt.Println(scroll_url)
                //fmt.Println(jsonDSL)
                buf, err := EsReq("POST", scroll_url, []byte(jsonDSL))
                if err != nil {
                    fmt.Println("lenth1: ", len(capturetable))
                    return
                }
                // The error is not inspected; a nil map ends the goroutine.
                nextSources, err := Sourcelistforscroll(buf)
                if nextSources == nil {
                    return
                }
                nextM := nextSources["sourcelist"].([]map[string]interface{})
                //fmt.Println("id",nextSources)
                // An empty page ends this goroutine.
                if nextM == nil || len(nextM) == 0 {
                    //fmt.Println("lenth: ", len(capturetable))
                    return
                }
                //fmt.Println("id")
                for _, source := range nextM {
                    tmpList = append(tmpList, source["id"].(string))
                }
                //fmt.Println("tmpList: ", len(tmpList))
                // Merge this page into the shared result under the lock.
                lock.Lock()
                capturetable = append(capturetable, tmpList...)
                lock.Unlock()
                next_scroll_id = nextSources["scroll_id"].(string)
            }
    analyServerId := compareArgs.AnalyServerId
    if analyServerId == "" {
        fmt.Println("no analyServerId")
        return
    }
    analyServerFilterStr := "{\"term\":{\"analyServerId\":\"" + analyServerId + "\"}},"
            fmt.Println(len(capturetable))
    // Request URL.
    url := "http://" + serverIp + ":" + serverPort +
        "/" + indexName + "/_search?search_type=dfs_query_then_fetch"
        }()
    }
    // Block until every goroutine started above has finished.
    wg.Wait()
    // Request body (single request with a very large "size").
    prama := "{" +
        "\"size\":\"100000000\"," +
        "\"query\":{\"bool\":{" + queryStr +
        "\"filter\":[" +
        cameraIdStr +
        taskIdStr +
        isCollectStr +
        esTableIdStr +
        analyServerFilterStr +
        "{\"range\":{\"picDate\":{\"from\":\"" + gteDate + "\",\"to\":\"" + lteDate + "\",\"include_lower\":true,\"include_upper\":true,\"boost\":1}}}]}}," +
        "\"_source\":[\"id\"]" +
        "}"
    fmt.Println(url)
    fmt.Println(prama)
    buf, err := EsReq("POST", url,[]byte(prama))
    if err != nil {
        fmt.Println("http request videoUrlInfo info is err!")
        return
    }
    sources, err := Sourcelist(buf)
    if err != nil {
        return
    }
    // Every returned source contributes its "id" (asserted to be a string).
    for _, source := range  sources{
        capturetable = append(capturetable, source["id"].(string))
    }
    return capturetable
    //fmt.Println("lenth_all: ", len(capturetable))
    //fmt.Println("耗时:", time.Since(ts))
    return capturetable
}
//获取底库人员ID
@@ -415,4 +486,4 @@
        tabsource[tableId] = append(tabsource[tableId], id)
    }
    return tabsource
}
}
EsClient.go
@@ -359,6 +359,45 @@
    }
    return sources,nil
}
// Sourcelistforscroll parses the JSON body of a (sliced) scroll search
// response.
//
// On success the returned map holds exactly two entries:
//   - "sourcelist": a []map[string]interface{} with the "_source" object of
//     every hit, in response order; hits that are not JSON objects or that
//     have no object-valued "_source" are reported on stdout and skipped;
//   - "scroll_id": the top-level "_scroll_id" string of the response.
//
// A nil map and a non-nil error are returned when buf is not valid JSON, is
// not a JSON object, has no string "_scroll_id", has no object "hits", or
// has no array "hits.hits". Callers type-assert the map entries without
// checking, so the map is never returned partially filled.
func Sourcelistforscroll(buf []byte) (datasource map[string]interface{}, err error) {
    var info interface{}
    // Surface decode failures instead of letting them show up later as a
    // misleading type-assertion error.
    if err = json.Unmarshal(buf, &info); err != nil {
        return nil, fmt.Errorf("decode scroll response: %v", err)
    }
    out, ok := info.(map[string]interface{})
    if !ok {
        return nil, errors.New("http response interface can not change map[string]interface{}")
    }
    scrollID, ok := out["_scroll_id"].(string)
    if !ok {
        return nil, errors.New("scroll response has no string _scroll_id")
    }
    middle, ok := out["hits"].(map[string]interface{})
    if !ok {
        return nil, errors.New("first hits change error!")
    }
    // Checked assertion: a response without a hits.hits array must yield an
    // error, not a panic inside the caller's goroutine.
    hits, ok := middle["hits"].([]interface{})
    if !ok {
        return nil, errors.New("scroll response has no hits.hits array")
    }
    sources := make([]map[string]interface{}, 0, len(hits))
    for _, in := range hits {
        tmpbuf, ok := in.(map[string]interface{})
        if !ok {
            fmt.Println("change to source error!")
            continue
        }
        source, ok := tmpbuf["_source"].(map[string]interface{})
        if !ok {
            fmt.Println("change _source error!")
            continue
        }
        sources = append(sources, source)
    }
    return map[string]interface{}{
        "sourcelist": sources,
        "scroll_id":  scrollID,
    }, nil
}
func EsReq(method string, url string, parama []byte) (buf []byte, err error) {
    defer elapsed("page")()