package db

import (
	"encoding/json"
	"fmt"
	"strconv"
	"strings"

	"basic.com/pubsub/esutil.git"
	"basic.com/valib/logger.git"
	"ruleModelEngine/config"
	"ruleModelEngine/util"
)

// entryExitAlarmRuleID is the ruleId that marks a record as already alarmed
// for the entry/exit anomaly rule; QueryLastIdByDayRange skips such records.
const entryExitAlarmRuleID = "4"

// esEndpoint returns the URL of the given API action (for example "_search")
// on the index configured in config.Elastic.
func esEndpoint(action string) string {
	return "http://" + config.Elastic.Host + ":" + config.Elastic.Port + "/" + config.Elastic.Index + "/" + action
}

// esQuoteString renders s as a JSON string literal, including the surrounding
// quotes, so that it can be spliced into a hand-written query body without
// quotes or backslashes in s corrupting the document.
func esQuoteString(s string) string {
	// Marshalling a plain string cannot fail, so the error is always nil.
	b, _ := json.Marshal(s)
	return string(b)
}

// esQuoteStrings renders values as a JSON array of string literals. A nil or
// empty slice yields "[]".
func esQuoteStrings(values []string) string {
	quoted := make([]string, len(values))
	for i, v := range values {
		quoted[i] = esQuoteString(v)
	}
	return "[" + strings.Join(quoted, ",") + "]"
}

// esStringField returns m[key] as a string. It reports an error when the key
// is absent or holds a value of another type.
func esStringField(m map[string]interface{}, key string) (string, error) {
	s, ok := m[key].(string)
	if !ok {
		return "", fmt.Errorf("field %q: expected string, got %T", key, m[key])
	}
	return s, nil
}

// esObjectField returns m[key] as a JSON object. It reports an error when the
// key is absent or holds a value of another type.
func esObjectField(m map[string]interface{}, key string) (map[string]interface{}, error) {
	obj, ok := m[key].(map[string]interface{})
	if !ok {
		return nil, fmt.Errorf("field %q: expected object, got %T", key, m[key])
	}
	return obj, nil
}

// esFieldReader reads typed fields from decoded JSON objects and remembers the
// first failure, so that a run of reads needs a single error check at the end.
type esFieldReader struct{ err error }

// str returns m[key] as a string, or "" once any read has failed.
func (r *esFieldReader) str(m map[string]interface{}, key string) string {
	if r.err != nil {
		return ""
	}
	s, err := esStringField(m, key)
	if err != nil {
		r.err = err
	}
	return s
}

// obj returns m[key] as a JSON object, or nil once any read has failed.
func (r *esFieldReader) obj(m map[string]interface{}, key string) map[string]interface{} {
	if r.err != nil {
		return nil
	}
	o, err := esObjectField(m, key)
	if err != nil {
		r.err = err
	}
	return o
}

// esTopHitSources returns the "_source" object of every hit found under
// bucket["top_hits"]["hits"]["hits"].
func esTopHitSources(bucket map[string]interface{}) ([]map[string]interface{}, error) {
	topHits, err := esObjectField(bucket, "top_hits")
	if err != nil {
		return nil, err
	}
	outer, err := esObjectField(topHits, "hits")
	if err != nil {
		return nil, fmt.Errorf("top_hits: %v", err)
	}
	hits, ok := outer["hits"].([]interface{})
	if !ok {
		return nil, fmt.Errorf("top_hits.hits: field %q: expected array, got %T", "hits", outer["hits"])
	}

	sources := make([]map[string]interface{}, 0, len(hits))
	for i, hit := range hits {
		hitObj, ok := hit.(map[string]interface{})
		if !ok {
			return nil, fmt.Errorf("top_hits.hits.hits[%d]: expected object, got %T", i, hit)
		}
		source, err := esObjectField(hitObj, "_source")
		if err != nil {
			return nil, fmt.Errorf("top_hits.hits.hits[%d]: %v", i, err)
		}
		sources = append(sources, source)
	}
	return sources, nil
}

