panlei
2019-12-06 9d559b7d6c980cc1eb0eb3e58f6945a01633339a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package main
 
import (
 
    "flag"
    "sync"
    "net/http"
    _ "net/http/pprof"
    "plugin"
 
    //"github.com/spf13/viper"
    logger "github.com/alecthomas/log4go"
    "github.com/panjf2000/ants/v2"
 
    "basic.com/pubsub/protomsg.git"
    "basic.com/valib/deliver.git"
    "ruleprocess/insertdata"
    "ruleprocess/labelFilter"
    "ruleprocess/structure"
    "ruleprocess/cache"
    "ruleprocess/ruleserver"
)
 
var dbIp = flag.String("dbIp", "127.0.0.1", "dbserver ip")
var dbPort = flag.Int("dbPort", 8001, "default dbPort=8001")
var surveyPort = flag.Int("surveyPort", 40007, "survey port") //心跳
var pubPort = flag.Int("pubPort", 50007, "pubsub port")       //数据更新
var initchan = make(chan bool)
var env =  flag.String("env","dev","env set")
 
func init() {
    flag.Parse()
    // 日志初始化
 
    insertdata.Init(*env)
    //var logFile = "./logger/"
    //if viper.GetString("LogBasePath") != "" {
    //    logFile = viper.GetString("LogBasePath")
    //}
    //logFile = logFile + "ruleprocess.log"
    //fmt.Println("日志地址:",logFile)
    //logger.Config(logFile, logger.DebugLevel)
    //logger.SetSaveDays(7)
    // log4go
    logger.LoadConfiguration("./logger/log.xml")
    logger.Info("日志初始化成功!")
 
}
func main() {
    //fmt.Println("缓存初始化完成",<- initchan)//dbserver初始化完毕
    defer ants.Release()
    go func() {
        http.ListenAndServe("0.0.0.0:8899",nil)
    }()
    flag.Parse()
    wg := sync.WaitGroup{}
    wg.Add(3)
 
    go cache.Init(initchan, *dbIp, *surveyPort, *pubPort)
    logger.Info("cache init completed!!!", <-initchan) //dbserver初始化完毕
    ruleserver.Init()
    labelFilter.Init()
 
    go ruleserver.TimeTicker()
    go ruleserver.StartServer()
 
    nReciever("ipc:///tmp/sdk-2-rules-process.ipc", deliver.PushPull, 1)
    wg.Wait()
 
}
func nReciever(url string, m deliver.Mode, count int) {
    c := deliver.NewServer(m, url)
    nRecvImpl(c, 1)
}
 
func nRecvImpl(c deliver.Deliver, index int) {
    var msg []byte
    var wg1 sync.WaitGroup
    p,_ := ants.NewPool(20)
    syncCalculateSum := func() {
        Task(msg)
        wg1.Done()
    }
    wg1.Wait()
    var err error
    for {
        msg, err = c.Recv()
        if err == nil {
            wg1.Add(1)
            _ = p.Submit(syncCalculateSum)
            //go Task(msg)
        }
    }
}
 
func Task (msg []byte) {
    arg := structure.SdkDatas{}
    //start := time.Now()
    m := CallParamFormat(msg, &arg)
    // 进行规则处理判断(打上规则的标签)
    ruleserver.Judge(&arg, &m) // 把sdkMessage传进去,方便缓存数据时拼出一个resultMag
    // 把arg里的打的标签拿出来给m再封装一层
    resultMsg := structure.ResultMsg{SdkMessage: &m, RuleResult: arg.RuleResult}
    ruleserver.GetAttachInfo(resultMsg.SdkMessage)
    // 将打完标签的数据插入到ES
    insertdata.InsertToEs(resultMsg)
    //事件推送
    labelFilter.PushSomthing(resultMsg)
}
func CallParamFormat(msg []byte, args *structure.SdkDatas) protomsg.SdkMessage{
    //logger.Info("呼叫中间件格式化数据")
    p,err :=  plugin.Open("./algorithm/middleware.so")
    if err != nil {
        panic(err)
    }
    f,err1 := p.Lookup("ParamFormat")
    if err1 != nil {
        panic("没有找到中间件的格式化数据函数")
    }
    mess := f.(func(msg []byte, args *structure.SdkDatas)(protomsg.SdkMessage))(msg,args)
    return mess
}