panlei
2019-07-13 43876595ecbbf6c41fafc8293713347e8ed4014d
定时器缓存数据分类插入
4个文件已修改
220 ■■■■■ 已修改文件
insertdata/insertDataToEs.go 182 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruleserver/ruleToformula.go 31 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruleserver/timeTicker.go 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
insertdata/insertDataToEs.go
@@ -1,27 +1,30 @@
package insertdata
import (
    "ruleprocess/logger"
    "encoding/json"
    "errors"
    "fmt"
    "io/ioutil"
    "net"
    "ruleprocess/cache"
    "ruleprocess/logger"
    "strings"
    "time"
    "basic.com/pubsub/protomsg.git"
    "github.com/go-yaml/yaml"
    "github.com/golang/protobuf/proto"
    "github.com/satori/go.uuid"
    "ruleprocess/ruleserver"
    "ruleprocess/util"
    "github.com/go-yaml/yaml"
)
var weedfsUrl string
type conf struct {
    PhotoUrl string `yaml:"photoUrl"`
}
func init() {
    data, err := ioutil.ReadFile("./config/conf.yml")
    if err != nil {
@@ -105,7 +108,24 @@
        timeLabel = msg.RuleResult["timeLabel"].(string)
    }
    logger.Debug("插入数据前看看报警标志位:",timeLabel)
    if timeLabel == "10" {
    if timeLabel == "01" { // 无定时器状态要插入的报警数据
        InsertFace(msg)
        InsertYolo(msg)
    }
    if timeLabel == "10" { // 定时器状态要插入的首帧报警数据。连带着定时器开启时的那帧
        InsertFace(msg)
        InsertFace(msg.RuleResult["cacheData"].(ruleserver.ResultMsg))
        InsertYolo(msg)
        InsertYolo(msg.RuleResult["cacheData"].(ruleserver.ResultMsg))
    }
    if timeLabel == "12" { // 并非报警数据,只是状态改变的数据
        ChangeStatusFace(msg)
        ChangeStatusYolo(msg)
    }
}
// 往es中插入人脸数据
func InsertFace(msg ruleserver.ResultMsg) {
        if msg.RuleResult["face"] != nil && len(msg.RuleResult["face"].([]ruleserver.Arg)) > 0 {
            logger.Info("往ES插人脸数据")
            for _, face := range msg.RuleResult["face"].([]ruleserver.Arg) {
@@ -119,26 +139,18 @@
                err = proto.Unmarshal(bdata, &i)
                bigPhotoUrl := make(map[string]interface{})
                bigPhotoUrl, err = util.PostFormBufferData(weedfsUrl, i, uuid.NewV4().String())
                logger.Info(bigPhotoUrl)
                if len(face.Liker) == 0 {
            logger.Debug("========大图路径:", bigPhotoUrl)
                    // 人脸检测,没有相似的底库人员
                    localConfig, err := cache.GetServerInfo()
                    if err != nil {
                        logger.Error("查询本机信息失败!")
                    }
                    serverIp, err := GetLocalIP()
                    // 解压缩并上传图片
                    bdata, err := util.UnCompress(msg.Data)
                    if err != nil {
                        panic("解压缩图片时出现错误")
                    }
                    // 查询cameraName
                    camera, err := cache.GetCameraById(msg.Cid)
                    if err != nil {
                        logger.Error("查询摄像机信息失败")
                    }
                    i := protomsg.Image{}
                    err = proto.Unmarshal(bdata, &i)
                    bytes := util.SubImg(i, int(face.Location.X), int(face.Location.Y), int(face.Location.X+face.Location.Width), int(face.Location.Y+face.Location.Height))
                    resp, err := util.PostFormBufferData1(weedfsUrl, bytes, uuid.NewV4().String())
                    if err != nil {
@@ -183,55 +195,46 @@
                        0,
                        0,
                        0,
                        []*protomsg.Baseinfo{},
                face.Liker,
                    }
                    requstbody, err := json.Marshal(pervideo)
                    if err != nil {
                        logger.Info("json parse error ", err)
                        return
                    }
                    err1 := EsReq("POST", "http://192.168.1.182:9200/videopersons/perVideoPicture", requstbody)
                    if err1 != nil {
                        logger.Error("上传ES出错!---",err1)
                    }
                    //logger.Info(err.Error())
                } else {
                    // 人脸比对
                    logger.Warn("___________________________________________这是有baseinfo的")
                    localConfig, err := cache.GetServerInfo()
                    if err != nil {
                        logger.Info("查询本机信息失败!")
                    }
                    serverIp, err := GetLocalIP()
    }
}
func ChangeStatusFace(msg ruleserver.ResultMsg) {
    logger.Info("往ES插非报警但是状态转换数据")
        // 上传大图
                    // 解压缩并上传图片
                    bdata, err := util.UnCompress(msg.Data)
                    if err != nil {
                        panic("解压缩图片时出现错误")
                    }
        i := protomsg.Image{}
        err = proto.Unmarshal(bdata, &i)
        bigPhotoUrl := make(map[string]interface{})
        bigPhotoUrl, err = util.PostFormBufferData(weedfsUrl, i, uuid.NewV4().String())
        logger.Debug("========大图路径:", bigPhotoUrl)
        // 人脸检测,没有相似的底库人员
        localConfig, err := cache.GetServerInfo()
        if err != nil {
            logger.Error("查询本机信息失败!")
        }
        serverIp, err := GetLocalIP()
                    // 查询cameraName
                    camera, err := cache.GetCameraById(msg.Cid)
                    if err != nil {
                        logger.Error("查询摄像机信息失败")
                    }
                    i := protomsg.Image{}
                    err = proto.Unmarshal(bdata, &i)
                    //logger.Info("-------------------------------------------看下宽和高", i.Width, i.Height)
                    bytes := util.SubImg(i, int(face.Location.X), int(face.Location.Y), int(face.Location.X+face.Location.Width), int(face.Location.Y+face.Location.Height))
                    resp, err := util.PostFormBufferData1(weedfsUrl, bytes, uuid.NewV4().String())
                    if err != nil {
                        logger.Error("上传小图出错")
                    }
                    logger.Info("================小图地址:",resp["fileUrl"].(string))
                    sex := ""
                    if face.ThftRes.Gender == 1 {
                        sex = "男"
                    } else {
                        sex = "女"
                    }
                    race := getRaceString(face.ThftRes.Race)
                    ageDescription := getDescription(face.ThftRes.Age)
                    pervideo := PerVideoPicture{
                        uuid.NewV4().String(),
                        msg.Cid,
@@ -241,41 +244,42 @@
                        msg.Tasklab.Taskid,
                        msg.Tasklab.Taskname,
                        "人脸",
                        "",
            "状态转换数据,非报警数据",
                        time.Now().Format("2006-01-02 15:04:05"), // 只检测,没有比对时间
                        sex,
                        face.ThftRes.Age,
                        ageDescription,
                        race,
                        face.ThftRes.Smile,
                        face.ThftRes.Beauty,
                        "",
                        []string{strings.Split(resp["fileUrl"].(string), "/")[1]},
            0,
            "",
            "",
            0,
            0,
            "",
            []string{""},
                        "暂无集群",
                        localConfig.ServerId,
                        localConfig.ServerName,
                        serverIp,
                        "",
                        face.Score,
            0,
                        1,
                        0,
                        0,
                        0,
                        face.Liker,
            []*protomsg.Baseinfo{},
                    }
                    requstbody, err := json.Marshal(pervideo)
                    if err != nil {
                        logger.Error("json parse error ", err)
            logger.Info("json parse error ", err)
                        return
        }
        err1 := EsReq("POST", "http://192.168.1.182:9200/videopersons/perVideoPicture", requstbody)
        if err1 != nil {
            logger.Error("上传ES出错!---", err1)
        }
}
                    }
                    err = EsReq("POST", "http://192.168.1.182:9200/videopersons/perVideoPicture", requstbody)
                    logger.Info("------------------------------------------哈哈哈哈,底库有人")
                }
            }
        }
