panlei
2019-10-30 f29ed4a8aece3175514c22c40dda79af6e093af4
修正事件推送
3个文件已修改
50 ■■■■ 已修改文件
labelFilter/req.go 47 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
labelFilter/ruleForLabel.go 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
labelFilter/req.go
@@ -1,6 +1,7 @@
package labelFilter
import (
    "basic.com/dbapi.git"
    "basic.com/valib/logger.git"
    "fmt"
    "github.com/golang/protobuf/proto"
@@ -13,6 +14,8 @@
    "time"
)
var sock mangos.Socket
var urlPool = make(map[string]chan structure.ResultMsg)
func Die(format string, v ...interface{}) {
    logger.Info("+++++++",format)
    //os.Exit(1)
@@ -52,30 +55,55 @@
    }
}
func Push(url string,data structure.ResultMsg) {
    var sock mangos.Socket
func Init(){
    var err error
    var msg []byte
    if sock, err = req.NewSocket(); err != nil {
        Die("创建请求socket失败: %s", err.Error())
    }
    errSize := sock.SetOption(mangos.OptionMaxRecvSize,30*1024*1024)
    if errSize != nil {
        fmt.Errorf("Failed set MaxRecvSize: %v", err)
        logger.Error("传输的数据超过大小限制")
        return
    }
    errTimeOut := sock.SetOption(mangos.OptionRecvDeadline,time.Millisecond * 2000)
    if errTimeOut != nil {
        fmt.Errorf("Failed set MaxRecvDeadline: %v", err)
        logger.Error("接收响应超时")
        return
    }
    //sock.AddTransport(ipc.NewTransport())
    var api dbapi.EventPushApi
    b, allRules := api.FindAllDetails()
    logger.Info("查看所有规则组:", allRules)
    if !b {
        logger.Error("查询时间推送规则失败!")
    }
    for _, ruleGroup := range allRules {
        if ruleGroup.Enable { // 大规则开关开启状态
            for _, url := range ruleGroup.Urls {
                // 为每个url建立一个chan
                urlPool[url.Url] = make(chan structure.ResultMsg,50)
                go GoPush(url.Url)
            }
        }
    }
}
func GoPush(url string) {
    var err error
    var msg []byte
    sock.AddTransport(tcp.NewTransport())
    if err = sock.Dial(url); err != nil {
        Die("请求socket拨号失败: %s", err.Error())
        logger.Error("请求socket拨号失败: %s", err.Error())
    }
    logger.Info("序列化数据")
    for {
        select {
        // case <-ctx.Done():
        //     return
        case data := <- urlPool[url]:
    bytes,err1 := proto.Marshal(data)
    logger.Info("数据长度为:",len(bytes))
    if err1 != nil {
@@ -93,8 +121,11 @@
        logger.Debug("事件推送成功!收到响应",string(msg))
    }
    sock.Close()
}
        default:
        }
    }
}
//func main() {
//    url := "tcp://192.168.1.123:40011"
//    Push(url,"hahahaha")
labelFilter/ruleForLabel.go
@@ -319,7 +319,7 @@
func pushData(urls []*protomsg.PushUrl, data structure.ResultMsg) {
    for _, url := range urls {
        logger.Debug("看看推送地址:",url.Url)
        //Push("tcp://"+url.Url, data)
    }
}
main.go
@@ -63,6 +63,7 @@
    go cache.Init(initchan, *dbIp, *surveyPort, *pubPort)
    logger.Info("cache init completed!!!", <-initchan) //dbserver初始化完毕
    ruleserver.Init()
    labelFilter.Init()
    go ruleserver.TimeTicker()
    go ruleserver.StartServer()
    nReciever("ipc:///tmp/sdk-2-rules-process.ipc", deliver.PushPull, 1)