zhangmeng
2019-05-17 36766ab5b68ce7dfb39dff5d6d283ce5c7f4b346
提交 | 用户 | age
9d4b12 1 package deliver
Z 2
3 import (
4     "fmt"
5     "os"
6     "strings"
9e1301 7     "time"
9d4b12 8
Z 9     "nanomsg.org/go-mangos"
10     "nanomsg.org/go-mangos/protocol/bus"
11     "nanomsg.org/go-mangos/protocol/pair"
12     "nanomsg.org/go-mangos/protocol/pub"
13     "nanomsg.org/go-mangos/protocol/pull"
14     "nanomsg.org/go-mangos/protocol/push"
15     "nanomsg.org/go-mangos/protocol/rep"
16     "nanomsg.org/go-mangos/protocol/req"
17     "nanomsg.org/go-mangos/protocol/respondent"
18     "nanomsg.org/go-mangos/protocol/sub"
19     "nanomsg.org/go-mangos/protocol/surveyor"
20     "nanomsg.org/go-mangos/transport/all"
21 )
22
23 // NNG mangos wrap
24 type NNG struct {
36766a 25     sock   mangos.Socket
Z 26     server bool
27     mode   Mode
28     url    string
29
30     arguments []interface{}
9d4b12 31 }
Z 32
33 // Send impl interface Diliver
34 func (n *NNG) Send(data []byte) error {
36766a 35     var err error
9d4b12 36     if n.sock == nil {
36766a 37         n.sock, err = n.makeNNG(true)
Z 38         if err != nil {
39             fmt.Println("create nng producer error")
40             return err
41         }
9d4b12 42     }
Z 43
c0dcd1 44     if surveyorTime > 0 {
Z 45         time.Sleep(time.Duration(surveyorTime*2) * time.Second)
46     }
47
36766a 48     msg := mangos.NewMessage(len(data))
Z 49     msg.Body = data
50     return n.sock.SendMsg(msg)
9e1301 51
9d4b12 52 }
Z 53
54 // Recv impl interface Diliver
55 func (n *NNG) Recv() ([]byte, error) {
36766a 56     var err error
Z 57
9d4b12 58     if n.sock == nil {
36766a 59         n.sock, err = n.makeNNG(false)
Z 60         if err != nil {
61             fmt.Println("create nng consumer error")
5ff1f3 62             return nil, err
a6b23c 63         }
9d4b12 64     }
36766a 65
Z 66     var msg *mangos.Message
67     if msg, err = n.sock.RecvMsg(); err != nil {
68         return nil, err
69     }
70     return msg.Body, nil
71
9d4b12 72 }
Z 73
aaae99 74 // Close impl interface Deliver
Z 75 func (n *NNG) Close() {
76     if n.sock != nil {
77         n.sock.Close()
8e158e 78         n.sock = nil
aaae99 79     }
Z 80 }
81
6887a3 82 func nngServer(m Mode, url string, args ...interface{}) *NNG {
9d4b12 83
Z 84     rmExistedIpcName(url)
85
36766a 86     return &NNG{
Z 87         server:    true,
88         mode:      m,
89         url:       url,
90         arguments: args,
91     }
9d4b12 92 }
Z 93
6887a3 94 func nngClient(m Mode, url string, args ...interface{}) *NNG {
9d4b12 95
36766a 96     return &NNG{
Z 97         server:    false,
98         mode:      m,
99         url:       url,
100         arguments: args,
9d4b12 101     }
Z 102
36766a 103 }
Z 104
105 func proto(producer bool, m Mode) protocol {
106     if producer {
107         return protoProducer(m)
108     }
109     return protoConsumer(m)
110 }
111
112 func (n *NNG) makeNNG(producer bool) (mangos.Socket, error) {
113
114     var sock mangos.Socket
115     var err error
116     if sock, err = newSocket(proto(producer, n.mode)); err != nil {
117         return nil, err
118     }
119
120     if err = setSocketOptions(sock, n.arguments...); err != nil {
121         sock.Close()
122         sock = nil
123     }
124     if n.server {
125         if err = sock.Listen(n.url); err != nil {
126             sock.Close()
127             sock = nil
128         }
129     } else {
130         if err = sock.Dial(n.url); err != nil {
131             sock.Close()
132             sock = nil
133         }
134     }
135     return sock, err
9d4b12 136 }
Z 137
9e1301 138 // maxRecvSize max recv size
Z 139 var (
140     maxRecvSize  = 33 * 1024 * 1024
c0dcd1 141     surveyorTime = -1
9e1301 142 )
9d4b12 143
8e158e 144 func defualtSocketOptions() map[string]interface{} {
9d4b12 145
8e158e 146     options := make(map[string]interface{})
Z 147
148     options[mangos.OptionMaxRecvSize] = maxRecvSize
149     options[mangos.OptionWriteQLen] = 0
150     options[mangos.OptionReadQLen] = 0
36766a 151     // options[mangos.OptionRecvDeadline] = time.Second
Z 152     // options[mangos.OptionSendDeadline] = time.Second
8e158e 153     options[mangos.OptionRaw] = true
Z 154
155     return options
9d4b12 156 }
Z 157
158 func setSocketOptions(sock mangos.Socket, args ...interface{}) error {
159
8e158e 160     options := defualtSocketOptions()
Z 161
9e1301 162     switch sock.GetProtocol().Number() {
Z 163     case mangos.ProtoSub:
36766a 164         topic := ""
9d4b12 165         for _, arg := range args {
Z 166             switch arg.(type) {
167             case string:
36766a 168                 topic = arg.(string)
9d4b12 169             default:
Z 170             }
171         }
36766a 172         options[mangos.OptionSubscribe] = []byte(topic)
9e1301 173     case mangos.ProtoSurveyor:
Z 174         for _, arg := range args {
175             switch arg.(type) {
176             case int:
c0dcd1 177                 if arg.(int) < 2 {
Z 178                     surveyorTime = 1
179                 } else {
180                     surveyorTime = arg.(int) / 2
181                 }
9e1301 182             default:
Z 183             }
184         }
36766a 185         options[mangos.OptionSurveyTime] = time.Duration(surveyorTime) * time.Second
Z 186
9e1301 187     default:
9d4b12 188     }
Z 189
8e158e 190     for k, v := range options {
Z 191         if err := sock.SetOption(k, v); err != nil {
192             return err
193         }
194     }
9d4b12 195     return nil
Z 196 }
197
198 func rmExistedIpcName(url string) {
199     s := strings.Split(url, "://")
200
201     if s[0] == "ipc" {
202         if _, err := os.Stat(s[1]); err == nil {
203             os.Remove(s[1])
204         }
205     }
206 }
207
208 // newSocket allocates a new Socket.  The Socket is the handle used to
209 // access the underlying library.
210 func newSocket(p protocol) (mangos.Socket, error) {
211
212     var s mangos.Socket
213     var err error
214
215     switch p {
216     case PUB:
217         s, err = pub.NewSocket()
218     case SUB:
219         s, err = sub.NewSocket()
220     case PUSH:
221         s, err = push.NewSocket()
222     case PULL:
223         s, err = pull.NewSocket()
224     case REQ:
225         s, err = req.NewSocket()
226     case REP:
227         s, err = rep.NewSocket()
228     case SURVEYOR:
229         s, err = surveyor.NewSocket()
230     case RESPONDENT:
231         s, err = respondent.NewSocket()
232     case PAIR:
233         s, err = pair.NewSocket()
234     case BUS:
235         s, err = bus.NewSocket()
236     default:
237         err = mangos.ErrBadProto
238     }
239
240     if err != nil {
241         return nil, err
242     }
243
244     all.AddTransports(s)
245
246     return s, nil
247 }
248
249 func die(format string, v ...interface{}) {
250     fmt.Fprintln(os.Stderr, fmt.Sprintf(format, v...))
251     os.Exit(1)
252 }
253
254 // Protocol is the numeric abstraction to the various protocols or patterns
255 // that Mangos supports.
256 type protocol int
257
258 // Constants for protocols.
259 const (
260     PUSH       = protocol(mangos.ProtoPush)
261     PULL       = protocol(mangos.ProtoPull)
262     PUB        = protocol(mangos.ProtoPub)
263     SUB        = protocol(mangos.ProtoSub)
264     REQ        = protocol(mangos.ProtoReq)
265     REP        = protocol(mangos.ProtoRep)
266     SURVEYOR   = protocol(mangos.ProtoSurveyor)
267     RESPONDENT = protocol(mangos.ProtoRespondent)
268     BUS        = protocol(mangos.ProtoBus)
269     PAIR       = protocol(mangos.ProtoPair)
270 )
271
272 func protoProducer(m Mode) protocol {
273     switch m {
274     case PushPull:
275         return PUSH
276     case PubSub:
277         return PUB
278     case ReqRep:
279         return REP
280     case SurvResp:
281         return SURVEYOR
282     case Bus:
283         return BUS
284     case Pair:
285         return PAIR
286     }
287     return PUSH
288 }
289
290 func protoConsumer(m Mode) protocol {
291     switch m {
292     case PushPull:
293         return PULL
294     case PubSub:
295         return SUB
296     case ReqRep:
297         return REQ
298     case SurvResp:
299         return RESPONDENT
300     case Bus:
301         return BUS
302     case Pair:
303         return PAIR
304     }
305     return PULL
306 }