sunty
2024-05-31 f6ca7bb43270474fa876ff6ba62c6b2113b045ad
Optimize day and night algorithm, format date and time, batch process by date.
2个文件已修改
295 ■■■■ 已修改文件
EsApi.go 293 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
EsClient.go 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
EsApi.go
@@ -24,42 +24,6 @@
//***********************重庆Start**********************************//
// activeHourFormat describes an active-hour window.
//
// startTime and endTime hold the window boundaries as time strings, while
// startHour and endHour hold integer hour values derived from them. As
// populated by formatActiveHour, endHour is the end hour plus one, wrapped
// modulo 24.
type activeHourFormat struct {
    startTime, endTime string
    startHour, endHour int
}
//按需求(activeHourFormat结构体)格式化时间数据
func formatActiveHour(activeHour string) (activeHourFormat, error) {
    hours := strings.Split(activeHour, "-")
    if len(hours) == 2 {
        startHour := hours[0]
        endHour := hours[1]
        // 解析开始时间的小时和分钟
        startParts := strings.Split(startHour, ":")
        startHourInt, _ := strconv.Atoi(startParts[0])
        // 解析结束时间的小时和分钟
        endParts := strings.Split(endHour, ":")
        endHourInt, _ := strconv.Atoi(endParts[0])
        // 输出开始时间的小时
        //fmt.Println("开始时间的小时:", startHourInt)
        // 输出结束时间的小时 + 1
        endHourPlusOne := (endHourInt + 1) % 24 // 取余确保不超过24小时
        //fmt.Println("结束时间的小时 + 1:", endHourPlusOne)
        activeHourFormat := activeHourFormat{startTime: startHour, endTime: endHour, startHour: startHourInt, endHour: endHourPlusOne}
        return activeHourFormat, nil
    }
    return activeHourFormat{}, errors.New("错误:无法解析开始时间和结束时间")
}
//判断时间是否在范围之内
func isTimeInRange(timeStr, startStr, endStr string) bool {
    layout := "15:04:05"
@@ -178,21 +142,11 @@
//    formattedTime := parsedTime.Format("15:04:05")
//    return formattedTime, nil
//}
// resetDataId begins a new ID batch seeded with id and records dDate and
// dTime through the sDate and sTime out-parameters.
//
// The incoming dataId slice is not reused or modified; the returned slice is
// freshly allocated and contains only id. Both sDate and sTime are
// dereferenced, so they must be non-nil.
func resetDataId(dataId []string, id, dDate, dTime string, sDate *string, sTime *string) []string {
    *sDate = dDate
    *sTime = dTime
    return []string{id}
}
func decodeActivityId(aHFormat activeHourFormat, frequency int, intervalInMinutes int, source []map[string]interface{}) ([]map[string]interface{}, error) {
func decodeActivityId(frequency int, intervalInMinutes int, source []map[string]interface{}) ([]map[string]interface{}, error) {
    docInfo := make([]map[string]interface{}, 0)
    for _, info := range source {
        tmpInfo := make(map[string]interface{})
        activeId := make([]string, 0)
        sDate := ""
        sTime := ""
        documentNumber := info["key"].(string)
        tmpInfo["documentNumber"] = documentNumber
        //fmt.Println("documentNumber: ", documentNumber)
@@ -213,141 +167,142 @@
                picUrl = hitsResult[0].(map[string]interface{})["_source"].(map[string]interface{})["targetInfo"].([]interface{})[0].(map[string]interface{})["picSmUrl"].(string)
            }
        }
        //if hitsResult[0].(map[string]interface{})["baseInfo"] != nil {
        //    fmt.Println("picUrl1: ", picUrl)
        //    picUrl = hitsResult[0].(map[string]interface{})["baseInfo"].([]interface{})[0].(map[string]interface{})["targetPicUrl"].(string)
        //} else {
        //    if hitsResult[0].(map[string]interface{})["targetInfo"] != nil {
        //        fmt.Println("picUrl2: ", picUrl)
        //        picUrl = hitsResult[0].(map[string]interface{})["targetInfo"].([]interface{})[0].(map[string]interface{})["picSmUrl"].(string)
        //    }
        //}
        tmpInfo["picUrl"] = picUrl
        for sIndex, sourceInfo := range hitsResult {
        lastCaptureTime := ""
        for _, sourceInfo := range hitsResult {
            rSourceInfo := sourceInfo.(map[string]interface{})
            source := rSourceInfo["_source"].(map[string]interface{})
            captureTime := source["picDate"].(string)
            dDate := strings.Split(captureTime, " ")[0]
            dTime := strings.Split(captureTime[:19], " ")[1]
            id := source["id"].(string)
            //fmt.Println("sindex: ", sIndex, "documentNumber", tmpInfo["documentNumber"], "id: ", id, "captureTime: ", captureTime)
            if !isTimeInRange(dTime, aHFormat.startTime, aHFormat.endTime) {
                //if sDate != "" && len(dataId) >= frequency {
                //    activeId = append(activeId, dataId...)
                //    dataId = resetDataId(dataId, id, dDate, dTime, &sDate, &sTime)
                //}
            if lastCaptureTime != "" {
                if checkTimeDifference(lastCaptureTime[:19], captureTime[:19], intervalInMinutes) {
                    //fmt.Println("时间小于连续阈值", lastCaptureTime, captureTime)
                continue
            }
            if sDate == "" {
                sDate = dDate
                sTime = dTime
                dataId = append(dataId, id)
                if len(dataId) >= frequency {
                    activeId = append(activeId, dataId...)
                    dataId = resetDataId(dataId, id, dDate, dTime, &sDate, &sTime)
                }
                continue
            }
            if checkTimeDifference(sDate+" "+sTime, captureTime[:19], intervalInMinutes) {
                if len(dataId) >= frequency {
                    activeId = append(activeId, dataId...)
                    dataId = resetDataId(dataId, id, dDate, dTime, &sDate, &sTime)
                }
                continue
            }
            //fmt.Println(daysBetweenDates(sDate, dDate))
            if aHFormat.startHour < aHFormat.endHour && daysBetweenDates(sDate, dDate) == 0 {
                dataId = append(dataId, id)
            } else if aHFormat.startHour > aHFormat.endHour {
                if daysBetweenDates(sDate, dDate) == 0 {
                    if compareTimes(dTime, aHFormat.startTime) == compareTimes(sTime, aHFormat.startTime) {
                        //    ||compareTimes(dTime,aHFormat.endTime) == compareTimes(sTime, aHFormat.endTime){
            lastCaptureTime = captureTime
                        dataId = append(dataId, id)
                    }
                } else if daysBetweenDates(sDate, dDate) == 1 {
                    //初始时间戳在结束范围之前
                    if compareTimes(sTime, aHFormat.endTime) == -1 {
                        if len(dataId) >= frequency {
                            activeId = append(activeId, dataId...)
                            dataId = resetDataId(dataId, id, dDate, dTime, &sDate, &sTime)
                        }
                        //初始时间戳在开始范围之后
                    } else if compareTimes(sTime, aHFormat.endTime) == 1 {
                        //next时间戳在结束范围之前
                        if compareTimes(dTime, aHFormat.endTime) == -1 {
                            dataId = append(dataId, id)
                            //next时间戳在开始范围之后
                        } else if compareTimes(dTime, aHFormat.startTime) == 1 {
                            if len(dataId) >= frequency {
                                activeId = append(activeId, dataId...)
                                dataId = resetDataId(dataId, id, dDate, dTime, &sDate, &sTime)
                            }
                        }
                    }
                } else if daysBetweenDates(sDate, dDate) >= 1 {
                    //fmt.Println(len(dataId))
                    if len(dataId) >= frequency {
                        activeId = append(activeId, dataId...)
                        dataId = resetDataId(dataId, id, dDate, dTime, &sDate, &sTime)
                    }
                }
            }
            if sIndex == len(hitsResult)-1 {
                if len(dataId) >= frequency {
                    activeId = append(activeId, dataId...)
                }
            }
        }
        if len(activeId) > 0 {
            tmpInfo["id"] = activeId
        if len(dataId) > frequency {
            tmpInfo["id"] = dataId
            docInfo = append(docInfo, tmpInfo)
        }
    }
    return docInfo, nil
}
func DayNightActivityQuery(comIds []string, docNumber string, startTime string, endTime string, activeHour string, frequency int,
func DayNightActivityQuery(comIds []string, cameraIds []string, docNumber string, startDateStr string, endDateStr string, activeHour string, frequency int,
    intervalInMinutes int, indexName string, serverIp string, serverPort string) (map[string]interface{}, error) {
    esURL := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
    aHFormat, err := formatActiveHour(activeHour)
    hours := strings.Split(activeHour, "-")
    startTimeStr := hours[0]
    endTimeStr := hours[1]
    layoutTime := "15:04:05"
    startTime, err := time.Parse(layoutTime, startTimeStr)
    if err != nil {
        return nil, err
    }
    endTime, err := time.Parse(layoutTime, endTimeStr)
    if err != nil {
        return nil, err
    }
    layoutDate := "2006-01-02"
    startDate, err := time.Parse(layoutDate, startDateStr)
    if err != nil {
        fmt.Println("Error parsing start time:", err)
        return nil, err
    }
    endDate, err := time.Parse(layoutDate, endDateStr)
    if err != nil {
        fmt.Println("Error parsing end time:", err)
        return nil, err
    }
    rangePicDate := make([]string, 0)
    if startTime.Before(endTime) {
        for !startDate.After(endDate) {
            sDateTime := startDate.Format(layoutDate) + " " + startTimeStr
            eDateTime := startDate.Format(layoutDate) + " " + endTimeStr
            rangePicDate = append(rangePicDate, `
                    {
                        "range": {
                            "picDate": {
                                "gte": "`+sDateTime+`",
                                "lt": "`+eDateTime+`"
                            }
                        }
                    }`)
            startDate = startDate.AddDate(0, 0, 1)
        }
    } else if startTime.After(endTime) {
        if startDate.Year() == endDate.Year() && startDate.Month() == endDate.Month() && startDate.Day() == endDate.Day() {
            fmt.Println("跨日期不支持单日查询")
            return nil, errors.New("跨日期不支持单日查询!")
        }
        for !startDate.After(endDate) {
            sDateTime := startDate.Format(layoutDate) + " " + startTimeStr
            startDate = startDate.AddDate(0, 0, 1)
            //fendDate := startDate.AddDate(0, 0, 1)
            if startDate.Year() == endDate.Year() && startDate.Month() == endDate.Month() && startDate.Day() == endDate.Day() {
                eDateTime := startDate.Format(layoutDate) + " " + endTimeStr
                rangePicDate = append(rangePicDate, `
                    {
                        "range": {
                            "picDate": {
                                "gte": "`+sDateTime+`",
                                "lt": "`+eDateTime+`"
                            }
                        }
                    }`)
                //fmt.Println("startDate: ", startDate, "/t endDate: ", endDate)
                break
            }
            eDateTime := startDate.Format(layoutDate) + " " + endTimeStr
            rangePicDate = append(rangePicDate, `
                    {
                        "range": {
                            "picDate": {
                                "gte": "`+sDateTime+`",
                                "lt": "`+eDateTime+`"
                            }
                        }
                    }`)
        }
    }
    //fmt.Println(rangePicDate)
    //return nil, nil
    filterDocIdAttr := ""
    if docNumber != "" {
        filterDocIdAttr = "{\"term\": {\"documentNumber\": \"" + docNumber + "\"}},"
    }
    comIdsStr := ""
    if comIds == nil || len(comIds) > 0 {
    if comIds != nil && len(comIds) > 0 {
        esComIds := strings.Replace(strings.Trim(fmt.Sprint(comIds), "[]"), " ", "\",\"", -1)
        comIdsStr = "{\"terms\":{\"communityId\":[\"" + esComIds + "\"]}},"
    }
    cameraIdsStr := ""
    if comIds != nil && len(cameraIds) > 0 {
        esCameraIds := strings.Replace(strings.Trim(fmt.Sprint(cameraIds), "[]"), " ", "\",\"", -1)
        cameraIdsStr = "{\"terms\":{\"cameraId\":[\"" + esCameraIds + "\"]}},"
    }
    var result = make(map[string]interface{})
    docInfos := make([]map[string]interface{}, 0)
    for _, rpd := range rangePicDate {
        //if comIdsStr != "" || filterDocIdAttr != "" || cameraIdsStr != "" {
        //    rpd += ","
        //}
    queryDSL := `
    {
        "size": 0,
        "query": {
            "bool": {
                "filter": [
                    {
                        "range": {
                            "picDate": {
                                "gte": "` + startTime + `",
                                "lt": "` + endTime + `"
                            }
                        }
                    },
                    ` + filterDocIdAttr + `
                    ` + comIdsStr + `
                    {
                        "script": {
                            "script": {
                                "source": "doc['picDate'].value.hourOfDay >= ` + strconv.Itoa(aHFormat.startHour) + ` || doc['picDate'].value.hourOfDay < ` + strconv.Itoa(aHFormat.endHour) + `",
                                "lang": "painless"
                            }
                        }
                    }
                        ` + cameraIdsStr + `
                        ` + filterDocIdAttr + `
                        ` + rpd + `
                ],
                "must_not": [
                    {
@@ -387,9 +342,8 @@
            }
        }
    }`
    //fmt.Println(esURL)
    //fmt.Println(queryDSL)
    var result = make(map[string]interface{})
        fmt.Println(esURL)
        fmt.Println(queryDSL)
    buf, err := EsReq("POST", esURL, []byte(queryDSL))
    if err != nil {
        return nil, err
@@ -399,17 +353,46 @@
        return nil, err
    }
    //fmt.Println(source)
    docResult, err := decodeActivityId(aHFormat, frequency, intervalInMinutes, source)
        //docResult 本次查询结果
        docResult, err := decodeActivityId(frequency, intervalInMinutes, source)
    if err != nil {
        return nil, err
    }
    //result, _ := decodeDocumentInfos(source)
    //return result, nil
    if len(docResult) == 0 {
        return result, nil
            continue
    }
    DataInfos, err := GetInfosByIds(docResult[0]["id"].([]string), indexName, serverIp, serverPort)
    result["documentNumbers"] = docResult
        if len(docInfos) == 0 {
            docInfos = append(docInfos, docResult...)
            continue
        }
        for _, dr := range docResult {
            var found bool
            for _, di := range docInfos {
                if dr["documentNumber"].(string) == di["documentNumber"].(string) {
                    // 找到了相同的documentNumber,将id合并到di["id"]中
                    ids, ok := di["id"].([]string)
                    if !ok {
                        // 如果id不是字符串切片类型,则进行初始化
                        ids = []string{}
                    }
                    ids = append(ids, dr["id"].([]string)...)
                    di["id"] = ids
                    //docInfos[diIndex] = di // 更新原始docInfos中的值
                    found = true
                    break
                }
            }
            if !found {
                // 没有找到相同的documentNumber,将dr添加到docInfos中
                docInfos = append(docInfos, dr)
            }
        }
    }
    if len(docInfos) == 0 {
        return nil, nil
    }
    DataInfos, err := GetInfosByIds(docInfos[0]["id"].([]string), indexName, serverIp, serverPort)
    result["documentNumbers"] = docInfos
    result["datalist"] = DataInfos
    return result, nil
}
EsClient.go
@@ -1056,7 +1056,7 @@
    if !ok {
        return nil, errors.New("first hits change error!")
    }
    //fmt.Println("本次共匹配条数为: ", out["hits"].(map[string]interface{})["total"].(float64))
    documentAggregations := middle["group_by_documentnumber"].(map[string]interface{})
    buckets := documentAggregations["buckets"].([]interface{})
    if len(buckets) == 0 {