zhangmeng
2019-05-17 306588f52747268250997a9255ef19583bbd615c
提交 | 用户 | age
9d4b12 1 package deliver
Z 2
3 import (
4     "fmt"
5     "os"
6     "strings"
9e1301 7     "time"
9d4b12 8
Z 9     "nanomsg.org/go-mangos"
10     "nanomsg.org/go-mangos/protocol/bus"
11     "nanomsg.org/go-mangos/protocol/pair"
12     "nanomsg.org/go-mangos/protocol/pub"
13     "nanomsg.org/go-mangos/protocol/pull"
14     "nanomsg.org/go-mangos/protocol/push"
15     "nanomsg.org/go-mangos/protocol/rep"
16     "nanomsg.org/go-mangos/protocol/req"
17     "nanomsg.org/go-mangos/protocol/respondent"
18     "nanomsg.org/go-mangos/protocol/sub"
19     "nanomsg.org/go-mangos/protocol/surveyor"
20     "nanomsg.org/go-mangos/transport/all"
21 )
22
9893ed 23 // type deliver
Z 24 type td int
25
26 const (
27     producer = td(iota)
28     consumer
29     star //mangos bus protocol
30 )
31
9d4b12 32 // NNG mangos wrap
Z 33 type NNG struct {
36766a 34     sock   mangos.Socket
Z 35     server bool
36     mode   Mode
37     url    string
38
39     arguments []interface{}
9d4b12 40 }
Z 41
42 // Send impl interface Diliver
43 func (n *NNG) Send(data []byte) error {
36766a 44     var err error
9d4b12 45     if n.sock == nil {
9893ed 46         n.sock, err = n.makeNNG(producer)
36766a 47         if err != nil {
Z 48             fmt.Println("create nng producer error")
49             return err
50         }
9d4b12 51     }
Z 52
c0dcd1 53     if surveyorTime > 0 {
Z 54         time.Sleep(time.Duration(surveyorTime*2) * time.Second)
55     }
56
36766a 57     msg := mangos.NewMessage(len(data))
Z 58     msg.Body = data
59     return n.sock.SendMsg(msg)
9e1301 60
9d4b12 61 }
Z 62
63 // Recv impl interface Diliver
64 func (n *NNG) Recv() ([]byte, error) {
36766a 65     var err error
Z 66
9d4b12 67     if n.sock == nil {
9893ed 68         n.sock, err = n.makeNNG(consumer)
36766a 69         if err != nil {
Z 70             fmt.Println("create nng consumer error")
5ff1f3 71             return nil, err
a6b23c 72         }
9d4b12 73     }
36766a 74
Z 75     var msg *mangos.Message
76     if msg, err = n.sock.RecvMsg(); err != nil {
77         return nil, err
78     }
79     return msg.Body, nil
80
9d4b12 81 }
Z 82
aaae99 83 // Close impl interface Deliver
Z 84 func (n *NNG) Close() {
85     if n.sock != nil {
86         n.sock.Close()
8e158e 87         n.sock = nil
aaae99 88     }
Z 89 }
90
6887a3 91 func nngServer(m Mode, url string, args ...interface{}) *NNG {
9d4b12 92
Z 93     rmExistedIpcName(url)
94
36766a 95     return &NNG{
Z 96         server:    true,
97         mode:      m,
98         url:       url,
99         arguments: args,
100     }
9d4b12 101 }
Z 102
6887a3 103 func nngClient(m Mode, url string, args ...interface{}) *NNG {
9d4b12 104
36766a 105     return &NNG{
Z 106         server:    false,
107         mode:      m,
108         url:       url,
109         arguments: args,
9d4b12 110     }
Z 111
36766a 112 }
Z 113
9893ed 114 func proto(typ td, m Mode) protocol {
Z 115     if typ == producer {
36766a 116         return protoProducer(m)
9893ed 117     } else if typ == consumer {
Z 118         return protoConsumer(m)
36766a 119     }
Z 120     return protoConsumer(m)
121 }
122
9893ed 123 func (n *NNG) makeNNG(typ td) (mangos.Socket, error) {
36766a 124
Z 125     var sock mangos.Socket
126     var err error
9893ed 127     if sock, err = newSocket(proto(typ, n.mode)); err != nil {
36766a 128         return nil, err
Z 129     }
130
131     if err = setSocketOptions(sock, n.arguments...); err != nil {
132         sock.Close()
133         sock = nil
134     }
135     if n.server {
136         if err = sock.Listen(n.url); err != nil {
137             sock.Close()
138             sock = nil
139         }
140     } else {
141         if err = sock.Dial(n.url); err != nil {
142             sock.Close()
143             sock = nil
144         }
145     }
146     return sock, err
9d4b12 147 }
Z 148
9e1301 149 // maxRecvSize max recv size
Z 150 var (
151     maxRecvSize  = 33 * 1024 * 1024
c0dcd1 152     surveyorTime = -1
9e1301 153 )
9d4b12 154
8e158e 155 func defualtSocketOptions() map[string]interface{} {
9d4b12 156
8e158e 157     options := make(map[string]interface{})
Z 158
159     options[mangos.OptionMaxRecvSize] = maxRecvSize
160     options[mangos.OptionWriteQLen] = 0
161     options[mangos.OptionReadQLen] = 0
306588 162     options[mangos.OptionRecvDeadline] = time.Second
Z 163     options[mangos.OptionSendDeadline] = time.Second
8e158e 164     options[mangos.OptionRaw] = true
Z 165
166     return options
9d4b12 167 }
Z 168
169 func setSocketOptions(sock mangos.Socket, args ...interface{}) error {
170
8e158e 171     options := defualtSocketOptions()
Z 172
9e1301 173     switch sock.GetProtocol().Number() {
Z 174     case mangos.ProtoSub:
36766a 175         topic := ""
9d4b12 176         for _, arg := range args {
Z 177             switch arg.(type) {
178             case string:
36766a 179                 topic = arg.(string)
9d4b12 180             default:
Z 181             }
182         }
36766a 183         options[mangos.OptionSubscribe] = []byte(topic)
9e1301 184     case mangos.ProtoSurveyor:
Z 185         for _, arg := range args {
186             switch arg.(type) {
187             case int:
c0dcd1 188                 if arg.(int) < 2 {
Z 189                     surveyorTime = 1
190                 } else {
191                     surveyorTime = arg.(int) / 2
192                 }
9e1301 193             default:
Z 194             }
195         }
36766a 196         options[mangos.OptionSurveyTime] = time.Duration(surveyorTime) * time.Second
Z 197
9e1301 198     default:
9d4b12 199     }
Z 200
8e158e 201     for k, v := range options {
Z 202         if err := sock.SetOption(k, v); err != nil {
203             return err
204         }
205     }
9d4b12 206     return nil
Z 207 }
208
209 func rmExistedIpcName(url string) {
210     s := strings.Split(url, "://")
211
212     if s[0] == "ipc" {
213         if _, err := os.Stat(s[1]); err == nil {
214             os.Remove(s[1])
215         }
216     }
217 }
218
219 // newSocket allocates a new Socket.  The Socket is the handle used to
220 // access the underlying library.
221 func newSocket(p protocol) (mangos.Socket, error) {
222
223     var s mangos.Socket
224     var err error
225
226     switch p {
227     case PUB:
228         s, err = pub.NewSocket()
229     case SUB:
230         s, err = sub.NewSocket()
231     case PUSH:
232         s, err = push.NewSocket()
233     case PULL:
234         s, err = pull.NewSocket()
235     case REQ:
236         s, err = req.NewSocket()
237     case REP:
238         s, err = rep.NewSocket()
239     case SURVEYOR:
240         s, err = surveyor.NewSocket()
241     case RESPONDENT:
242         s, err = respondent.NewSocket()
243     case PAIR:
244         s, err = pair.NewSocket()
245     case BUS:
246         s, err = bus.NewSocket()
247     default:
248         err = mangos.ErrBadProto
249     }
250
251     if err != nil {
252         return nil, err
253     }
254
255     all.AddTransports(s)
256
257     return s, nil
258 }
259
260 func die(format string, v ...interface{}) {
261     fmt.Fprintln(os.Stderr, fmt.Sprintf(format, v...))
262     os.Exit(1)
263 }
264
265 // Protocol is the numeric abstraction to the various protocols or patterns
266 // that Mangos supports.
267 type protocol int
268
269 // Constants for protocols.
270 const (
271     PUSH       = protocol(mangos.ProtoPush)
272     PULL       = protocol(mangos.ProtoPull)
273     PUB        = protocol(mangos.ProtoPub)
274     SUB        = protocol(mangos.ProtoSub)
275     REQ        = protocol(mangos.ProtoReq)
276     REP        = protocol(mangos.ProtoRep)
277     SURVEYOR   = protocol(mangos.ProtoSurveyor)
278     RESPONDENT = protocol(mangos.ProtoRespondent)
279     BUS        = protocol(mangos.ProtoBus)
280     PAIR       = protocol(mangos.ProtoPair)
281 )
282
283 func protoProducer(m Mode) protocol {
284     switch m {
285     case PushPull:
286         return PUSH
287     case PubSub:
288         return PUB
289     case ReqRep:
290         return REP
291     case SurvResp:
292         return SURVEYOR
293     case Bus:
294         return BUS
295     case Pair:
296         return PAIR
297     }
298     return PUSH
299 }
300
301 func protoConsumer(m Mode) protocol {
302     switch m {
303     case PushPull:
304         return PULL
305     case PubSub:
306         return SUB
307     case ReqRep:
308         return REQ
309     case SurvResp:
310         return RESPONDENT
311     case Bus:
312         return BUS
313     case Pair:
314         return PAIR
315     }
316     return PULL
317 }