| | |
| | | } |
| | | } |
| | | |
| | | //***********************重庆Start**********************************// |
| | | |
| | | type activeHourFormat struct { |
| | | startTime string |
| | | endTime string |
| | | startHour int |
| | | endHour int |
| | | } |
| | | |
| | | func formatActiveHour(activeHour string) (activeHourFormat, error) { |
| | | hours := strings.Split(activeHour, "-") |
| | | |
| | | if len(hours) == 2 { |
| | | startHour := hours[0] |
| | | endHour := hours[1] |
| | | |
| | | // 解析开始时间的小时和分钟 |
| | | startParts := strings.Split(startHour, ":") |
| | | startHourInt, _ := strconv.Atoi(startParts[0]) |
| | | |
| | | // 解析结束时间的小时和分钟 |
| | | endParts := strings.Split(endHour, ":") |
| | | endHourInt, _ := strconv.Atoi(endParts[0]) |
| | | |
| | | // 输出开始时间的小时 |
| | | fmt.Println("开始时间的小时:", startHourInt) |
| | | |
| | | // 输出结束时间的小时 + 1 |
| | | endHourPlusOne := (endHourInt + 1) % 24 // 取余确保不超过24小时 |
| | | fmt.Println("结束时间的小时 + 1:", endHourPlusOne) |
| | | activeHourFormat := activeHourFormat{startTime: startHour, endTime: endHour, startHour: startHourInt, endHour: endHourPlusOne} |
| | | return activeHourFormat, nil |
| | | } |
| | | return activeHourFormat{}, errors.New("错误:无法解析开始时间和结束时间") |
| | | |
| | | } |
| | | |
| | | func DayNightActivityQuery(communityId string, startTime string, endTime string, activeHour string, indexName string, serverIp string, serverPort string) ([]string, error) { |
| | | activityId := make([]string, 0) |
| | | esURL := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" |
| | | |
| | | activeHourFormat, err := formatActiveHour(activeHour) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | |
| | | queryDSL := ` |
| | | { |
| | | "size": 0, |
| | | "query": { |
| | | "bool": { |
| | | "filter": [ |
| | | { |
| | | "range": { |
| | | "picDate": { |
| | | "gte": "` + startTime + `", |
| | | "lt": "` + endTime + `" |
| | | } |
| | | } |
| | | }, |
| | | { |
| | | "term": { |
| | | "communityId": "` + communityId + `" |
| | | } |
| | | }, |
| | | { |
| | | "script": { |
| | | "script": { |
| | | "source": "doc['picDate'].value.hourOfDay >= ` + strconv.Itoa(activeHourFormat.startHour) + ` || doc['picDate'].value.hourOfDay < ` + strconv.Itoa(activeHourFormat.endHour) + `", |
| | | "lang": "painless" |
| | | } |
| | | } |
| | | } |
| | | ], |
| | | "must_not": [ |
| | | { |
| | | "term": { |
| | | "documentNumber": "" |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | }, |
| | | "aggs": { |
| | | "group_by_documentnumber": { |
| | | "terms": { |
| | | "field": "documentNumber", |
| | | "size": 100000 |
| | | }, |
| | | "aggs": { |
| | | "group_by_date": { |
| | | "date_histogram": { |
| | | "field": "picDate", |
| | | "interval": "1d", // 按天分桶 |
| | | "format": "yyyy-MM-dd" |
| | | }, |
| | | "aggs": { |
| | | "top_hits": { |
| | | "top_hits": { |
| | | "_source": [ |
| | | "picDate" |
| | | ], |
| | | "size": 100000, |
| | | "sort": [ |
| | | { |
| | | "picDate": { |
| | | "order": "desc" |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | }` |
| | | //fmt.Println(esURL) |
| | | //fmt.Println(queryDSL) |
| | | buf, err := EsReq("POST", esURL, []byte(queryDSL)) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | source, err := SourceAggregationList(buf) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | result, _ := decodeDocumentInfos(source) |
| | | return result, nil |
| | | |
| | | return activityId, nil |
| | | } |
| | | |
| | | // ***********************重庆End************************************// |
| | | // 根据抓拍人员id查询抓拍人员信息 |
| | | func AIOceaninfosbyid(id []string, indexName string, serverIp string, serverPort string) ([]protomsg.AIOcean, error) { |
| | | var aIOceanInfo []protomsg.AIOcean |
| | |
| | | return videoUrl, nil |
| | | } |
| | | |
| | | //根据抓拍库人员id查询特征值 |
| | | // 根据抓拍库人员id查询特征值 |
| | | func GetVideoPersonFaceFeatureById(id string, indexName string, serverIp string, serverPort string) (string, error) { |
| | | var jsonDSL = ` |
| | | { |
| | |
| | | return feature, nil |
| | | } |
| | | |
| | | //根据目标id查询已追加条数 |
| | | // 根据目标id查询已追加条数 |
| | | func GetLinkTagInfoSize(id string, indexName string, serverIp string, serverPort string) (size int, err error) { |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" |
| | | queryDSL := `{ |
| | |
| | | return size, nil |
| | | } |
| | | |
| | | //根据目标id追加跟踪信息 |
| | | // 根据目标id追加跟踪信息 |
| | | func AppendTargetInfo(id string, targetInfo string, indexName string, serverIp string, serverPort string, updateTime string) (string, error) { |
| | | if targetInfo == "" { |
| | | return "", errors.New("append data is nil") |
| | |
| | | return ids, nil |
| | | } |
| | | |
| | | //统计各个区域人数 |
| | | // 统计各个区域人数 |
| | | func StatisticsEveryAreaPersonsNumber(startTime string, endTime string, serverIp string, serverPort string, indexName string) ([]map[string]interface{}, error) { |
| | | var requestUrl = "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" |
| | | var requestBody = `{ |
| | |
| | | |
| | | } |
| | | |
| | | //根据时间范围,摄像机列表,分组聚合人脸列表,返回分组数据 |
| | | // 根据时间范围,摄像机列表,分组聚合人脸列表,返回分组数据 |
| | | func GetFaceDataBucketsByCameraIdAndTimeReturnByGrouped(cameraId []string, personId []string, startTime string, endTime string, thresholdTime float64, serverIp string, ServerPort string, indexName string) (buckersDate map[string]interface{}, err error) { |
| | | var filterArr []string |
| | | if cameraId != nil && len(cameraId) > 0 { |
| | |
| | | return sources, nil |
| | | } |
| | | |
| | | //根据时间范围,摄像机列表,分组聚合人脸列表 |
| | | // 根据时间范围,摄像机列表,分组聚合人脸列表 |
| | | func GetFaceDataBucketsByCameraIdAndTime(cameraId []string, personId []string, startTime string, endTime string, thresholdTime float64, serverIp string, ServerPort string, indexName string) (buckersDate map[string]interface{}, err error) { |
| | | var filterArr []string |
| | | if cameraId != nil && len(cameraId) > 0 { |
| | |
| | | return sources, nil |
| | | } |
| | | |
| | | //根据抓拍人员id更新(picurl)图片地址 |
| | | // 根据抓拍人员id更新(picurl)图片地址 |
| | | func UpdatePicUrlById(id string, picUrl string, indexName string, serverIp string, serverPort string) (err error) { |
| | | updateTime := time.Now().Format("2006-01-02 15:04:05") |
| | | tRes, err := AIOceaninfosbyid([]string{id}, indexName, serverIp, serverPort) |
| | |
| | | return nil |
| | | } |
| | | |
| | | //根据抓拍人员id更新(videourl)摄像机地址 |
| | | // 根据抓拍人员id更新(videourl)摄像机地址 |
| | | func UpdateVideourlById(id string, videoUrl string, indexName string, serverIp string, serverPort string, command int) (statu int, err error) { |
| | | |
| | | var info interface{} |
| | |
| | | return statu, nil |
| | | } |
| | | |
| | | //获取当前节点抓拍库所有人员ID*缓存* |
| | | // 获取当前节点抓拍库所有人员ID*缓存* |
| | | func GetAllLocalVideopersonsId(compareArgs protomsg.CompareArgs, indexName string, serverIp string, serverPort string, alarmLevelTypes string) (capturetable []string) { |
| | | queryStr := "" |
| | | queryBody := compareArgs.InputValue |
| | |
| | | return capturetable |
| | | } |
| | | |
| | | //初始化实时抓拍 |
| | | // 初始化实时抓拍 |
| | | func InitRealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm string, category string, quantity int) ([]protomsg.AIOcean, error) { |
| | | var aIOceanInfo []protomsg.AIOcean |
| | | url := "http://" + serverIp + ":" + serverPort + |
| | |
| | | return aIOcean, nil |
| | | } |
| | | |
| | | //实时抓拍 |
| | | // 实时抓拍 |
| | | func RealTimeCapture(serverIp string, serverPort string, indexName string, isAlarm bool) ([]protomsg.AIOcean, error) { |
| | | var aIOceanInfo []protomsg.AIOcean |
| | | url := "http://" + serverIp + ":" + serverPort + |
| | |
| | | return aIOcean, nil |
| | | } |
| | | |
| | | //综合统计 |
| | | // 综合统计 |
| | | func StatisticsComprehensive(serverIp string, serverPort string, indexName string, isAlarm string) (total int, err error) { |
| | | url := "http://" + serverIp + ":" + serverPort + |
| | | "/" + indexName + "/_search" |
| | |
| | | return total, nil |
| | | } |
| | | |
| | | //实时报警任务比率 |
| | | // 实时报警任务比率 |
| | | func RealTimeAlarmTaskRate(serverIp string, serverPort string, indexName string) (sources []map[string]interface{}, err error) { |
| | | url := "http://" + serverIp + ":" + serverPort + |
| | | "/" + indexName + "/_search" |
| | |
| | | return sources, nil |
| | | } |
| | | |
| | | //聚合任务列表,taskId+taskName |
| | | // 聚合任务列表,taskId+taskName |
| | | func AggregateTaskList(serverIp string, serverPort string, indexName string, analyServerId string, cameraIds []string) (sources []map[string]interface{}, err error) { |
| | | url := "http://" + serverIp + ":" + serverPort + |
| | | "/" + indexName + "/_search" |
| | |
| | | |
| | | } |
| | | |
| | | //添加即将删除信号 |
| | | // 添加即将删除信号 |
| | | func AddDeleteSignal() { |
| | | |
| | | } |
| | |
| | | |
| | | } |
| | | |
| | | //查询时间段数据 *缓存* |
| | | // 查询时间段数据 *缓存* |
| | | func GetPeriodInfos(serverIp string, serverPort string, startTime string, endTime string, indexName string, shards string, targetType string) ([]*protomsg.MultiFeaCache, error) { |
| | | var capdbinfo []*protomsg.MultiFeaCache |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search?preference=_shards:" + shards + "|_only_local" |
| | |
| | | return dbinfos, nil |
| | | } |
| | | |
| | | //************************CORN TASK******************************* |
| | | //查询日期范围内是否还存在数据 |
| | | // ************************CORN TASK******************************* |
| | | // 查询日期范围内是否还存在数据 |
| | | func QueryAnalyServerData(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) { |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_search" |
| | | deleteJson := `{ |
| | |
| | | return result, nil |
| | | } |
| | | |
| | | //按日期范围,服务器Id删除数据 |
| | | func DeleteAnalyServerData(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (total int, err error, ) { |
| | | // 按日期范围,服务器Id删除数据 |
| | | func DeleteAnalyServerData(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (total int, err error) { |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_delete_by_query" |
| | | deleteJson := `{ |
| | | "query":{ |
| | |
| | | return deleteRes, nil |
| | | } |
| | | |
| | | //给所有节点追加删除任务信息 |
| | | // 给所有节点追加删除任务信息 |
| | | func AddDelTask(serverIp string, serverPort string, indexName string, startTime string, endTime string, analyServerId string) (result bool, err error) { |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query" |
| | | addJson := `{ |
| | |
| | | return result, nil |
| | | } |
| | | |
| | | //移除已执行完的删除任务 |
| | | // 移除已执行完的删除任务 |
| | | func DeleteDelTask(serverIp string, serverPort string, indexName string, analyServerId string) (result bool, err error) { |
| | | url := "http://" + serverIp + ":" + serverPort + "/" + indexName + "/_update_by_query" |
| | | deleteJson := `{ |
| | |
| | | ShardNode string `json:"shardNode"` //分片所在节点名称 |
| | | } |
| | | |
| | | //获取索引分片信息 |
| | | // 获取索引分片信息 |
| | | func GetShardsByIndex(serverIp string, serverPort string, indexName string) ([]ShardInfo, error) { |
| | | url := "http://" + serverIp + ":" + serverPort + "/_cat/shards?v" |
| | | buf, err := EsReq("GET", url, []byte("")) |
| | |
| | | "time" |
| | | ) |
| | | |
| | | |
| | | func Parsesources(sources []map[string]interface{}) (multiInfos []*protomsg.MultiFeaCache) { |
| | | var ok bool |
| | | for _, source := range sources { |
| | |
| | | return |
| | | } |
| | | |
| | | //解析抓拍库人员结构 |
| | | // 解析抓拍库人员结构 |
| | | func AIOceanAnalysis(sources []map[string]interface{}) (tmpinfos []protomsg.AIOcean) { |
| | | var ok bool |
| | | for _, source := range sources { |
| | |
| | | return tmpinfos |
| | | } |
| | | |
| | | //解析底库人员结构 |
| | | // 解析底库人员结构 |
| | | func Dbpersonbyid(sources []map[string]interface{}) (tmpinfos []protomsg.Dbperson) { |
| | | var ok bool |
| | | |
| | |
| | | return tmpinfos |
| | | } |
| | | |
| | | //解析底库结构 |
| | | // 解析底库结构 |
| | | func Dbtablebyid(sources []map[string]interface{}) (tmpinfos []protomsg.Dbtable) { |
| | | var ok bool |
| | | |
| | |
| | | return allSource, nil |
| | | } |
| | | |
| | | func SourceDeduplication(buf [] byte) ([]map[string]interface{}, error) { |
| | | func SourceDeduplication(buf []byte) ([]map[string]interface{}, error) { |
| | | var info interface{} |
| | | json.Unmarshal(buf, &info) |
| | | out, ok := info.(map[string]interface{}) |
| | |
| | | return faceId, nil |
| | | } |
| | | |
| | | //解析聚合计数结构 |
| | | func SourceStatistics(buf [] byte) ([]map[string]interface{}, error) { |
| | | // 解析聚合计数结构 |
| | | func SourceStatistics(buf []byte) ([]map[string]interface{}, error) { |
| | | var info interface{} |
| | | json.Unmarshal(buf, &info) |
| | | out, ok := info.(map[string]interface{}) |
| | |
| | | return resultData, nil |
| | | } |
| | | |
| | | func SourceAggregations(buf [] byte, thresholdTime float64, queryUseTime float64) (sources map[string]interface{}, err error) { |
| | | func SourceAggregations(buf []byte, thresholdTime float64, queryUseTime float64) (sources map[string]interface{}, err error) { |
| | | s := make(map[string]interface{}) |
| | | loc, err := time.LoadLocation("Asia/Shanghai") |
| | | if err != nil { |
| | |
| | | return s, nil |
| | | } |
| | | |
| | | func SourceAggregationsReturnByGrouped(buf [] byte, thresholdTime float64) (sources []map[string]interface{}, err error) { |
| | | func SourceAggregationsReturnByGrouped(buf []byte, thresholdTime float64) (sources []map[string]interface{}, err error) { |
| | | loc, err := time.LoadLocation("Asia/Shanghai") |
| | | if err != nil { |
| | | return nil, errors.New("时区设置错误") |
| | |
| | | return sources, nil |
| | | } |
| | | |
| | | //解析抓拍库人员结构 |
| | | // 解析抓拍库人员结构 |
| | | func PerSonAnalysis(preData []map[string]interface{}) (sources []map[string]interface{}, err error) { |
| | | loc, err := time.LoadLocation("Asia/Shanghai") |
| | | if err != nil { |
| | |
| | | return -1, errors.New("first total change error!") |
| | | } |
| | | |
| | | tmp,b := middle["total"].(map[string]interface{}) |
| | | tmp, b := middle["total"].(map[string]interface{}) |
| | | if b != true { |
| | | v := middle["total"].(float64) |
| | | t := int(v) |
| | | return t,nil |
| | | return t, nil |
| | | } |
| | | value := tmp["value"].(float64) |
| | | total = int(value) |
| | | return total, nil |
| | | } |
| | | |
| | | func SourceAggregationList(buf []byte) (sources []map[string]interface{}, err error) { |
| | | var info interface{} |
| | | json.Unmarshal(buf, &info) |
| | | out, ok := info.(map[string]interface{}) |
| | | if !ok { |
| | | return nil, errors.New("http response interface can not change map[string]interface{}") |
| | | } |
| | | |
| | | middle, ok := out["aggregations"].(map[string]interface{}) |
| | | if !ok { |
| | | return nil, errors.New("first hits change error!") |
| | | } |
| | | |
| | | documentAggregations := middle["group_by_documentnumber"].(map[string]interface{}) |
| | | buckets := documentAggregations["buckets"].([]interface{}) |
| | | if len(buckets) == 0 { |
| | | return nil, nil |
| | | } |
| | | for _, in := range buckets { |
| | | tmpbuf, ok := in.(map[string]interface{}) |
| | | if !ok { |
| | | return nil, errors.New("") |
| | | } |
| | | sources = append(sources, tmpbuf) |
| | | } |
| | | return sources, nil |
| | | } |
| | | |
| | | func EsReq(method string, url string, parama []byte) (buf []byte, err error) { |
| | |
| | | } |
| | | request, err := http.NewRequest(method, url, bytes.NewBuffer(parama)) |
| | | request.Header.Set("Content-type", "application/json") |
| | | request.Header.Set("Authorization",Token) |
| | | request.Header.Set("Authorization", Token) |
| | | |
| | | if err != nil { |
| | | fmt.Println("build request fail !") |
| | |
| | | // 赋值时检测是否能够赋值 |
| | | //func //Isnil(key string, ok bool){ |
| | | // if !ok { |
| | | // fmt.Println(key, "is nil can not asign") |
| | | // fmt.Println(key, "is nil can not asign") |
| | | // } |
| | | //} |
| | | |
| | | type account struct { |
| | | Username string `mapstructure: "username"` |
| | | Username string `mapstructure: "username"` |
| | | Userpassword string `mapstructure: "userpassword"` |
| | | } |
| | | |
| | | |
| | | var Account = &account{} |
| | | |
| | |
| | | v.AddConfigPath("/opt/vasystem/config/") |
| | | err := v.ReadInConfig() |
| | | if err != nil { |
| | | log.Fatal("err on parsing configuration file!",err) |
| | | log.Fatal("err on parsing configuration file!", err) |
| | | } |
| | | v.UnmarshalKey("es.account",Account) |
| | | v.UnmarshalKey("es.account", Account) |
| | | |
| | | Token = "Basic "+base64.StdEncoding.EncodeToString([]byte(Account.Username+":"+Account.Userpassword)) |
| | | Token = "Basic " + base64.StdEncoding.EncodeToString([]byte(Account.Username+":"+Account.Userpassword)) |
| | | } |