sunty
2024-03-27 1278c1f6b6e79238b520513c80ce26882e805f71
add DayNightActivityQuery
1个文件已修改
325 ■■■■■ 已修改文件
EsApi.go 325 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
EsApi.go
@@ -31,6 +31,7 @@
    endHour   int
}
//按需求(activeHourFormat结构体)格式化时间数据
func formatActiveHour(activeHour string) (activeHourFormat, error) {
    hours := strings.Split(activeHour, "-")
@@ -59,15 +60,240 @@
}
func DayNightActivityQuery(communityId string, startTime string, endTime string, activeHour string, indexName string, serverIp string, serverPort string) ([]string, error) {
    activityId := make([]string, 0)
//判断时间是否再范围之内
func isTimeInRange(timeStr, startStr, endStr string) bool {
    layout := "15:04:05"
    timeStamp, err := time.Parse(layout, timeStr)
    if err != nil {
        fmt.Println("Error parsing timestamp:", err)
        return false
    }
    startTime, err := time.Parse(layout, startStr)
    if err != nil {
        fmt.Println("Error parsing start time:", err)
        return false
    }
    endTime, err := time.Parse(layout, endStr)
    if err != nil {
        fmt.Println("Error parsing end time:", err)
        return false
    }
    if startTime.After(endTime) {
        // 跨越日期的情况
        return timeStamp.After(startTime) || timeStamp.Before(endTime)
    } else {
        // 不跨越日期的情况
        return timeStamp.After(startTime) && timeStamp.Before(endTime)
    }
}
//判断两个时间先后
func compareTimes(time1Str, time2Str string) int {
    layout := "15:04:05"
    time1, err := time.Parse(layout, time1Str)
    if err != nil {
        fmt.Println("Error parsing time 1:", err)
        return 0
    }
    time2, err := time.Parse(layout, time2Str)
    if err != nil {
        fmt.Println("Error parsing time 2:", err)
        return 0
    }
    if time1.Before(time2) {
        return -1 // time1 在 time2 之前
    } else if time1.After(time2) {
        return 1 // time1 在 time2 之后
    } else {
        return 0 // time1 和 time2 相等
    }
}
//判断日期相差几天
func daysBetweenDates(date1Str, date2Str string) int {
    layout := "2006-01-02"
    date1, err := time.Parse(layout, date1Str)
    if err != nil {
        fmt.Println("Error parsing date 1:", err)
        return 0
    }
    date2, err := time.Parse(layout, date2Str)
    if err != nil {
        fmt.Println("Error parsing date 2:", err)
        return 0
    }
    duration := date2.Sub(date1)
    days := int(duration.Hours() / 24)
    return days
}
//计算时间阈值
func checkTimeDifference(timestampStr1 string, timestampStr2 string, intervalInMinutes int) bool {
    layout := "2006-01-02 15:04:05"
    timestampStr1 = timestampStr1[:19]
    timestampStr2 = timestampStr2[:19]
    // 将字符串解析为时间
    time1, err := time.Parse(layout, timestampStr1)
    if err != nil {
        fmt.Println("时间解析失败:", err)
        return false
    }
    time2, err := time.Parse(layout, timestampStr2)
    if err != nil {
        fmt.Println("时间解析失败:", err)
        return false
    }
    // 计算时间差
    diff := time2.Sub(time1)
    // 检查时间差是否小于等于指定的间隔
    if diff.Minutes() <= float64(intervalInMinutes) {
        return true
    } else {
        return false
    }
}
////格式化时间hh:mm:ss:zzz to hh:mm:ss
//func formatTime(inputTime string) (string, error) {
//    parsedTime, err := time.Parse("15:04:05:000", inputTime)
//    if err != nil {
//        return "", err
//    }
//
//    formattedTime := parsedTime.Format("15:04:05")
//    return formattedTime, nil
//}
func resetDataId(dataId []string, id, dDate, dTime string, sDate *string, sTime *string) []string {
    dataId = make([]string, 0)
    *sDate = dDate
    *sTime = dTime
    dataId = append(dataId, id)
    return dataId
}
func decodeActivityId(aHFormat activeHourFormat, frequency int, intervalInMinutes int, source []map[string]interface{}) ([]map[string]interface{}, error) {
    docInfo := make([]map[string]interface{}, 0)
    for _, info := range source {
        tmpInfo := make(map[string]interface{})
        activeId := make([]string, 0)
        sDate := ""
        sTime := ""
        documentNumber := info["key"].(string)
        tmpInfo["documentNumber"] = documentNumber
        //fmt.Println("documentNumber: ", documentNumber)
        topHits := info["top_hits"].(map[string]interface{})
        hits := topHits["hits"].(map[string]interface{})
        hitsResult := hits["hits"].([]interface{})
        dataId := make([]string, 0)
        for sIndex, sourceInfo := range hitsResult {
            rSourceInfo := sourceInfo.(map[string]interface{})
            source := rSourceInfo["_source"].(map[string]interface{})
            captureTime := source["picDate"].(string)
            dDate := strings.Split(captureTime, " ")[0]
            dTime := strings.Split(captureTime[:19], " ")[1]
            //fmt.Println(captureTime, dDate, dTime)
            id := source["id"].(string)
            //fmt.Println("sindex: ", sIndex, "documentNumber", tmpInfo["documentNumber"], "id: ", id, "captureTime: ", captureTime)
            if !isTimeInRange(dTime, aHFormat.startTime, aHFormat.endTime) {
                if sDate != "" && len(dataId) >= frequency {
                    activeId = append(activeId, dataId...)
                    dataId = resetDataId(dataId, id, dDate, dTime, &sDate, &sTime)
                }
                continue
            }
            if sDate == "" {
                sDate = dDate
                sTime = dTime
                dataId = append(dataId, id)
                if len(dataId) >= frequency {
                    activeId = append(activeId, dataId...)
                    dataId = resetDataId(dataId, id, dDate, dTime, &sDate, &sTime)
                }
                continue
            }
            if checkTimeDifference(sDate+" "+sTime, captureTime[:19], intervalInMinutes) {
                if len(dataId) >= frequency {
                    activeId = append(activeId, dataId...)
                    dataId = resetDataId(dataId, id, dDate, dTime, &sDate, &sTime)
                }
                continue
            }
            //fmt.Println(daysBetweenDates(sDate, dDate))
            if aHFormat.startHour < aHFormat.endHour && daysBetweenDates(sDate, dDate) == 0 {
                dataId = append(dataId, id)
            } else if aHFormat.startHour > aHFormat.endHour {
                if daysBetweenDates(sDate, dDate) == 0 {
                    if compareTimes(dTime, aHFormat.startTime) == compareTimes(sTime, aHFormat.startTime) {
                        //    ||compareTimes(dTime,aHFormat.endTime) == compareTimes(sTime, aHFormat.endTime){
                        dataId = append(dataId, id)
                    }
                } else if daysBetweenDates(sDate, dDate) == 1 {
                    //初始时间戳在结束范围之前
                    if compareTimes(sTime, aHFormat.endTime) == -1 {
                        if len(dataId) >= frequency {
                            activeId = append(activeId, dataId...)
                            dataId = resetDataId(dataId, id, dDate, dTime, &sDate, &sTime)
                        }
                        //初始时间戳在开始范围之后
                    } else if compareTimes(sTime, aHFormat.endTime) == 1 {
                        //next时间戳在结束范围之前
                        if compareTimes(dTime, aHFormat.endTime) == -1 {
                            dataId = append(dataId, id)
                            //next时间戳在开始范围之后
                        } else if compareTimes(dTime, aHFormat.startTime) == 1 {
                            if len(dataId) >= frequency {
                                activeId = append(activeId, dataId...)
                                dataId = resetDataId(dataId, id, dDate, dTime, &sDate, &sTime)
                            }
                        }
                    }
                } else if daysBetweenDates(sDate, dDate) >= 1 {
                    //fmt.Println(len(dataId))
                    if len(dataId) >= frequency {
                        activeId = append(activeId, dataId...)
                        dataId = resetDataId(dataId, id, dDate, dTime, &sDate, &sTime)
                    }
                }
            }
            if sIndex == len(hitsResult)-1 {
                if len(dataId) >= frequency {
                    activeId = append(activeId, dataId...)
                }
            }
        }
        if len(activeId) > 0 {
            tmpInfo["id"] = activeId
            docInfo = append(docInfo, tmpInfo)
        }
    }
    return docInfo, nil
}
func DayNightActivityQuery(communityId string, documentNumber string,startTime string, endTime string, activeHour string, frequency int,
    intervalInMinutes int, indexName string, serverIp string, serverPort string) (map[string]interface{}, error) {
    esURL := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
    activeHourFormat, err := formatActiveHour(activeHour)
    aHFormat, err := formatActiveHour(activeHour)
    if err != nil {
        return nil, err
    }
    filterDocIdAttr := ""
    if documentNumber != ""{
        filterDocIdAttr = "{\"term\": {\""+documentNumber+"\": \"\"}},"
    }
    queryDSL := `
    {
        "size": 0,
@@ -82,6 +308,7 @@
                            }
                        }
                    },
                    `+filterDocIdAttr+`
                    {
                        "term": {
                            "communityId": "` + communityId + `"
@@ -90,7 +317,7 @@
                    {
                        "script": {
                            "script": {
                                "source": "doc['picDate'].value.hourOfDay >= ` + strconv.Itoa(activeHourFormat.startHour) + ` || doc['picDate'].value.hourOfDay < ` + strconv.Itoa(activeHourFormat.endHour) + `",
                                "source": "doc['picDate'].value.hourOfDay >= ` + strconv.Itoa(aHFormat.startHour) + ` || doc['picDate'].value.hourOfDay < ` + strconv.Itoa(aHFormat.endHour) + `",
                                "lang": "painless"
                            }
                        }
@@ -111,37 +338,30 @@
                    "field": "documentNumber",
                    "size": 100000
                },
                "aggs": {
                    "group_by_date": {
                        "date_histogram": {
                            "field": "picDate",
                            "interval": "1d", // 按天分桶
                            "format": "yyyy-MM-dd"
                        },
                        "aggs": {
                            "top_hits": {
                                "top_hits": {
                                    "_source": [
                                        "picDate"
                                    ],
                                    "size": 100000,
                                    "sort": [
                                        {
                                            "picDate": {
                                                "order": "desc"
                                            }
                                        }
                                    ]
                "aggs": {
                    "top_hits": {
                        "top_hits": {
                            "_source": [
                                "id",
                                "picDate"
                            ],
                            "size": 100000,
                            "sort": [
                                {
                                    "picDate": {
                                        "order": "asc"
                                    }
                                }
                            }
                        }
                    }
                            ]
                        }
                    }
                }
            }
        }
    }`
    //fmt.Println(esURL)
    //fmt.Println(queryDSL)
    var result = make(map[string]interface{})
    buf, err := EsReq("POST", esURL, []byte(queryDSL))
    if err != nil {
        return nil, err
@@ -150,10 +370,53 @@
    if err != nil {
        return nil, err
    }
    result, _ := decodeDocumentInfos(source)
    //fmt.Println(source)
    docResult, err := decodeActivityId(aHFormat, frequency, intervalInMinutes, source)
    if err != nil {
        return nil, err
    }
    //result, _ := decodeDocumentInfos(source)
    //return result, nil
    if len(docResult) ==0 {
        return result,nil
    }
    DataInfos, err := GetInfosByIds(docResult[0]["id"].([]string), indexName, serverIp, serverPort)
    result["documentNumbers"] = docResult
    result["datalist"] = DataInfos
    return result, nil
}
    return activityId, nil
func GetInfosByIds(ids []string, indexName string, serverIp string, serverPort string) ([]map[string]interface{}, error) {
    captureIds := strings.Replace(strings.Trim(fmt.Sprint(ids), "[]"), " ", "\",\"", -1)
    esURL := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
    queryDSL := `
            {
                "query": {
                    "bool": {
                        "filter": [{
                            "terms": {
                                "id": [
                                    "` + captureIds + `"
                                ]
                            }
                        }]
                    }
                },
                "size":1000000,
                "sort":[{"picDate":{"order":"desc"}}],
                "_source": {"includes":[],"excludes":["*.feature"]}
            }
`
    buf, err := EsReq("POST", esURL, []byte(queryDSL))
    if err != nil {
        return nil, err
    }
    sources, err := Sourcelist(buf)
    if err != nil {
        return nil, err
    }
    return sources, nil
}
// ***********************重庆End************************************//