sunty
2024-03-25 ebfa610f8c66fd2827a2eec619bfb3e0e22c332f
昼伏夜出调试
2个文件已修改
245 ■■■■ 已修改文件
EsApi.go 181 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
EsClient.go 64 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
EsApi.go
@@ -22,6 +22,141 @@
    }
}
//***********************重庆Start**********************************//
type activeHourFormat struct {
    startTime string
    endTime   string
    startHour int
    endHour   int
}
func formatActiveHour(activeHour string) (activeHourFormat, error) {
    hours := strings.Split(activeHour, "-")
    if len(hours) == 2 {
        startHour := hours[0]
        endHour := hours[1]
        // 解析开始时间的小时和分钟
        startParts := strings.Split(startHour, ":")
        startHourInt, _ := strconv.Atoi(startParts[0])
        // 解析结束时间的小时和分钟
        endParts := strings.Split(endHour, ":")
        endHourInt, _ := strconv.Atoi(endParts[0])
        // 输出开始时间的小时
        fmt.Println("开始时间的小时:", startHourInt)
        // 输出结束时间的小时 + 1
        endHourPlusOne := (endHourInt + 1) % 24 // 取余确保不超过24小时
        fmt.Println("结束时间的小时 + 1:", endHourPlusOne)
        activeHourFormat := activeHourFormat{startTime: startHour, endTime: endHour, startHour: startHourInt, endHour: endHourPlusOne}
        return activeHourFormat, nil
    }
    return activeHourFormat{}, errors.New("错误:无法解析开始时间和结束时间")
}
func DayNightActivityQuery(communityId string, startTime string, endTime string, activeHour string, indexName string, serverIp string, serverPort string) ([]string, error) {
    activityId := make([]string, 0)
    esURL := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
    activeHourFormat, err := formatActiveHour(activeHour)
    if err != nil {
        return nil, err
    }
    queryDSL := `
    {
        "size": 0,
        "query": {
            "bool": {
                "filter": [
                    {
                        "range": {
                            "picDate": {
                                "gte": "` + startTime + `",
                                "lt": "` + endTime + `"
                            }
                        }
                    },
                    {
                        "term": {
                            "communityId": "` + communityId + `"
                        }
                    },
                    {
                        "script": {
                            "script": {
                                "source": "doc['picDate'].value.hourOfDay >= ` + strconv.Itoa(activeHourFormat.startHour) + ` || doc['picDate'].value.hourOfDay < ` + strconv.Itoa(activeHourFormat.endHour) + `",
                                "lang": "painless"
                            }
                        }
                    }
                ],
                "must_not": [
                    {
                        "term": {
                            "documentNumber": ""
                        }
                    }
                ]
            }
        },
        "aggs": {
            "group_by_documentnumber": {
                "terms": {
                    "field": "documentNumber",
                    "size": 100000
                },
                "aggs": {
                    "group_by_date": {
                        "date_histogram": {
                            "field": "picDate",
                            "interval": "1d", // 按天分桶
                            "format": "yyyy-MM-dd"
                        },
                        "aggs": {
                            "top_hits": {
                                "top_hits": {
                                    "_source": [
                                        "picDate"
                                    ],
                                    "size": 100000,
                                    "sort": [
                                        {
                                            "picDate": {
                                                "order": "desc"
                                            }
                                        }
                                    ]
                                }
                            }
                        }
                    }
                }
            }
        }
    }`
    //fmt.Println(esURL)
    //fmt.Println(queryDSL)
    buf, err := EsReq("POST", esURL, []byte(queryDSL))
    if err != nil {
        return nil, err
    }
    source, err := SourceAggregationList(buf)
    if err != nil {
        return nil, err
    }
    result, _ := decodeDocumentInfos(source)
    return result, nil
    return activityId, nil
}
// ***********************重庆End************************************//
// 根据抓拍人员id查询抓拍人员信息
func AIOceaninfosbyid(id []string, indexName string, serverIp string, serverPort string) ([]protomsg.AIOcean, error) {
    var aIOceanInfo []protomsg.AIOcean
@@ -92,7 +227,7 @@
    return videoUrl, nil
}
//根据抓拍库人员id查询特征值
// 根据抓拍库人员id查询特征值
func GetVideoPersonFaceFeatureById(id string, indexName string, serverIp string, serverPort string) (string, error) {
    var jsonDSL = `
            {
@@ -122,7 +257,7 @@
    return feature, nil
}
//根据目标id查询已追加条数
// 根据目标id查询已追加条数
func GetLinkTagInfoSize(id string, indexName string, serverIp string, serverPort string) (size int, err error) {
    url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
    queryDSL := `{
@@ -148,7 +283,7 @@
    return size, nil
}
//根据目标id追加跟踪信息
// 根据目标id追加跟踪信息
func AppendTargetInfo(id string, targetInfo string, indexName string, serverIp string, serverPort string, updateTime string) (string, error) {
    if targetInfo == "" {
        return "", errors.New("append data is nil")
@@ -490,7 +625,7 @@
    return ids, nil
}
//统计各个区域人数
// 统计各个区域人数
func StatisticsEveryAreaPersonsNumber(startTime string, endTime string, serverIp string, serverPort string, indexName string) ([]map[string]interface{}, error) {
    var requestUrl = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
    var requestBody = `{
@@ -617,7 +752,7 @@
}
//根据时间范围,摄像机列表,分组聚合人脸列表,返回分组数据
// 根据时间范围,摄像机列表,分组聚合人脸列表,返回分组数据
func GetFaceDataBucketsByCameraIdAndTimeReturnByGrouped(cameraId []string, personId []string, startTime string, endTime string, thresholdTime float64, serverIp string, ServerPort string, indexName string) (buckersDate map[string]interface{}, err error) {
    var filterArr []string
    if cameraId != nil && len(cameraId) > 0 {
@@ -723,7 +858,7 @@
    return sources, nil
}
//根据时间范围,摄像机列表,分组聚合人脸列表
// 根据时间范围,摄像机列表,分组聚合人脸列表
func GetFaceDataBucketsByCameraIdAndTime(cameraId []string, personId []string, startTime string, endTime string, thresholdTime float64, serverIp string, ServerPort string, indexName string) (buckersDate map[string]interface{}, err error) {
    var filterArr []string
    if cameraId != nil && len(cameraId) > 0 {
@@ -824,7 +959,7 @@
    return sources, nil
}
//根据抓拍人员id更新(picurl)图片地址
// 根据抓拍人员id更新(picurl)图片地址
func UpdatePicUrlById(id string, picUrl string, indexName string, serverIp string, serverPort string) (err error) {
    updateTime := time.Now().Format("2006-01-02 15:04:05")
    tRes, err := AIOceaninfosbyid([]string{id}, indexName, serverIp, serverPort)
@@ -887,7 +1022,7 @@
    return nil
}
//根据抓拍人员id更新(videourl)摄像机地址
// 根据抓拍人员id更新(videourl)摄像机地址
func UpdateVideourlById(id string, videoUrl string, indexName string, serverIp string, serverPort string, command int) (statu int, err error) {
    var info interface{}
@@ -947,7 +1082,7 @@
    return statu, nil
}
//获取当前节点抓拍库所有人员ID*缓存*
// 获取当前节点抓拍库所有人员ID*缓存*
func GetAllLocalVideopersonsId(compareArgs protomsg.CompareArgs, indexName string, serverIp string, serverPort string, alarmLevelTypes string) (capturetable []string) {
    queryStr := ""
    queryBody := compareArgs.InputValue
@@ -1116,7 +1251,7 @@
    return capturetable
}
//初始化实时抓拍
// 初始化实时抓拍
func InitRealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm string, category string, quantity int) ([]protomsg.AIOcean, error) {
    var aIOceanInfo []protomsg.AIOcean
    url := "http://" + serverIp + ":" + serverPort +
@@ -1169,7 +1304,7 @@
    return aIOcean, nil
}
//实时抓拍
// 实时抓拍
func RealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm bool) ([]protomsg.AIOcean, error) {
    var aIOceanInfo []protomsg.AIOcean
    url := "http://" + serverIp + ":" + serverPort +
@@ -1213,7 +1348,7 @@
    return aIOcean, nil
}
//综合统计
// 综合统计
func StatisticsComprehensive(serverIp string, serverPort string, indexName string, isAlarm string) (total int, err error) {
    url := "http://" + serverIp + ":" + serverPort +
        "/" + indexName + "/_search"
@@ -1262,7 +1397,7 @@
    return total, nil
}
//实时报警任务比率
// 实时报警任务比率
func RealTimeAlarmTaskRate(serverIp string, serverPort string, indexName string) (sources []map[string]interface{}, err error) {
    url := "http://" + serverIp + ":" + serverPort +
        "/" + indexName + "/_search"
@@ -1323,7 +1458,7 @@
    return sources, nil
}
//聚合任务列表,taskId+taskName
// 聚合任务列表,taskId+taskName
func AggregateTaskList(serverIp string, serverPort string, indexName string, analyServerId string, cameraIds []string) (sources []map[string]interface{}, err error) {
    url := "http://" + serverIp + ":" + serverPort +
        "/" + indexName + "/_search"
@@ -1419,7 +1554,7 @@
}
//添加即将删除信号
// 添加即将删除信号
func AddDeleteSignal() {
}
@@ -1460,7 +1595,7 @@
}
//查询时间段数据 *缓存*
// 查询时间段数据 *缓存*
func GetPeriodInfos(serverIp string, serverPort string, startTime string, endTime string, indexName string, shards string, targetType string) ([]*protomsg.MultiFeaCache, error) {
    var capdbinfo []*protomsg.MultiFeaCache
    url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local"
@@ -1638,8 +1773,8 @@
    return dbinfos, nil
}
//************************CORN TASK*******************************
//查询日期范围内是否还存在数据
// ************************CORN TASK*******************************
// 查询日期范围内是否还存在数据
func QueryAnalyServerData(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) {
    url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search"
    deleteJson := `{
@@ -1678,8 +1813,8 @@
    return result, nil
}
//按日期范围,服务器Id删除数据
func DeleteAnalyServerData(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (total int, err error, ) {
// 按日期范围,服务器Id删除数据
func DeleteAnalyServerData(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (total int, err error) {
    url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_delete_by_query"
    deleteJson := `{
    "query":{
@@ -1714,7 +1849,7 @@
    return deleteRes, nil
}
//给所有节点追加删除任务信息
// 给所有节点追加删除任务信息
func AddDelTask(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) {
    url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query"
    addJson := `{
@@ -1757,7 +1892,7 @@
    return result, nil
}
//移除已执行完的删除任务
// 移除已执行完的删除任务
func DeleteDelTask(serverIp string, serverPort string, indexName string, analyServerId string) (result bool, err error) {
    url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query"
    deleteJson := `{
@@ -1802,7 +1937,7 @@
    ShardNode  string `json:"shardNode"`  //分片所在节点名称
}
//获取索引分片信息
// 获取索引分片信息
func GetShardsByIndex(serverIp string, serverPort string, indexName string) ([]ShardInfo, error) {
    url := "http://" + serverIp + ":" + serverPort + "/_cat/shards?v"
    buf, err := EsReq("GET", url, []byte(""))
EsClient.go
@@ -15,7 +15,6 @@
    "time"
)
func Parsesources(sources []map[string]interface{}) (multiInfos []*protomsg.MultiFeaCache) {
    var ok bool
    for _, source := range sources {
@@ -104,7 +103,7 @@
    return
}
//解析抓拍库人员结构
// 解析抓拍库人员结构
func AIOceanAnalysis(sources []map[string]interface{}) (tmpinfos []protomsg.AIOcean) {
    var ok bool
    for _, source := range sources {
@@ -311,7 +310,7 @@
    return tmpinfos
}
//解析底库人员结构
// 解析底库人员结构
func Dbpersonbyid(sources []map[string]interface{}) (tmpinfos []protomsg.Dbperson) {
    var ok bool
@@ -379,7 +378,7 @@
    return tmpinfos
}
//解析底库结构
// 解析底库结构
func Dbtablebyid(sources []map[string]interface{}) (tmpinfos []protomsg.Dbtable) {
    var ok bool
@@ -548,7 +547,7 @@
    return allSource, nil
}
func SourceDeduplication(buf [] byte) ([]map[string]interface{}, error) {
func SourceDeduplication(buf []byte) ([]map[string]interface{}, error) {
    var info interface{}
    json.Unmarshal(buf, &info)
    out, ok := info.(map[string]interface{})
@@ -577,8 +576,8 @@
    return faceId, nil
}
//解析聚合计数结构
func SourceStatistics(buf [] byte) ([]map[string]interface{}, error) {
// 解析聚合计数结构
func SourceStatistics(buf []byte) ([]map[string]interface{}, error) {
    var info interface{}
    json.Unmarshal(buf, &info)
    out, ok := info.(map[string]interface{})
@@ -604,7 +603,7 @@
    return resultData, nil
}
func SourceAggregations(buf [] byte, thresholdTime float64, queryUseTime float64) (sources map[string]interface{}, err error) {
func SourceAggregations(buf []byte, thresholdTime float64, queryUseTime float64) (sources map[string]interface{}, err error) {
    s := make(map[string]interface{})
    loc, err := time.LoadLocation("Asia/Shanghai")
    if err != nil {
@@ -739,7 +738,7 @@
    return s, nil
}
func SourceAggregationsReturnByGrouped(buf [] byte, thresholdTime float64) (sources []map[string]interface{}, err error) {
func SourceAggregationsReturnByGrouped(buf []byte, thresholdTime float64) (sources []map[string]interface{}, err error) {
    loc, err := time.LoadLocation("Asia/Shanghai")
    if err != nil {
        return nil, errors.New("时区设置错误")
@@ -868,7 +867,7 @@
    return sources, nil
}
//解析抓拍库人员结构
// 解析抓拍库人员结构
func PerSonAnalysis(preData []map[string]interface{}) (sources []map[string]interface{}, err error) {
    loc, err := time.LoadLocation("Asia/Shanghai")
    if err != nil {
@@ -1034,15 +1033,43 @@
        return -1, errors.New("first total change error!")
    }
    tmp,b := middle["total"].(map[string]interface{})
    tmp, b := middle["total"].(map[string]interface{})
    if b != true {
        v := middle["total"].(float64)
        t := int(v)
        return t,nil
        return t, nil
    }
    value := tmp["value"].(float64)
    total = int(value)
    return total, nil
}
func SourceAggregationList(buf []byte) (sources []map[string]interface{}, err error) {
    var info interface{}
    json.Unmarshal(buf, &info)
    out, ok := info.(map[string]interface{})
    if !ok {
        return nil, errors.New("http response interface can not change map[string]interface{}")
    }
    middle, ok := out["aggregations"].(map[string]interface{})
    if !ok {
        return nil, errors.New("first hits change error!")
    }
    documentAggregations := middle["group_by_documentnumber"].(map[string]interface{})
    buckets := documentAggregations["buckets"].([]interface{})
    if len(buckets) == 0 {
        return nil, nil
    }
    for _, in := range buckets {
        tmpbuf, ok := in.(map[string]interface{})
        if !ok {
            return nil, errors.New("")
        }
        sources = append(sources, tmpbuf)
    }
    return sources, nil
}
func EsReq(method string, url string, parama []byte) (buf []byte, err error) {
@@ -1053,7 +1080,7 @@
    }
    request, err := http.NewRequest(method, url, bytes.NewBuffer(parama))
    request.Header.Set("Content-type", "application/json")
    request.Header.Set("Authorization",Token)
    request.Header.Set("Authorization", Token)
    if err != nil {
        fmt.Println("build request fail !")
@@ -1086,15 +1113,14 @@
// 赋值时检测是否能够赋值
//func //Isnil(key string, ok bool){
//    if !ok {
//            fmt.Println(key, "is nil can not asign")
//            fmt.Println(key, "is nil can not asign")
//    }
//}
type account struct {
    Username string `mapstructure: "username"`
    Username     string `mapstructure: "username"`
    Userpassword string `mapstructure: "userpassword"`
}
var Account = &account{}
@@ -1109,9 +1135,9 @@
    v.AddConfigPath("/opt/vasystem/config/")
    err := v.ReadInConfig()
    if err != nil {
        log.Fatal("err on parsing configuration file!",err)
        log.Fatal("err on parsing configuration file!", err)
    }
    v.UnmarshalKey("es.account",Account)
    v.UnmarshalKey("es.account", Account)
    Token = "Basic "+base64.StdEncoding.EncodeToString([]byte(Account.Username+":"+Account.Userpassword))
    Token = "Basic " + base64.StdEncoding.EncodeToString([]byte(Account.Username+":"+Account.Userpassword))
}