// esHasAlarmRule reports whether source["alarmRules"] contains an entry whose
// "ruleId" equals ruleID. A missing or null "alarmRules" counts as no match.
func esHasAlarmRule(source map[string]interface{}, ruleID string) (bool, error) {
	raw := source["alarmRules"]
	if raw == nil {
		return false, nil
	}
	rules, ok := raw.([]interface{})
	if !ok {
		return false, fmt.Errorf("field %q: expected array, got %T", "alarmRules", raw)
	}
	for i, rule := range rules {
		ruleObj, ok := rule.(map[string]interface{})
		if !ok {
			return false, fmt.Errorf("alarmRules[%d]: expected object, got %T", i, rule)
		}
		id, err := esStringField(ruleObj, "ruleId")
		if err != nil {
			return false, fmt.Errorf("alarmRules[%d]: %v", i, err)
		}
		if id == ruleID {
			return true, nil
		}
	}
	return false, nil
}

// decodeCaptureDetail builds a CaptureDetail from one hit's "_source" object.
// The capture address is the concatenation building + unit + floor taken from
// the "cameraLocation" object.
func decodeCaptureDetail(source map[string]interface{}) (CaptureDetail, error) {
	var r esFieldReader
	picDate := r.str(source, "picDate")
	location := r.obj(source, "cameraLocation")
	unit := r.str(location, "unit")
	floor := r.str(location, "floor")
	building := r.str(location, "building")
	direction := r.str(location, "direction")
	if r.err != nil {
		return CaptureDetail{}, r.err
	}
	return CaptureDetail{
		CaptureAddress: building + unit + floor,
		CaptureDate:    picDate,
		Direction:      direction,
	}, nil
}

// decodeDocumentInfos converts aggregation buckets into CaptureInfo values,
// one per bucket. The bucket "key" becomes DocumentNumber and every top hit
// becomes one CaptureDetail.
//
// It returns an error, instead of panicking, when a bucket or hit does not
// have the expected shape. The result is a non-nil, possibly empty slice on
// success.
func decodeDocumentInfos(docInfo []map[string]interface{}) ([]CaptureInfo, error) {
	captureInfos := make([]CaptureInfo, 0, len(docInfo))
	for _, bucket := range docInfo {
		docNumber, err := esStringField(bucket, "key")
		if err != nil {
			return nil, fmt.Errorf("decoding bucket: %v", err)
		}
		sources, err := esTopHitSources(bucket)
		if err != nil {
			return nil, fmt.Errorf("decoding bucket %q: %v", docNumber, err)
		}

		captureInfo := CaptureInfo{DocumentNumber: docNumber}
		for _, source := range sources {
			detail, err := decodeCaptureDetail(source)
			if err != nil {
				return nil, fmt.Errorf("decoding bucket %q: %v", docNumber, err)
			}
			captureInfo.CaptureDetail = append(captureInfo.CaptureDetail, detail)
		}
		captureInfos = append(captureInfos, captureInfo)
	}
	return captureInfos, nil
}

// QueryTimesByDocNumberDays returns the total, as reported by
// esutil.SourceTotal, of records that match docNumber, communityId and any of
// cameraIds and whose picDate is at or after "now-<days>d/d".
//
// All string arguments are JSON-encoded before being placed in the query. An
// empty cameraIds produces an empty terms list.
func QueryTimesByDocNumberDays(docNumber string, communityId string, cameraIds []string, days int) (int, error) {
	queryDSL := `{
  "query": {
    "bool": {
      "filter": [
        { "range": { "picDate": { "gte": "now-` + strconv.Itoa(days) + `d/d" } } },
        { "term": { "documentNumber": ` + esQuoteString(docNumber) + ` } },
        { "term": { "communityId": ` + esQuoteString(communityId) + ` } },
        { "terms": { "cameraId": ` + esQuoteStrings(cameraIds) + ` } }
      ]
    }
  },
  "size": 0
}`

	buf, err := esutil.EsReq("POST", esEndpoint("_search"), []byte(queryDSL))
	if err != nil {
		return 0, err
	}
	total, err := esutil.SourceTotal(buf)
	if err != nil {
		return 0, err
	}
	return total, nil
}

