panlei
2019-09-12 b6bc6bf3590aedc964a9c2016805f645754c9572
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package labelFilter
 
import (
    "fmt"
    "nanomsg.org/go-mangos"
    "nanomsg.org/go-mangos/protocol/rep"
    "nanomsg.org/go-mangos/protocol/req"
    "nanomsg.org/go-mangos/transport/ipc"
    "nanomsg.org/go-mangos/transport/tcp"
    "os"
    "github.com/golang/protobuf/proto"
    "ruleprocess/logger"
    "ruleprocess/ruleserver"
    "time"
)
 
func Die(format string, v ...interface{}) {
    logger.Info("+++++++",format)
    os.Exit(1)
}
 
func date() string {
    return time.Now().Format(time.ANSIC)
}
var msgChan chan []byte
func Receive(url string) {
    var sock mangos.Socket
    var err error
    var msg []byte
    msgChan = make(chan []byte,200)
    if sock, err = rep.NewSocket(); err != nil {
        Die("can't get new rep socket: %s", err)
    }
    sock.AddTransport(ipc.NewTransport())
    sock.AddTransport(tcp.NewTransport())
    if err = sock.Listen(url); err != nil {
        Die("can't listen on rep socket: %s", err.Error())
    }
    for {
        // Could also use sock.RecvMsg to get header
        msg, err = sock.Recv()
        if msg != nil { // no need to terminate
            fmt.Println("Received Data request")
            // 把收到的msg塞进通道
            msgChan <- msg
            // 给发送程序反馈信息
            d := date()
            err = sock.Send([]byte("Received Data, --"+d))
            if err != nil {
                Die("can't send reply: %s", err.Error())
            }
        }
    }
}
 
func Push(url string,data ruleserver.ResultMsg) {
    var sock mangos.Socket
    var err error
    var msg []byte
 
    if sock, err = req.NewSocket(); err != nil {
        Die("创建请求socket失败: %s", err.Error())
    }
    errSize := sock.SetOption(mangos.OptionMaxRecvSize,5*1024*1024)
    if errSize != nil {
        fmt.Errorf("Failed set MaxRecvSize: %v", err)
        return
    }
    //sock.AddTransport(ipc.NewTransport())
    sock.AddTransport(tcp.NewTransport())
    if err = sock.Dial(url); err != nil {
        Die("请求socket拨号失败: %s", err.Error())
    }
    logger.Info("序列化数据")
    bytes,err1 := proto.Marshal(data)
    logger.Info("数据长度为:",len(bytes))
    if err1 != nil {
        logger.Info("序列化失败:",err1)
    }
    logger.Debug("推送数据")
    //bytes := []byte("ndfasojdfaidsos")
    if err = sock.Send(bytes); err != nil {
        Die("推送socket发送数据失败: %s", err.Error())
    }
    if msg, err = sock.Recv(); err != nil {
        Die("接收响应失败: %s", err.Error())
    }
    logger.Debug("数据推送成功!收到响应",string(msg))
    sock.Close()
}
 
//func main() {
//    url := "tcp://192.168.1.123:40011"
//    Push(url,"hahahaha")
//}