---
panlei
2019-10-30 a908d82e6ede63b8ab5799848cbabf96fbcf39bd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package labelFilter
 
import (
    "basic.com/dbapi.git"
    "basic.com/valib/logger.git"
    "fmt"
    "github.com/golang/protobuf/proto"
    "nanomsg.org/go-mangos"
    "nanomsg.org/go-mangos/protocol/rep"
    "nanomsg.org/go-mangos/protocol/req"
    "nanomsg.org/go-mangos/transport/ipc"
    "nanomsg.org/go-mangos/transport/tcp"
    "ruleprocess/structure"
    "time"
)
 
var sock mangos.Socket
var urlPool = make(map[string]chan structure.ResultMsg)
func Die(format string, v ...interface{}) {
    logger.Info("+++++++",format)
    //os.Exit(1)
}
 
func date() string {
    return time.Now().Format(time.ANSIC)
}
var msgChan chan []byte
func Receive(url string) {
    var sock mangos.Socket
    var err error
    var msg []byte
    msgChan = make(chan []byte,200)
    if sock, err = rep.NewSocket(); err != nil {
        Die("can't get new rep socket: %s", err)
    }
    sock.AddTransport(ipc.NewTransport())
    sock.AddTransport(tcp.NewTransport())
    if err = sock.Listen(url); err != nil {
        Die("can't listen on rep socket: %s", err.Error())
    }
    for {
        // Could also use sock.RecvMsg to get header
        msg, err = sock.Recv()
        if msg != nil { // no need to terminate
            fmt.Println("Received Data request")
            // 把收到的msg塞进通道
            msgChan <- msg
            // 给发送程序反馈信息
            d := date()
            err = sock.Send([]byte("Received Data, --"+d))
            if err != nil {
                Die("can't send reply: %s", err.Error())
            }
        }
    }
}
 
func Init(){
 
    var err error
 
    if sock, err = req.NewSocket(); err != nil {
        Die("创建请求socket失败: %s", err.Error())
    }
    errSize := sock.SetOption(mangos.OptionMaxRecvSize,30*1024*1024)
    if errSize != nil {
        logger.Error("传输的数据超过大小限制")
        return
    }
    errTimeOut := sock.SetOption(mangos.OptionRecvDeadline,time.Millisecond * 2000)
    if errTimeOut != nil {
        logger.Error("接收响应超时")
        return
    }
    var api dbapi.EventPushApi
    b, allRules := api.FindAllDetails()
    logger.Info("查看所有规则组:", allRules)
    if !b {
        logger.Error("查询时间推送规则失败!")
    }
    for _, ruleGroup := range allRules {
        if ruleGroup.Enable { // 大规则开关开启状态
            for _, url := range ruleGroup.Urls {
                // 为每个url建立一个chan
                urlPool[url.Url] = make(chan structure.ResultMsg,50)
                go GoPush(url.Url)
            }
        }
    }
}
 
func GoPush(url string) {
    var err error
    var msg []byte
    sock.AddTransport(tcp.NewTransport())
    if err = sock.Dial(url); err != nil {
        logger.Error("请求socket拨号失败: ", err.Error())
    }
    logger.Info("序列化数据")
 
    for {
        select {
        // case <-ctx.Done():
        //     return
        case data := <- urlPool[url]:
 
            bytes,err1 := proto.Marshal(data)
            logger.Info("数据长度为:",len(bytes))
            if err1 != nil {
                logger.Info("序列化失败:",err1)
            }
            logger.Debug("groutine"+url+"推送数据")
            //bytes := []byte("ndfasojdfaidsos")
            if err = sock.Send(bytes); err != nil {
                Die("推送socket发送数据失败: ", err.Error())
            }
            msg, err = sock.Recv();
            if err != nil {
                Die("接收响应失败: ", err.Error())
            } else {
                logger.Debug("事件推送成功!groutine"+url+"收到响应",string(msg))
            }
            sock.Close()
        default:
 
        }
    }
}
//func main() {
//    url := "tcp://192.168.1.123:40011"
//    Push(url,"hahahaha")
//}