liuxiaolong
2020-01-10 25361b9b3b9de3f4e24a914675a261e22ad67e58
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package pubsub
 
import (
    "context"
    "encoding/json"
    "fmt"
    "nanomsg.org/go-mangos"
    "nanomsg.org/go-mangos/protocol/pub"
    "nanomsg.org/go-mangos/protocol/sub"
    "nanomsg.org/go-mangos/transport/ipc"
    "nanomsg.org/go-mangos/transport/tcp"
)
 
type mangosPubSub struct {
    url string
 
    ctx  context.Context
 
    sock mangos.Socket
 
    pubCh chan []byte  //publish msg chan
 
    recvCh chan Message  //recv msg chan
}
 
func newPub(url string) (*mangosPubSub,error) {
    var sock mangos.Socket
    var err error
 
    sock, err = pub.NewSocket()
    if err != nil {
        return nil, err
    }
    sock.AddTransport(ipc.NewTransport())
    sock.AddTransport(tcp.NewTransport())
 
    err = sock.Listen(url)
    if err != nil {
        return nil, err
    }
    ctx, cancel := context.WithCancel(context.Background())
    pub := &mangosPubSub{
        url: url,
        ctx: ctx,
        sock: sock,
        pubCh: make(chan []byte),
    }
    go func() {
        for {
            select {
            case <-ctx.Done():
                close(pub.pubCh)
                cancel()
                return
            case msg := <-pub.pubCh:
                err := pub.sock.Send(msg)
                if err != nil {
                    fmt.Println("Error PUBLISH MSG to the socket:", err.Error())
                }
            }
        }
    }()
    return pub,nil
}
 
func newSub(url string, topics []string) (*mangosPubSub,error) {
    var sock mangos.Socket
    var err error
 
    sock, err = sub.NewSocket()
    if err != nil {
        return nil, err
    }
    sock.AddTransport(ipc.NewTransport())
    sock.AddTransport(tcp.NewTransport())
 
    err = sock.Dial(url)
    if err != nil {
        return nil, err
    }
    // subscribes to everything
    err = sock.SetOption(mangos.OptionSubscribe, []byte(""))
    if err != nil {
        return nil, err
    }
    ctx, cancel := context.WithCancel(context.Background())
    sub := &mangosPubSub{
        url:url,
        ctx: ctx,
        sock: sock,
        recvCh: make(chan Message,50),
    }
 
    var msg []byte
    go func() {
        for {
            select {
            case <-ctx.Done():
                close(sub.recvCh)
                cancel()
                return
            default:
                msg, err = sub.sock.Recv()
                if err != nil {
                    fmt.Println("Cannot SUBSCRIBE MSG,ERR:", err.Error())
                } else {
                    //判断是否是想要的主题消息
                    var recvMsg Message
                    if unmarshlErr := json.Unmarshal(msg, &recvMsg);unmarshlErr ==nil {
                        if matchTopic(recvMsg.Topic, topics) {
                            sub.recvCh <- recvMsg
                        }
                    }
                }
            }
        }
    }()
 
    return sub,nil
}
 
func matchTopic(topic string,subTopics []string) bool {
    if subTopics ==nil && len(subTopics) ==0 {
        return true
    }
    for _,t := range subTopics {
        if topic == t {
            return true
        }
    }
    return false
}
 
func (ps *mangosPubSub) Publish(msg []byte) {
    ps.pubCh <- msg
}
 
func (ps *mangosPubSub) Recv() chan Message {
    return ps.recvCh
}