zhangmeng
2019-05-16 8e158e611ca6a3663e383d5a9b14d14eaf897736
提交 | 用户 | age
9d4b12 1 package deliver
Z 2
3 import (
4     "errors"
5     "fmt"
6     "os"
7     "strings"
9e1301 8     "time"
9d4b12 9
Z 10     "nanomsg.org/go-mangos"
11     "nanomsg.org/go-mangos/protocol/bus"
12     "nanomsg.org/go-mangos/protocol/pair"
13     "nanomsg.org/go-mangos/protocol/pub"
14     "nanomsg.org/go-mangos/protocol/pull"
15     "nanomsg.org/go-mangos/protocol/push"
16     "nanomsg.org/go-mangos/protocol/rep"
17     "nanomsg.org/go-mangos/protocol/req"
18     "nanomsg.org/go-mangos/protocol/respondent"
19     "nanomsg.org/go-mangos/protocol/sub"
20     "nanomsg.org/go-mangos/protocol/surveyor"
21     "nanomsg.org/go-mangos/transport/all"
22 )
23
24 // NNG mangos wrap
25 type NNG struct {
26     sock mangos.Socket
5ff1f3 27     raw  bool
9d4b12 28 }
Z 29
30 // Send impl interface Diliver
31 func (n *NNG) Send(data []byte) error {
32     if n.sock == nil {
33         return errors.New("please init NNG first")
34     }
35
5ff1f3 36     // switch n.sock.GetProtocol().Number() {
Z 37     // case mangos.ProtoSurveyor:
38     //     time.Sleep(surveyorTime * 2)
39     // default:
40     // }
41     if n.raw {
9d4b12 42         msg := mangos.NewMessage(len(data))
Z 43         msg.Body = data
44         return n.sock.SendMsg(msg)
45     }
9e1301 46
9d4b12 47     return n.sock.Send(data)
Z 48 }
49
50 // Recv impl interface Diliver
51 func (n *NNG) Recv() ([]byte, error) {
52     if n.sock == nil {
53         return nil, errors.New("please init NNG first")
54     }
5ff1f3 55     if n.raw {
Z 56         var msg *mangos.Message
57         var err error
58         if msg, err = n.sock.RecvMsg(); err != nil {
59             return nil, err
a6b23c 60         }
5ff1f3 61         return msg.Body, nil
9d4b12 62     }
Z 63     return n.sock.Recv()
64 }
65
aaae99 66 // Close impl interface Deliver
Z 67 func (n *NNG) Close() {
68     if n.sock != nil {
69         n.sock.Close()
8e158e 70         n.sock = nil
aaae99 71     }
Z 72 }
73
8e158e 74 func nngListener(m Mode, url string, args ...interface{}) *NNG {
9d4b12 75
Z 76     rmExistedIpcName(url)
77     if sock, err := newSocket(protoProducer(m)); err == nil {
78         if err = setSocketOptions(sock, args); err != nil {
79             return nil
80         }
81         if err = sock.Listen(url); err != nil {
82             sock.Close()
8e158e 83             sock = nil
9d4b12 84             return nil
Z 85         }
86         return &NNG{
87             sock,
5ff1f3 88             true,
9d4b12 89         }
Z 90     }
91
92     return nil
93 }
94
8e158e 95 func nngDialer(m Mode, url string, args ...interface{}) *NNG {
9d4b12 96
Z 97     if sock, err := newSocket(protoConsumer(m)); err == nil {
98         if err = setSocketOptions(sock, args); err != nil {
99             return nil
100         }
101
102         if err = sock.Dial(url); err != nil {
103             sock.Close()
8e158e 104             sock = nil
9d4b12 105             return nil
Z 106         }
107
108         return &NNG{
109             sock,
5ff1f3 110             true,
9d4b12 111         }
Z 112     }
113
114     return nil
115 }
116
9e1301 117 // maxRecvSize max recv size
Z 118 var (
119     maxRecvSize  = 33 * 1024 * 1024
120     surveyorTime = time.Second / 2
121 )
9d4b12 122
8e158e 123 func defualtSocketOptions() map[string]interface{} {
9d4b12 124
8e158e 125     options := make(map[string]interface{})
Z 126
127     options[mangos.OptionMaxRecvSize] = maxRecvSize
128     options[mangos.OptionWriteQLen] = 0
129     options[mangos.OptionReadQLen] = 0
130     options[mangos.OptionRecvDeadline] = time.Second
131     options[mangos.OptionSendDeadline] = time.Second
132     options[mangos.OptionRaw] = true
133
134     return options
9d4b12 135 }
Z 136
137 func setSocketOptions(sock mangos.Socket, args ...interface{}) error {
138
8e158e 139     options := defualtSocketOptions()
Z 140
9e1301 141     switch sock.GetProtocol().Number() {
Z 142     case mangos.ProtoSub:
9d4b12 143         for _, arg := range args {
Z 144             switch arg.(type) {
145             case string:
8e158e 146                 options[mangos.OptionSubscribe] = []byte(arg.(string))
9d4b12 147             default:
8e158e 148                 options[mangos.OptionSubscribe] = []byte("")
9d4b12 149             }
Z 150         }
9e1301 151     case mangos.ProtoSurveyor:
Z 152         for _, arg := range args {
153             switch arg.(type) {
154             case int:
155                 surveyorTime = time.Duration(arg.(int)/2) * time.Second
156             default:
157             }
8e158e 158             options[mangos.OptionSurveyTime] = surveyorTime
9e1301 159         }
Z 160     default:
161         fmt.Println("no additional args")
9d4b12 162     }
Z 163
8e158e 164     for k, v := range options {
Z 165         if err := sock.SetOption(k, v); err != nil {
166             return err
167         }
168     }
9d4b12 169     return nil
Z 170 }
171
172 func rmExistedIpcName(url string) {
173     s := strings.Split(url, "://")
174
175     if s[0] == "ipc" {
176         if _, err := os.Stat(s[1]); err == nil {
177             os.Remove(s[1])
178         }
179     }
180 }
181
182 // newSocket allocates a new Socket.  The Socket is the handle used to
183 // access the underlying library.
184 func newSocket(p protocol) (mangos.Socket, error) {
185
186     var s mangos.Socket
187     var err error
188
189     switch p {
190     case PUB:
191         s, err = pub.NewSocket()
192     case SUB:
193         s, err = sub.NewSocket()
194     case PUSH:
195         s, err = push.NewSocket()
196     case PULL:
197         s, err = pull.NewSocket()
198     case REQ:
199         s, err = req.NewSocket()
200     case REP:
201         s, err = rep.NewSocket()
202     case SURVEYOR:
203         s, err = surveyor.NewSocket()
204     case RESPONDENT:
205         s, err = respondent.NewSocket()
206     case PAIR:
207         s, err = pair.NewSocket()
208     case BUS:
209         s, err = bus.NewSocket()
210     default:
211         err = mangos.ErrBadProto
212     }
213
214     if err != nil {
215         return nil, err
216     }
217
218     all.AddTransports(s)
219
220     return s, nil
221 }
222
223 func die(format string, v ...interface{}) {
224     fmt.Fprintln(os.Stderr, fmt.Sprintf(format, v...))
225     os.Exit(1)
226 }
227
228 // Protocol is the numeric abstraction to the various protocols or patterns
229 // that Mangos supports.
230 type protocol int
231
232 // Constants for protocols.
233 const (
234     PUSH       = protocol(mangos.ProtoPush)
235     PULL       = protocol(mangos.ProtoPull)
236     PUB        = protocol(mangos.ProtoPub)
237     SUB        = protocol(mangos.ProtoSub)
238     REQ        = protocol(mangos.ProtoReq)
239     REP        = protocol(mangos.ProtoRep)
240     SURVEYOR   = protocol(mangos.ProtoSurveyor)
241     RESPONDENT = protocol(mangos.ProtoRespondent)
242     BUS        = protocol(mangos.ProtoBus)
243     PAIR       = protocol(mangos.ProtoPair)
244 )
245
246 func protoProducer(m Mode) protocol {
247     switch m {
248     case PushPull:
249         return PUSH
250     case PubSub:
251         return PUB
252     case ReqRep:
253         return REP
254     case SurvResp:
255         return SURVEYOR
256     case Bus:
257         return BUS
258     case Pair:
259         return PAIR
260     }
261     return PUSH
262 }
263
264 func protoConsumer(m Mode) protocol {
265     switch m {
266     case PushPull:
267         return PULL
268     case PubSub:
269         return SUB
270     case ReqRep:
271         return REQ
272     case SurvResp:
273         return RESPONDENT
274     case Bus:
275         return BUS
276     case Pair:
277         return PAIR
278     }
279     return PULL
280 }