// 往es中插入yolo数据
func InsertYolo(msg ruleserver.ResultMsg) {
        if msg.RuleResult["yolo"] != nil && len(msg.RuleResult["yolo"].([]ruleserver.Result)) > 0 {
            logger.Info("往ES插yolo数据")
            var sdkNames string = ""
@@ -358,8 +362,72 @@
            }
        }
    }
func ChangeStatusYolo(msg ruleserver.ResultMsg) {
    logger.Info("往ES插yolo非报警状态改变数据")
    var sdkNames string = ""
    alarmRules := []AlarmRule{}
    bdata, err := util.UnCompress(msg.Data)
    if err != nil {
        panic("解压缩图片时出现错误")
}
    i := protomsg.Image{}
    err = proto.Unmarshal(bdata, &i)
    //resp, err = util.PostFormBufferData(weedfsUrl, i, uuid.NewV4().String())
    resp, err := util.DrawPolygonOnImage(msg.Cid, i, msg.RuleResult["yolo"].([]ruleserver.Result))
    if err != nil {
        logger.Error("画框或上传图片服务器出错", err)
    } else {
        logger.Info("上传的图片信息:", resp)
    }
    // logger.Println("图片上传返回值:", resp)
    // 查询本机信息
    localConfig, err := cache.GetServerInfo()
    if err != nil {
        logger.Error("查询本机信息失败!")
    }
    // 查询cameraName
    camera, err := cache.GetCameraById(msg.Cid)
    if err != nil {
        logger.Error("查询摄像机信息失败")
    }
    serverIp, err := GetLocalIP()
    peraction := Personaction{
        uuid.NewV4().String(),
        msg.Cid,
        camera.Name,
        camera.Addr,
        msg.Tasklab.Taskid,
        msg.Tasklab.Taskname,
        sdkNames,
        "yolo非报警状态改变数据",
        alarmRules,
        localConfig.ServerId,
        localConfig.ServerName,
        serverIp,
        "",
        []string{strings.Split(resp["fileUrl"].(string), "/")[1]},
        time.Now().Format("2006-01-02 15:04:05"),
        "",
        0,
        0,
        0,
        0,
    }
    requstbody, err := json.Marshal(peraction)
    if err != nil {
        logger.Info("json parse error ", err)
        return
    }
    err = EsReq("POST", "http://192.168.1.182:9200/personaction/perVideoAction", requstbody)
    if err != nil {
        logger.Error("往ES插入数据失败", err)
    } else {
        logger.Warn("__________________________________________往ES插入yolo数据成功")
        //os.Exit(1)
    }
}
// 获取本机ip
func GetLocalIP() (ipv4 string, err error) {
    var (
main.go
@@ -69,9 +69,9 @@
                arg := ruleserver.SdkDatas{}
                m := paramFormat(msg, &arg)
                //logger.Info("解析出来的数据:", arg)
                ruleserver.Judge(&arg)
                ruleserver.Judge(&arg,&m) // 把sdkMessage传进去,方便缓存数据时拼出一个resultMag
                // 把arg里的打的标签拿出来给m再封装一层
                resultMag := ruleserver.ResultMsg{SdkMessage: m, RuleResult: arg.RuleResult}
                resultMag := ruleserver.ResultMsg{SdkMessage: &m, RuleResult: arg.RuleResult}
                //logger.Info("打完标签后的结果:",resultMag)
                // 将打完标签的数据插入到ES
ruleserver/ruleToformula.go
@@ -129,7 +129,7 @@
//protomsg.SdkMessage.TaskLabel.SdkmsgWithTask.sdkdata
type ResultMsg struct {
    protomsg.SdkMessage
    *protomsg.SdkMessage
    RuleResult map[string]interface{} // 过完规则后打的标签
}
@@ -186,7 +186,7 @@
}
// 对单帧图像的判断 thisSdkDatas  当前传入的这帧数据,cacheSdkData 定时器里缓存的一帧数据 没有就返回nil  (thisSdkDatas SdkDatas, cacheSdkDatas SdkDatas)
func Judge(args *SdkDatas) {
func Judge(args *SdkDatas,message *protomsg.SdkMessage) {
    if len(args.Sdkdata) > 0 {
        // 拿到本摄像机的区域
        cameraPolygons := GetPolygons(args.CameraId)
@@ -212,10 +212,10 @@
                    if len(temp) > 0 {
                        if ruleList[i].SetType == "linkTask" {
                            // groupId中含有link则为联动任务
                            LinkTask(args, ruleList[i], taskId)
                            LinkTask(args, ruleList[i], taskId,message)
                        } else {
                            // 独立任务的处理
                            RunRule(args, ruleList[i], taskId)
                            RunRule(args, ruleList[i], taskId,message)
                        }
                    }
                }
@@ -306,7 +306,7 @@
        }
    }
}
func RunRule(args *SdkDatas, groupRule *protomsg.GroupRule, taskId string) bool {
func RunRule(args *SdkDatas, groupRule *protomsg.GroupRule, taskId string,message *protomsg.SdkMessage) bool {
    defer func() {
        if err := recover(); err != nil {
            logger.Error("比对规则有误", err.(string))
@@ -417,7 +417,7 @@
            if ipcId == sdkData.IpcId {
                for _, areaMap := range sdkData.AreaMapList {
                    // 去开启一个定时器
                    duration(groupRule.Rules[j], groupRule.GroupId, areaMap, args)
                    duration(groupRule.Rules[j], groupRule.GroupId, areaMap, args,message)
                }
            }
        }
@@ -533,10 +533,14 @@
        } else { // 结果为假
            for k, timeEle := range TimeEleList {
                if strings.Contains(k, groupId) {
                    logger.Debug("------------------------------重置定时器,此时的计数器的值为",timeEle.N)
                    timeEle.N = timeEle.InitN // 重置定时器
                    timeEle.CacheSdkData = SdkDatas{} // 把缓存数据扔了
                    if timeEle.AlarmFlag {
                        logger.Debug("------------------------------杀死定时器,报警此帧状态改变的数据,此时的计数器的值为",timeEle.N)
                    flagTime = "12"
                        args.RuleResult["timeLabel"] = flagTime
                        delete(TimeEleList,k)
                    } else {
                        delete(TimeEleList,k)
                    }
                }
            }
@@ -544,6 +548,7 @@
    } else { // 无定时器
        if result {
            flagTime = "01"
            args.RuleResult["timeLabel"] = flagTime
        } else {
            flagTime = "00"
        }
@@ -628,7 +633,7 @@
}
// 联动任务的处理
func LinkTask(args *SdkDatas, groupRule *protomsg.GroupRule, taskId string) {
func LinkTask(args *SdkDatas, groupRule *protomsg.GroupRule, taskId string,message *protomsg.SdkMessage) {
    // new一个定时器,如果以此groupId为标志的定时器不存在的话
    logger.Info("------------------------------------------当前是联动任务,规则是:", groupRule.GroupText)
    var flag bool = true
@@ -658,7 +663,7 @@
        }
    }
    // 往数组里赋值
    isOk := RunRule(args, groupRule, taskId)
    isOk := RunRule(args, groupRule, taskId,message)
    if isOk {
        logger.Info("这帧图像在任务下的一整条规则下(联动任务下就是跟本摄像机像相关的小规则)的判断结果为true")
        // 根据cameraId去更新或者插入结果,然后判断是否数组是否可以得出报警的结论
@@ -854,7 +859,7 @@
}
// 如果有持续时间条件维护开启一个定时器
func duration(rule *protomsg.Rule, groupId string, am *AreaMap, args *SdkDatas) {
func duration(rule *protomsg.Rule, groupId string, am *AreaMap, args *SdkDatas,message *protomsg.SdkMessage) {
    if rule.PolygonId == am.areaId { // 首先规则所对应的区域id要跟区域数据的id对的上  配置的算法要对的上
        if rule.SdkArgAlias == "duration" { //
            // 先看看定时器元素队列中是否有这个摄像机这个区域的定时器,如果有就不能再次创建了
@@ -868,7 +873,7 @@
            if flag {
                timeLength, _ := strconv.Atoi(rule.SdkArgValue)
                timeEle := TimeElement{N: timeLength, InitN: timeLength, AlarmFlag: false, CacheSdkData: *args} // 扔进去一个定时器元素(并缓存当前画面帧数据)
                timeEle := TimeElement{N: timeLength, InitN: timeLength, AlarmFlag: false, CacheSdkData: &ResultMsg{message,args.RuleResult}} // 扔进去一个定时器元素(并缓存当前画面帧数据)
                //TimeEleList = make(map[string]timeElement)
                TimeEleList[groupId+" "+rule.Id] = &timeEle // 定时器元素以摄像机id拼接区域id为键
                logger.Info("创建了计数器并且计数器集合为:", TimeEleList)
ruleserver/timeTicker.go
@@ -17,7 +17,7 @@
    InitN        int      // 赋值后就不变的初始值
    BufferFlag     int      // 缓冲容错位 连续帧false才为false
    AlarmFlag     bool      // 报警标志位 定时器开启后第一次报警时会被置为true 往后再来报警也不会插进ES
    CacheSdkData SdkDatas // 定时器的缓存数据 持续时间类的开启定时器时要缓存一帧
    CacheSdkData *ResultMsg // 定时器的缓存数据 持续时间类的开启定时器时要缓存一帧
    GroupId      string   // 联动规则需要记录下此时的规则组id
    RuleResults  []*RuleResult
}
@@ -65,7 +65,6 @@
    stopChan <- true
    TimeTicker()
}
// 结构体根据某字段排序
type SubList []*RuleResult