---
panlei
2019-11-18 bd010e089c31b2dad10e93214ae39e21621aa09f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package main
 
import (
    "basic.com/dbapi.git"
    "basic.com/pubsub/protomsg.git"
    "basic.com/valib/deliver.git"
    "net/http"
    _ "net/http/pprof"
    "plugin"
    "ruleprocess/insertdata"
    "ruleprocess/labelFilter"
    "ruleprocess/structure"
    "time"
 
    "basic.com/valib/logger.git"
    "flag"
    "fmt"
    "github.com/spf13/viper"
    "ruleprocess/cache"
    "ruleprocess/ruleserver"
    "sync"
)
 
// Command-line flags and the channel main uses to wait for cache start-up.
var (
	// dbIp is the dbserver address.
	dbIp = flag.String("dbIp", "127.0.0.1", "dbserver ip")
	// dbPort is the dbserver port.
	dbPort = flag.Int("dbPort", 8001, "default dbPort=8001")
	// surveyPort is the survey port (heartbeat).
	surveyPort = flag.Int("surveyPort", 40007, "survey port")
	// pubPort is the pubsub port (data updates).
	pubPort = flag.Int("pubPort", 50007, "pubsub port")
	// initchan is unbuffered; main hands it to cache.Init and then receives
	// one value from it before continuing.
	initchan = make(chan bool)
	// env is the environment name passed to insertdata.Init.
	env = flag.String("env", "pro", "env set")
)
 
func init() {
    flag.Parse()
    // 日志初始化
 
    insertdata.Init(*env)
    var logFile = "./logger/"
    if viper.GetString("LogBasePath") != "" {
        logFile = viper.GetString("LogBasePath")
    }
    logFile = logFile + "ruleprocess.log"
    fmt.Println("日志地址:",logFile)
    logger.Config(logFile, logger.DebugLevel)
    logger.SetSaveDays(7)
    logger.Info("日志初始化成功!")
}
func main() {
    //fmt.Println("缓存初始化完成",<- initchan)//dbserver初始化完毕
    go func() {
        http.ListenAndServe("0.0.0.0:8899",nil)
    }()
    flag.Parse()
    wg := sync.WaitGroup{}
    wg.Add(3)
 
    dbapi.Init(*dbIp, *dbPort)
    go cache.Init(initchan, *dbIp, *surveyPort, *pubPort)
    logger.Info("cache init completed!!!", <-initchan) //dbserver初始化完毕
    ruleserver.Init()
    labelFilter.Init()
    go ruleserver.TimeTicker()
    go ruleserver.StartServer()
    nReciever("ipc:///tmp/sdk-2-rules-process.ipc", deliver.PushPull, 1)
    wg.Wait()
}
func nReciever(url string, m deliver.Mode, count int) {
    c := deliver.NewServer(m, url)
    nRecvImpl(c, 1)
}
 
func nRecvImpl(c deliver.Deliver, index int) {
 
    var msg []byte
    var err error
    //msgChan := make(chan []byte,100)
    for {
        select {
        // case <-ctx.Done():
        //     return
        default:
            msg, err = c.Recv()
            //msgChan <- msg
            if err != nil {
                logger.Info("recv error : ", err)
                fmt.Println("recv error : ", err)
                continue
            } else {
                //runtime.GOMAXPROCS(runtime.NumCPU())
                //logger.Debug("使用的cpu个数:",runtime.NumCPU())
                //go func(msg []byte) {
                    logger.Debug("当前时间戳:", time.Now().Unix())
                    arg := structure.SdkDatas{}
                    //paramFormat(msg, &arg)
                    start := time.Now()
                    m := CallParamFormat(msg, &arg)
                    // 进行规则处理判断(打上规则的标签)
                    ruleserver.Judge(&arg, &m) // 把sdkMessage传进去,方便缓存数据时拼出一个resultMag
                    // 把arg里的打的标签拿出来给m再封装一层
                    resultMsg := structure.ResultMsg{SdkMessage: &m, RuleResult: arg.RuleResult}
                    ruleserver.GetAttachInfo(resultMsg.SdkMessage)
                    ruleEnd := time.Since(start)
                    logger.Debug("规则判断完所用时间:", ruleEnd)
                    // 将打完标签的数据插入到ES
                    insertdata.InsertToEs(resultMsg)
                    esEnd := time.Since(start)
                    logger.Debug("插入完Es所用时间:", esEnd)
                    //事件推送
                    labelFilter.PushSomthing(resultMsg)
                //}(msg)
            }
        }
    }
}
 
func CallParamFormat(msg []byte, args *structure.SdkDatas) protomsg.SdkMessage{
    p,err :=  plugin.Open("./algorithm/middleware.so")
    if err != nil {
        panic(err)
    }
    f,err1 := p.Lookup("ParamFormat")
    if err1 != nil {
        panic("没有找到中间件的格式化数据函数")
    }
    mess := f.(func(msg []byte, args *structure.SdkDatas)(protomsg.SdkMessage))(msg,args)
    return mess
}