panlei
2019-10-30 f29ed4a8aece3175514c22c40dda79af6e093af4
修正事件推送
3个文件已修改
86 ■■■■■ 已修改文件
labelFilter/req.go 83 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
labelFilter/ruleForLabel.go 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
labelFilter/req.go
@@ -1,6 +1,7 @@
package labelFilter
import (
    "basic.com/dbapi.git"
    "basic.com/valib/logger.git"
    "fmt"
    "github.com/golang/protobuf/proto"
@@ -13,6 +14,8 @@
    "time"
)
var sock mangos.Socket
var urlPool = make(map[string]chan structure.ResultMsg)
func Die(format string, v ...interface{}) {
    logger.Info("+++++++",format)
    //os.Exit(1)
@@ -52,49 +55,77 @@
    }
}
func Push(url string,data structure.ResultMsg) {
    var sock mangos.Socket
func Init(){
    var err error
    var msg []byte
    if sock, err = req.NewSocket(); err != nil {
        Die("创建请求socket失败: %s", err.Error())
    }
    errSize := sock.SetOption(mangos.OptionMaxRecvSize,30*1024*1024)
    if errSize != nil {
        fmt.Errorf("Failed set MaxRecvSize: %v", err)
        logger.Error("传输的数据超过大小限制")
        return
    }
    errTimeOut := sock.SetOption(mangos.OptionRecvDeadline,time.Millisecond * 2000)
    if errTimeOut != nil {
        fmt.Errorf("Failed set MaxRecvDeadline: %v", err)
        logger.Error("接收响应超时")
        return
    }
    //sock.AddTransport(ipc.NewTransport())
    sock.AddTransport(tcp.NewTransport())
    if err = sock.Dial(url); err != nil {
        Die("请求socket拨号失败: %s", err.Error())
    var api dbapi.EventPushApi
    b, allRules := api.FindAllDetails()
    logger.Info("查看所有规则组:", allRules)
    if !b {
        logger.Error("查询时间推送规则失败!")
    }
    logger.Info("序列化数据")
    bytes,err1 := proto.Marshal(data)
    logger.Info("数据长度为:",len(bytes))
    if err1 != nil {
        logger.Info("序列化失败:",err1)
    for _, ruleGroup := range allRules {
        if ruleGroup.Enable { // 大规则开关开启状态
            for _, url := range ruleGroup.Urls {
                // 为每个url建立一个chan
                urlPool[url.Url] = make(chan structure.ResultMsg,50)
                go GoPush(url.Url)
            }
        }
    }
    logger.Debug("推送数据")
    //bytes := []byte("ndfasojdfaidsos")
    if err = sock.Send(bytes); err != nil {
        Die("推送socket发送数据失败: %s", err.Error())
    }
    msg, err = sock.Recv();
    if err != nil {
        Die("接收响应失败: %s", err.Error())
    } else {
        logger.Debug("事件推送成功!收到响应",string(msg))
    }
    sock.Close()
}
func GoPush(url string) {
    var err error
    var msg []byte
    sock.AddTransport(tcp.NewTransport())
    if err = sock.Dial(url); err != nil {
        logger.Error("请求socket拨号失败: %s", err.Error())
    }
    logger.Info("序列化数据")
    for {
        select {
        // case <-ctx.Done():
        //     return
        case data := <- urlPool[url]:
            bytes,err1 := proto.Marshal(data)
            logger.Info("数据长度为:",len(bytes))
            if err1 != nil {
                logger.Info("序列化失败:",err1)
            }
            logger.Debug("推送数据")
            //bytes := []byte("ndfasojdfaidsos")
            if err = sock.Send(bytes); err != nil {
                Die("推送socket发送数据失败: %s", err.Error())
            }
            msg, err = sock.Recv();
            if err != nil {
                Die("接收响应失败: %s", err.Error())
            } else {
                logger.Debug("事件推送成功!收到响应",string(msg))
            }
            sock.Close()
        default:
        }
    }
}
//func main() {
//    url := "tcp://192.168.1.123:40011"
//    Push(url,"hahahaha")
labelFilter/ruleForLabel.go
@@ -319,7 +319,7 @@
func pushData(urls []*protomsg.PushUrl, data structure.ResultMsg) {
    for _, url := range urls {
        logger.Debug("看看推送地址:",url.Url)
        //Push("tcp://"+url.Url, data)
    }
}
main.go
@@ -63,6 +63,7 @@
    go cache.Init(initchan, *dbIp, *surveyPort, *pubPort)
    logger.Info("cache init completed!!!", <-initchan) //dbserver初始化完毕
    ruleserver.Init()
    labelFilter.Init()
    go ruleserver.TimeTicker()
    go ruleserver.StartServer()
    nReciever("ipc:///tmp/sdk-2-rules-process.ipc", deliver.PushPull, 1)