// QueryLastIdByDayRange groups the records of communityId captured by any of
// cameraIds with picDate in ["now-<dayGte>d/d", "now-<dayLt>d/d") by
// documentNumber, and returns a map from documentNumber to the id of the most
// recent record in each group.
//
// A group is left out when its most recent record already carries the
// entry/exit anomaly alarm rule (ruleId "4"), so it is not alarmed again.
// An error is returned when a hit does not have the expected shape.
func QueryLastIdByDayRange(communityId string, cameraIds []string, dayGte, dayLt int) (map[string]string, error) {
	queryDSL := `{
  "query": {
    "bool": {
      "filter": [
        { "range": { "picDate": { "gte": "now-` + strconv.Itoa(dayGte) + `d/d", "lt": "now-` + strconv.Itoa(dayLt) + `d/d" } } },
        { "term": { "communityId": ` + esQuoteString(communityId) + ` } },
        { "terms": { "cameraId": ` + esQuoteStrings(cameraIds) + ` } }
      ]
    }
  },
  "size": 0,
  "aggs": {
    "group_by_documentnumber": {
      "terms": { "field": "documentNumber", "size": 100000 },
      "aggs": {
        "top_hits": {
          "top_hits": {
            "_source": [ "documentNumber", "id", "alarmRules.ruleId" ],
            "size": 1,
            "sort": [ { "picDate": { "order": "desc" } } ]
          }
        }
      }
    }
  }
}`

	buf, err := esutil.EsReq("POST", esEndpoint("_search"), []byte(queryDSL))
	if err != nil {
		return nil, err
	}
	buckets, err := esutil.SourceAggregationList(buf)
	if err != nil {
		return nil, err
	}

	docNumberMap := make(map[string]string)
	for _, bucket := range buckets {
		sources, err := esTopHitSources(bucket)
		if err != nil {
			return nil, err
		}
		for _, source := range sources {
			// If the last record of that day has already been alarmed for the
			// entry/exit anomaly, filter it out so it is not alarmed again.
			alarmed, err := esHasAlarmRule(source, entryExitAlarmRuleID)
			if err != nil {
				return nil, err
			}
			if alarmed {
				continue
			}

			var r esFieldReader
			documentNumber := r.str(source, "documentNumber")
			id := r.str(source, "id")
			if r.err != nil {
				return nil, r.err
			}
			docNumberMap[documentNumber] = id
		}
	}
	return docNumberMap, nil
}

// QueryById fetches the record whose "id" equals id and returns it as a
// TaskPerson. When no record matches, it returns the zero TaskPerson and a nil
// error. When the first returned record lacks an expected field, it returns
// an error instead of panicking.
func QueryById(id string) (TaskPerson, error) {
	queryDSL := `{
  "query": {
    "bool": {
      "filter": [
        { "term": { "id": ` + esQuoteString(id) + ` } }
      ]
    }
  },
  "_source": [
    "id",
    "cameraId",
    "picDate",
    "communityId",
    "documentNumber",
    "cameraLocation.floor",
    "targetInfo.targetType"
  ]
}`

	buf, err := esutil.EsReq("POST", esEndpoint("_search"), []byte(queryDSL))
	if err != nil {
		return TaskPerson{}, err
	}
	source, err := esutil.Sourcelist(buf)
	if err != nil {
		return TaskPerson{}, err
	}
	if len(source) == 0 {
		return TaskPerson{}, nil
	}

	var r esFieldReader
	doc := source[0]
	taskPerson := TaskPerson{
		Id:             r.str(doc, "id"),
		PicDate:        r.str(doc, "picDate"),
		CameraId:       r.str(doc, "cameraId"),
		CommunityId:    r.str(doc, "communityId"),
		DocumentNumber: r.str(doc, "documentNumber"),
		TargetType:     r.str(r.obj(doc, "targetInfo"), "targetType"),
		Floor:          r.str(r.obj(doc, "cameraLocation"), "floor"),
	}
	if r.err != nil {
		return TaskPerson{}, r.err
	}
	return taskPerson, nil
}

// QueryCheckDataByDocumentNumber returns how many distinct building+floor
// combinations appear among up to 100 records of documentNumber whose picDate
// lies in [startTime, endTime).
//
// Records with no cameraLocation, or with an empty or missing building or
// floor, do not contribute to the count.
func QueryCheckDataByDocumentNumber(documentNumber string, startTime, endTime string) (int, error) {
	// NOTE(review): the must_not clause tests a top-level "floor" field, while
	// the value read below is "cameraLocation.floor" — confirm which field the
	// exclusion is meant to target.
	queryDSL := `{
  "query": {
    "bool": {
      "filter": [
        { "term": { "documentNumber": ` + esQuoteString(documentNumber) + ` } },
        { "range": { "picDate": { "gte": ` + esQuoteString(startTime) + `, "lt": ` + esQuoteString(endTime) + ` } } }
      ],
      "must_not": [
        { "term": { "floor": "" } }
      ]
    }
  },
  "size": 100,
  "_source": [ "cameraLocation.floor", "cameraLocation.building" ]
}`

	buf, err := esutil.EsReq("POST", esEndpoint("_search"), []byte(queryDSL))
	if err != nil {
		return 0, err
	}
	source, err := esutil.Sourcelist(buf)
	if err != nil {
		return 0, err
	}

	buildingFloors := make(map[string]struct{})
	for _, info := range source {
		location, ok := info["cameraLocation"].(map[string]interface{})
		if !ok {
			continue
		}
		// A missing or non-string value reads as "" and is skipped below.
		building, _ := location["building"].(string)
		floor, _ := location["floor"].(string)
		if building != "" && floor != "" {
			buildingFloors[building+floor] = struct{}{}
		}
	}
	return len(buildingFloors), nil
}

