panlei
2019-11-01 e6982607fbbeaa96d3d14409df780266646b793d
换一个chan
2个文件已修改
68 ■■■■ 已修改文件
labelFilter/req.go 64 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
labelFilter/ruleForLabel.go 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
labelFilter/req.go
@@ -4,6 +4,7 @@
    "basic.com/dbapi.git"
    "basic.com/valib/logger.git"
    "fmt"
    "github.com/golang/protobuf/proto"
    "nanomsg.org/go-mangos"
    "nanomsg.org/go-mangos/protocol/rep"
    "nanomsg.org/go-mangos/protocol/req"
@@ -16,6 +17,7 @@
var urlPool = make(map[string]chan structure.ResultMsg)
var pool chan *structure.ResultMsg = make(chan *structure.ResultMsg)
func Die(format string, v ...interface{}) {
    logger.Info("+++++++",format)
    //os.Exit(1)
@@ -68,7 +70,7 @@
            for _, url := range ruleGroup.Urls {
                // 为每个url建立一个chan
                if strings.Contains(url.Url,"114") {
                    urlPool[url.Url] = make(chan structure.ResultMsg,10)
                    //urlPool[url.Url] = make(chan structure.ResultMsg,10)
                    go GoPush(url.Url)
                }
            }
@@ -78,7 +80,7 @@
func GoPush(url string) {
    var err error
    //var msg []byte
    var msg []byte
    var sock mangos.Socket
    if sock, err = req.NewSocket(); err != nil {
        Die("创建请求socket失败: %s", err.Error())
@@ -109,37 +111,37 @@
    }
    logger.Info("序列化数据")
    for v := range urlPool[url]{
        logger.Info("无限循环",v.Cid)
    //for v := range pool{
    //    logger.Info("无限循环",v.Cid)
    //}
    for {
        select {
        // case <-ctx.Done():
        //     return
        case data := <- pool:
            logger.Info("接收到数据",data.Cid)
            bytes,err1 := proto.Marshal(data)
            logger.Info("数据长度为:",len(bytes))
            if err1 != nil {
                logger.Info("序列化失败:",err1)
            }
            logger.Debug("groutine"+url+"推送数据")
            //bytes := []byte("ndfasojdfaidsos")
            if err = sock.Send(bytes); err != nil {
                Die("groutine"+url+"推送socket发送数据失败: ", err.Error())
            }
            msg, err = sock.Recv();
            if err != nil {
                Die("groutine"+url+"接收响应失败: ", err.Error())
            } else {
                logger.Debug("事件推送成功!groutine"+url+"收到响应",string(msg))
    }
    //for {
    //    select {
    //    // case <-ctx.Done():
    //    //     return
    //    case data := <- urlPool[url]:
    //        logger.Info("接收到数据",data.Cid)
    //        //bytes,err1 := proto.Marshal(data)
    //        //logger.Info("数据长度为:",len(bytes))
    //        //if err1 != nil {
    //        //    logger.Info("序列化失败:",err1)
    //        //}
    //        //logger.Debug("groutine"+url+"推送数据")
    //        ////bytes := []byte("ndfasojdfaidsos")
    //        //if err = sock.Send(bytes); err != nil {
    //        //    Die("groutine"+url+"推送socket发送数据失败: ", err.Error())
    //        //}
    //        //msg, err = sock.Recv();
    //        //if err != nil {
    //        //    Die("groutine"+url+"接收响应失败: ", err.Error())
    //        //} else {
    //        //    logger.Debug("事件推送成功!groutine"+url+"收到响应",string(msg))
    //        //}
    //
    //    default:
    //
    //    }
    //}
        default:
        }
    }
    sock.Close()
}
//func main() {
labelFilter/ruleForLabel.go
@@ -320,8 +320,8 @@
func pushData(urls []*protomsg.PushUrl, data structure.ResultMsg) {
    for _, url := range urls {
        logger.Debug("看看推送地址:",url.Url)
        urlPool[url.Url] <- data
        logger.Info("urlPool大小: ", len(urlPool[url.Url]))
        pool <- &data
        //logger.Info("urlPool大小: ", len(urlPool[url.Url]))
    }
}