panlei
2019-11-01 e6982607fbbeaa96d3d14409df780266646b793d
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package labelFilter
 
import (
    "basic.com/dbapi.git"
    "basic.com/valib/logger.git"
    "fmt"
    "github.com/golang/protobuf/proto"
    "nanomsg.org/go-mangos"
    "nanomsg.org/go-mangos/protocol/rep"
    "nanomsg.org/go-mangos/protocol/req"
    "nanomsg.org/go-mangos/transport/ipc"
    "nanomsg.org/go-mangos/transport/tcp"
    "ruleprocess/structure"
    "strings"
    "time"
)
 
 
var urlPool = make(map[string]chan structure.ResultMsg)
var pool chan *structure.ResultMsg = make(chan *structure.ResultMsg)
func Die(format string, v ...interface{}) {
    logger.Info("+++++++",format)
    //os.Exit(1)
}
 
func date() string {
    return time.Now().Format(time.ANSIC)
}
var msgChan chan []byte
func Receive(url string) {
    var sock mangos.Socket
    var err error
    var msg []byte
    msgChan = make(chan []byte,200)
    if sock, err = rep.NewSocket(); err != nil {
        Die("can't get new rep socket: %s", err)
    }
    sock.AddTransport(ipc.NewTransport())
    sock.AddTransport(tcp.NewTransport())
    if err = sock.Listen(url); err != nil {
        Die("can't listen on rep socket: %s", err.Error())
    }
    for {
        // Could also use sock.RecvMsg to get header
        msg, err = sock.Recv()
        if msg != nil { // no need to terminate
            fmt.Println("Received Data request")
            // 把收到的msg塞进通道
            msgChan <- msg
            // 给发送程序反馈信息
            d := date()
            err = sock.Send([]byte("Received Data, --"+d))
            if err != nil {
                Die("can't send reply: %s", err.Error())
            }
        }
    }
}
 
func Init(){
 
    var api dbapi.EventPushApi
    b, allRules := api.FindAllDetails()
    logger.Info("初始化事件推送,查看所有规则组:", allRules)
    if !b {
        logger.Error("查询时间推送规则失败!")
    }
    for _, ruleGroup := range allRules {
        if ruleGroup.Enable { // 大规则开关开启状态
            for _, url := range ruleGroup.Urls {
                // 为每个url建立一个chan
                if strings.Contains(url.Url,"114") {
                    //urlPool[url.Url] = make(chan structure.ResultMsg,10)
                    go GoPush(url.Url)
                }
            }
        }
    }
}
 
func GoPush(url string) {
    var err error
    var msg []byte
    var sock mangos.Socket
    if sock, err = req.NewSocket(); err != nil {
        Die("创建请求socket失败: %s", err.Error())
    }
    errSize := sock.SetOption(mangos.OptionMaxRecvSize,30*1024*1024)
    if errSize != nil {
        logger.Error("传输的数据超过大小限制")
        return
    }
    errTimeOut := sock.SetOption(mangos.OptionRecvDeadline,time.Millisecond * 2000)
    if errTimeOut != nil {
        logger.Error("接收响应超时")
        return
    }
    errWrite := sock.SetOption(mangos.OptionWriteQLen,10)
    if errWrite != nil {
        logger.Error("设置传输缓存大小失败")
        return
    }
    errRead := sock.SetOption(mangos.OptionReadQLen,10)
    if errRead != nil {
        logger.Error("设置传输缓存大小失败")
        return
    }
    sock.AddTransport(tcp.NewTransport())
    if err = sock.Dial("tcp://"+url); err != nil {
        logger.Error("请求socket拨号失败: ", err.Error())
    }
    logger.Info("序列化数据")
 
    //for v := range pool{
    //    logger.Info("无限循环",v.Cid)
    //}
 
    for {
        select {
        // case <-ctx.Done():
        //     return
        case data := <- pool:
            logger.Info("接收到数据",data.Cid)
            bytes,err1 := proto.Marshal(data)
            logger.Info("数据长度为:",len(bytes))
            if err1 != nil {
                logger.Info("序列化失败:",err1)
            }
            logger.Debug("groutine"+url+"推送数据")
            //bytes := []byte("ndfasojdfaidsos")
            if err = sock.Send(bytes); err != nil {
                Die("groutine"+url+"推送socket发送数据失败: ", err.Error())
            }
            msg, err = sock.Recv();
            if err != nil {
                Die("groutine"+url+"接收响应失败: ", err.Error())
            } else {
                logger.Debug("事件推送成功!groutine"+url+"收到响应",string(msg))
            }
 
        default:
 
        }
    }
    sock.Close()
}
//func main() {
//    url := "tcp://192.168.1.123:40011"
//    Push(url,"hahahaha")
//}