panlei
2019-08-05 dfcc46bc906e3e5040c74c2d0281f5b0c5d9987b
标签过滤器
4个文件已修改
36 ■■■■■ 已修改文件
labelFilter/readyDataForLabel.go 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
labelFilter/req.go 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
labelFilter/ruleForLabel.go 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
labelFilter/readyDataForLabel.go
@@ -32,7 +32,6 @@
    for _,yoloGroup := range result.RuleResult["yolo"].([]ruleserver.Result) {
        label.DefenceLevel = append(label.DefenceLevel,yoloGroup.AlarmLevel)
    }
    Judge(label,result)
}
// 从缓存中查出所有的规则数据
labelFilter/req.go
@@ -15,7 +15,6 @@
)
func Die(format string, v ...interface{}) {
    fmt.Fprintln(os.Stderr, fmt.Sprintf(format, v...))
    logger.Info("+++++++",format)
    os.Exit(1)
}
@@ -23,11 +22,12 @@
func date() string {
    return time.Now().Format(time.ANSIC)
}
func Node0(url string) {
var msgChan chan []byte
func Receive(url string) {
    var sock mangos.Socket
    var err error
    var msg []byte
    msgChan = make(chan []byte,200)
    if sock, err = rep.NewSocket(); err != nil {
        Die("can't get new rep socket: %s", err)
    }
@@ -39,11 +39,13 @@
    for {
        // Could also use sock.RecvMsg to get header
        msg, err = sock.Recv()
        if string(msg) == "DATE" { // no need to terminate
            fmt.Println("NODE0: RECEIVED DATE REQUEST")
        if msg != nil { // no need to terminate
            fmt.Println("Received Data request")
            // 把收到的msg塞进通道
            msgChan <- msg
            // 给发送程序反馈信息
            d := date()
            fmt.Printf("NODE0: SENDING DATE %s\n", d)
            err = sock.Send([]byte(d))
            err = sock.Send([]byte("Received Data, --"+d))
            if err != nil {
                Die("can't send reply: %s", err.Error())
            }
labelFilter/ruleForLabel.go
@@ -4,13 +4,17 @@
    "basic.com/dbapi.git"
    "basic.com/pubsub/protomsg.git"
    "github.com/knetic/govaluate"
    "os"
    "ruleprocess/logger"
    "ruleprocess/ruleserver"
    "strconv"
    "time"
)
func Judge(label *Label, result ruleserver.ResultMsg){
func Judge(result ruleserver.ResultMsg){
    // 装配成自己可以识别的数据
    label := new(Label)
    label.DataFormatToLabel(result)
    //拿到所有规则组
    var api dbapi.EventPushApi
    b,allRules := api.FindAllDetails()
@@ -63,8 +67,10 @@
                    logger.Info("通过规则,表达式为:",result)
                    // 推送服务器
                    pushData(ruleGroup.Urls,result.(ruleserver.ResultMsg))
                    os.Exit(1)
                } else {
                    logger.Info("没通过规则,表达式为:",result)
                    os.Exit(1)
                }
            }
        }
@@ -191,5 +197,7 @@
}
// 调用目标服务器的插入接口
func pushData (urls []*protomsg.PushUrl, data ruleserver.ResultMsg){
    for _,url := range urls {
        Push(url.Url,data)
    }
}
main.go
@@ -74,10 +74,11 @@
                logger.Info("解析出来的数据:", arg)
                ruleserver.Judge(&arg,&m) // 把sdkMessage传进去,方便缓存数据时拼出一个resultMag
                // 把arg里的打的标签拿出来给m再封装一层
                resultMag := ruleserver.ResultMsg{SdkMessage: &m, RuleResult: arg.RuleResult}
                //logger.Info("打完标签后的结果:",resultMag)
                resultMsg := ruleserver.ResultMsg{SdkMessage: &m, RuleResult: arg.RuleResult}
                // 将打完标签的数据插入到ES
                insertdata.InsertToEs(resultMag)
                insertdata.InsertToEs(resultMsg)
                //事件推送
            }
        }
    }