// AddAlarmRules appends alarmRules to the "alarmRules" list of the record
// whose "id" equals id and sets its "isAlarm" flag to true, using an
// _update_by_query request with a Painless script.
//
// It returns true when alarmRules is empty (nothing to do) or when
// esutil.SourceUpdated reports a non-zero count, and false otherwise.
func AddAlarmRules(alarmRules []AlarmRule, id string) (bool, error) {
	if len(alarmRules) == 0 {
		logger.Info("未有预警 id: ", id)
		return true, nil
	}
	logger.Info("预警 id: ", id)
	logger.Info("预警 AlarmRule: ", alarmRules)

	newAlarmRulesJson, err := json.Marshal(alarmRules)
	if err != nil {
		// Log the error itself; the marshalled buffer is nil on failure.
		logger.Error("json.Marshal alarmRules err: ", err)
		return false, err
	}

	// NOTE(review): the script calls alarmRules.add without a null check, so it
	// presumably relies on every record already having an "alarmRules" list —
	// confirm against the index mapping / writer.
	queryDSL := `{
  "script": {
    "source": "for (item in params.newAlarmRules) { ctx._source.alarmRules.add(item) } ctx._source.isAlarm = params.newStatus",
    "lang": "painless",
    "params": {
      "newStatus": true,
      "newAlarmRules": ` + string(newAlarmRulesJson) + `
    }
  },
  "query": {
    "bool": {
      "filter": [
        { "term": { "id": ` + esQuoteString(id) + ` } }
      ]
    }
  }
}`

	buf, err := esutil.EsReq("POST", esEndpoint("_update_by_query"), []byte(queryDSL))
	if err != nil {
		logger.Error("EsReq err: ", err)
		return false, err
	}
	addFlag, err := esutil.SourceUpdated(buf)
	if err != nil {
		logger.Error("SourceUpdated err: ", err)
		return false, err
	}
	return addFlag != 0, nil
}

// Query1MDataByCommunityId returns, for each of the given document numbers in
// communityId, the capture records with picDate in ["now-<days>d/d", "now/d"),
// oldest first.
//
// The number of document buckets and the number of records per bucket are
// capped by config.Elastic.DocumentSize and config.Elastic.TopHitsSize. An
// error is returned when the request fails or the response cannot be decoded.
func Query1MDataByCommunityId(communityId string, documentNumber []string, days int) ([]CaptureInfo, error) {
	queryDSL := `{
  "query": {
    "bool": {
      "filter": [
        { "range": { "picDate": { "gte": "now-` + strconv.Itoa(days) + `d/d", "lt": "now/d" } } },
        { "term": { "communityId": ` + esQuoteString(communityId) + ` } },
        { "terms": { "documentNumber": ` + esQuoteStrings(documentNumber) + ` } }
      ]
    }
  },
  "size": 0,
  "aggs": {
    "group_by_documentnumber": {
      "terms": { "field": "documentNumber", "size": ` + strconv.Itoa(config.Elastic.DocumentSize) + ` },
      "aggs": {
        "top_hits": {
          "top_hits": {
            "_source": [
              "documentNumber",
              "picDate",
              "cameraLocation.building",
              "cameraLocation.unit",
              "cameraLocation.floor",
              "cameraLocation.direction"
            ],
            "size": ` + strconv.Itoa(config.Elastic.TopHitsSize) + `,
            "sort": [ { "picDate": { "order": "asc" } } ]
          }
        }
      }
    }
  }
}`

	buf, err := esutil.EsReq("POST", esEndpoint("_search"), []byte(queryDSL))
	if err != nil {
		return nil, err
	}
	source, err := util.SourceAggregationList(buf)
	if err != nil {
		return nil, err
	}
	return decodeDocumentInfos(source)
}