zhangmeng
2019-05-16 162fe98d7728445b72528283e1bfdddc432d2676
提交 | 用户 | age
9d4b12 1 package deliver
Z 2
3 import (
4     "errors"
5     "fmt"
6     "os"
7     "strings"
9e1301 8     "time"
9d4b12 9
Z 10     "nanomsg.org/go-mangos"
11     "nanomsg.org/go-mangos/protocol/bus"
12     "nanomsg.org/go-mangos/protocol/pair"
13     "nanomsg.org/go-mangos/protocol/pub"
14     "nanomsg.org/go-mangos/protocol/pull"
15     "nanomsg.org/go-mangos/protocol/push"
16     "nanomsg.org/go-mangos/protocol/rep"
17     "nanomsg.org/go-mangos/protocol/req"
18     "nanomsg.org/go-mangos/protocol/respondent"
19     "nanomsg.org/go-mangos/protocol/sub"
20     "nanomsg.org/go-mangos/protocol/surveyor"
21     "nanomsg.org/go-mangos/transport/all"
22 )
23
24 // NNG mangos wrap
25 type NNG struct {
26     sock mangos.Socket
5ff1f3 27     raw  bool
9d4b12 28 }
Z 29
30 // Send impl interface Diliver
31 func (n *NNG) Send(data []byte) error {
32     if n.sock == nil {
33         return errors.New("please init NNG first")
34     }
35
5ff1f3 36     // switch n.sock.GetProtocol().Number() {
Z 37     // case mangos.ProtoSurveyor:
38     //     time.Sleep(surveyorTime * 2)
39     // default:
40     // }
41     if n.raw {
9d4b12 42         msg := mangos.NewMessage(len(data))
Z 43         msg.Body = data
44         return n.sock.SendMsg(msg)
45     }
9e1301 46
9d4b12 47     return n.sock.Send(data)
Z 48 }
49
50 // Recv impl interface Diliver
51 func (n *NNG) Recv() ([]byte, error) {
52     if n.sock == nil {
53         return nil, errors.New("please init NNG first")
54     }
5ff1f3 55     if n.raw {
Z 56         var msg *mangos.Message
57         var err error
58         if msg, err = n.sock.RecvMsg(); err != nil {
59             return nil, err
a6b23c 60         }
5ff1f3 61         return msg.Body, nil
9d4b12 62     }
Z 63     return n.sock.Recv()
64 }
65
aaae99 66 // Close impl interface Deliver
Z 67 func (n *NNG) Close() {
68     if n.sock != nil {
69         n.sock.Close()
8e158e 70         n.sock = nil
aaae99 71     }
Z 72 }
73
8e158e 74 func nngListener(m Mode, url string, args ...interface{}) *NNG {
9d4b12 75
Z 76     rmExistedIpcName(url)
77     if sock, err := newSocket(protoProducer(m)); err == nil {
78         if err = setSocketOptions(sock, args); err != nil {
162fe9 79             sock.Close()
Z 80             sock = nil
9d4b12 81             return nil
Z 82         }
83         if err = sock.Listen(url); err != nil {
84             sock.Close()
8e158e 85             sock = nil
9d4b12 86             return nil
Z 87         }
88         return &NNG{
89             sock,
5ff1f3 90             true,
9d4b12 91         }
Z 92     }
93
94     return nil
95 }
96
8e158e 97 func nngDialer(m Mode, url string, args ...interface{}) *NNG {
9d4b12 98
Z 99     if sock, err := newSocket(protoConsumer(m)); err == nil {
100         if err = setSocketOptions(sock, args); err != nil {
162fe9 101             sock.Close()
Z 102             sock = nil
9d4b12 103             return nil
Z 104         }
105
106         if err = sock.Dial(url); err != nil {
107             sock.Close()
8e158e 108             sock = nil
9d4b12 109             return nil
Z 110         }
111
112         return &NNG{
113             sock,
5ff1f3 114             true,
9d4b12 115         }
Z 116     }
117
118     return nil
119 }
120
9e1301 121 // maxRecvSize max recv size
Z 122 var (
123     maxRecvSize  = 33 * 1024 * 1024
124     surveyorTime = time.Second / 2
125 )
9d4b12 126
8e158e 127 func defualtSocketOptions() map[string]interface{} {
9d4b12 128
8e158e 129     options := make(map[string]interface{})
Z 130
131     options[mangos.OptionMaxRecvSize] = maxRecvSize
132     options[mangos.OptionWriteQLen] = 0
133     options[mangos.OptionReadQLen] = 0
134     options[mangos.OptionRecvDeadline] = time.Second
135     options[mangos.OptionSendDeadline] = time.Second
136     options[mangos.OptionRaw] = true
137
138     return options
9d4b12 139 }
Z 140
141 func setSocketOptions(sock mangos.Socket, args ...interface{}) error {
142
8e158e 143     options := defualtSocketOptions()
Z 144
9e1301 145     switch sock.GetProtocol().Number() {
Z 146     case mangos.ProtoSub:
9d4b12 147         for _, arg := range args {
Z 148             switch arg.(type) {
149             case string:
8e158e 150                 options[mangos.OptionSubscribe] = []byte(arg.(string))
9d4b12 151             default:
8e158e 152                 options[mangos.OptionSubscribe] = []byte("")
9d4b12 153             }
Z 154         }
9e1301 155     case mangos.ProtoSurveyor:
Z 156         for _, arg := range args {
157             switch arg.(type) {
158             case int:
159                 surveyorTime = time.Duration(arg.(int)/2) * time.Second
160             default:
161             }
8e158e 162             options[mangos.OptionSurveyTime] = surveyorTime
9e1301 163         }
Z 164     default:
165         fmt.Println("no additional args")
9d4b12 166     }
Z 167
8e158e 168     for k, v := range options {
Z 169         if err := sock.SetOption(k, v); err != nil {
170             return err
171         }
172     }
9d4b12 173     return nil
Z 174 }
175
176 func rmExistedIpcName(url string) {
177     s := strings.Split(url, "://")
178
179     if s[0] == "ipc" {
180         if _, err := os.Stat(s[1]); err == nil {
181             os.Remove(s[1])
182         }
183     }
184 }
185
186 // newSocket allocates a new Socket.  The Socket is the handle used to
187 // access the underlying library.
188 func newSocket(p protocol) (mangos.Socket, error) {
189
190     var s mangos.Socket
191     var err error
192
193     switch p {
194     case PUB:
195         s, err = pub.NewSocket()
196     case SUB:
197         s, err = sub.NewSocket()
198     case PUSH:
199         s, err = push.NewSocket()
200     case PULL:
201         s, err = pull.NewSocket()
202     case REQ:
203         s, err = req.NewSocket()
204     case REP:
205         s, err = rep.NewSocket()
206     case SURVEYOR:
207         s, err = surveyor.NewSocket()
208     case RESPONDENT:
209         s, err = respondent.NewSocket()
210     case PAIR:
211         s, err = pair.NewSocket()
212     case BUS:
213         s, err = bus.NewSocket()
214     default:
215         err = mangos.ErrBadProto
216     }
217
218     if err != nil {
219         return nil, err
220     }
221
222     all.AddTransports(s)
223
224     return s, nil
225 }
226
227 func die(format string, v ...interface{}) {
228     fmt.Fprintln(os.Stderr, fmt.Sprintf(format, v...))
229     os.Exit(1)
230 }
231
232 // Protocol is the numeric abstraction to the various protocols or patterns
233 // that Mangos supports.
234 type protocol int
235
236 // Constants for protocols.
237 const (
238     PUSH       = protocol(mangos.ProtoPush)
239     PULL       = protocol(mangos.ProtoPull)
240     PUB        = protocol(mangos.ProtoPub)
241     SUB        = protocol(mangos.ProtoSub)
242     REQ        = protocol(mangos.ProtoReq)
243     REP        = protocol(mangos.ProtoRep)
244     SURVEYOR   = protocol(mangos.ProtoSurveyor)
245     RESPONDENT = protocol(mangos.ProtoRespondent)
246     BUS        = protocol(mangos.ProtoBus)
247     PAIR       = protocol(mangos.ProtoPair)
248 )
249
250 func protoProducer(m Mode) protocol {
251     switch m {
252     case PushPull:
253         return PUSH
254     case PubSub:
255         return PUB
256     case ReqRep:
257         return REP
258     case SurvResp:
259         return SURVEYOR
260     case Bus:
261         return BUS
262     case Pair:
263         return PAIR
264     }
265     return PUSH
266 }
267
268 func protoConsumer(m Mode) protocol {
269     switch m {
270     case PushPull:
271         return PULL
272     case PubSub:
273         return SUB
274     case ReqRep:
275         return REQ
276     case SurvResp:
277         return RESPONDENT
278     case Bus:
279         return BUS
280     case Pair:
281         return PAIR
282     }
283     return PULL
284 }