fix
zhangmeng
2019-05-16 5ff1f32410e0697e581f6c389c8c22c5abd474c0
提交 | 用户 | age
9d4b12 1 package deliver
Z 2
3 import (
4     "errors"
5     "fmt"
6     "os"
7     "strings"
9e1301 8     "time"
9d4b12 9
Z 10     "nanomsg.org/go-mangos"
11     "nanomsg.org/go-mangos/protocol/bus"
12     "nanomsg.org/go-mangos/protocol/pair"
13     "nanomsg.org/go-mangos/protocol/pub"
14     "nanomsg.org/go-mangos/protocol/pull"
15     "nanomsg.org/go-mangos/protocol/push"
16     "nanomsg.org/go-mangos/protocol/rep"
17     "nanomsg.org/go-mangos/protocol/req"
18     "nanomsg.org/go-mangos/protocol/respondent"
19     "nanomsg.org/go-mangos/protocol/sub"
20     "nanomsg.org/go-mangos/protocol/surveyor"
21     "nanomsg.org/go-mangos/transport/all"
22 )
23
24 // NNG mangos wrap
25 type NNG struct {
26     sock mangos.Socket
5ff1f3 27     raw  bool
9d4b12 28 }
Z 29
30 // Send impl interface Diliver
31 func (n *NNG) Send(data []byte) error {
32     if n.sock == nil {
33         return errors.New("please init NNG first")
34     }
35
5ff1f3 36     // switch n.sock.GetProtocol().Number() {
Z 37     // case mangos.ProtoSurveyor:
38     //     time.Sleep(surveyorTime * 2)
39     // default:
40     // }
41     if n.raw {
9d4b12 42         msg := mangos.NewMessage(len(data))
Z 43         msg.Body = data
44         return n.sock.SendMsg(msg)
45     }
9e1301 46
9d4b12 47     return n.sock.Send(data)
Z 48 }
49
50 // Recv impl interface Diliver
51 func (n *NNG) Recv() ([]byte, error) {
52     if n.sock == nil {
53         return nil, errors.New("please init NNG first")
54     }
5ff1f3 55     if n.raw {
Z 56         var msg *mangos.Message
57         var err error
58         if msg, err = n.sock.RecvMsg(); err != nil {
59             return nil, err
a6b23c 60         }
5ff1f3 61         return msg.Body, nil
9d4b12 62     }
Z 63     return n.sock.Recv()
64 }
65
aaae99 66 // Close impl interface Deliver
Z 67 func (n *NNG) Close() {
68     if n.sock != nil {
69         n.sock.Close()
70     }
71 }
72
f5368c 73 // nngProducer create from deliver Mode
Z 74 func nngProducer(m Mode, url string, args ...interface{}) *NNG {
9d4b12 75
Z 76     rmExistedIpcName(url)
77     if sock, err := newSocket(protoProducer(m)); err == nil {
78         if err = setSocketOptions(sock, args); err != nil {
79             return nil
80         }
81         if err = sock.Listen(url); err != nil {
82             sock.Close()
83             return nil
84         }
85         return &NNG{
86             sock,
5ff1f3 87             true,
9d4b12 88         }
Z 89     }
90
91     return nil
92 }
93
f5368c 94 // nngConsumer create from deliver Mode
Z 95 func nngConsumer(m Mode, url string, args ...interface{}) *NNG {
9d4b12 96
Z 97     if sock, err := newSocket(protoConsumer(m)); err == nil {
98         if err = setSocketOptions(sock, args); err != nil {
99             return nil
100         }
101
102         if err = sock.Dial(url); err != nil {
103             sock.Close()
104             return nil
105         }
106
107         return &NNG{
108             sock,
5ff1f3 109             true,
9d4b12 110         }
Z 111     }
112
113     return nil
114 }
115
9e1301 116 // maxRecvSize max recv size
Z 117 var (
118     maxRecvSize  = 33 * 1024 * 1024
119     surveyorTime = time.Second / 2
120 )
9d4b12 121
Z 122 func defualtSocketOptions(sock mangos.Socket) error {
123     var err error
9e1301 124     if err = sock.SetOption(mangos.OptionMaxRecvSize, maxRecvSize); err != nil {
9d4b12 125         sock.Close()
Z 126         return err
127     }
128     if err = sock.SetOption(mangos.OptionWriteQLen, 0); err != nil {
129         sock.Close()
130         return err
131     }
132     if err = sock.SetOption(mangos.OptionReadQLen, 0); err != nil {
133         sock.Close()
134         return err
135     }
a6b23c 136     if err = sock.SetOption(mangos.OptionRecvDeadline, time.Second); err != nil {
Z 137         sock.Close()
138         return err
139     }
140     if err = sock.SetOption(mangos.OptionSendDeadline, time.Second); err != nil {
141         sock.Close()
142         return err
143     }
9d4b12 144     if err = sock.SetOption(mangos.OptionRaw, true); err != nil {
Z 145         sock.Close()
146         return err
147     }
148
149     return nil
150 }
151
152 func setSocketOptions(sock mangos.Socket, args ...interface{}) error {
153
154     err := defualtSocketOptions(sock)
155     if err != nil {
156         return err
157     }
9e1301 158     switch sock.GetProtocol().Number() {
Z 159     case mangos.ProtoSub:
9d4b12 160         for _, arg := range args {
Z 161             switch arg.(type) {
162             case string:
163                 err = sock.SetOption(mangos.OptionSubscribe, []byte(arg.(string)))
164             default:
165                 err = sock.SetOption(mangos.OptionSubscribe, []byte(""))
166             }
167         }
9e1301 168     case mangos.ProtoSurveyor:
Z 169         for _, arg := range args {
170             switch arg.(type) {
171             case int:
172                 surveyorTime = time.Duration(arg.(int)/2) * time.Second
173             default:
174             }
175             err = sock.SetOption(mangos.OptionSurveyTime, surveyorTime)
176         }
177     default:
178         fmt.Println("no additional args")
9d4b12 179     }
Z 180
181     return nil
182 }
183
184 func rmExistedIpcName(url string) {
185     s := strings.Split(url, "://")
186
187     if s[0] == "ipc" {
188         if _, err := os.Stat(s[1]); err == nil {
189             os.Remove(s[1])
190         }
191     }
192 }
193
194 // newSocket allocates a new Socket.  The Socket is the handle used to
195 // access the underlying library.
196 func newSocket(p protocol) (mangos.Socket, error) {
197
198     var s mangos.Socket
199     var err error
200
201     switch p {
202     case PUB:
203         s, err = pub.NewSocket()
204     case SUB:
205         s, err = sub.NewSocket()
206     case PUSH:
207         s, err = push.NewSocket()
208     case PULL:
209         s, err = pull.NewSocket()
210     case REQ:
211         s, err = req.NewSocket()
212     case REP:
213         s, err = rep.NewSocket()
214     case SURVEYOR:
215         s, err = surveyor.NewSocket()
216     case RESPONDENT:
217         s, err = respondent.NewSocket()
218     case PAIR:
219         s, err = pair.NewSocket()
220     case BUS:
221         s, err = bus.NewSocket()
222     default:
223         err = mangos.ErrBadProto
224     }
225
226     if err != nil {
227         return nil, err
228     }
229
230     all.AddTransports(s)
231     // s.AddTransport(ipc.NewTransport())
232     // s.AddTransport(tcp.NewTransport())
233
234     return s, nil
235 }
236
237 func die(format string, v ...interface{}) {
238     fmt.Fprintln(os.Stderr, fmt.Sprintf(format, v...))
239     os.Exit(1)
240 }
241
242 // Protocol is the numeric abstraction to the various protocols or patterns
243 // that Mangos supports.
244 type protocol int
245
246 // Constants for protocols.
247 const (
248     PUSH       = protocol(mangos.ProtoPush)
249     PULL       = protocol(mangos.ProtoPull)
250     PUB        = protocol(mangos.ProtoPub)
251     SUB        = protocol(mangos.ProtoSub)
252     REQ        = protocol(mangos.ProtoReq)
253     REP        = protocol(mangos.ProtoRep)
254     SURVEYOR   = protocol(mangos.ProtoSurveyor)
255     RESPONDENT = protocol(mangos.ProtoRespondent)
256     BUS        = protocol(mangos.ProtoBus)
257     PAIR       = protocol(mangos.ProtoPair)
258 )
259
260 func protoProducer(m Mode) protocol {
261     switch m {
262     case PushPull:
263         return PUSH
264     case PubSub:
265         return PUB
266     case ReqRep:
267         return REP
268     case SurvResp:
269         return SURVEYOR
270     case Bus:
271         return BUS
272     case Pair:
273         return PAIR
274     }
275     return PUSH
276 }
277
278 func protoConsumer(m Mode) protocol {
279     switch m {
280     case PushPull:
281         return PULL
282     case PubSub:
283         return SUB
284     case ReqRep:
285         return REQ
286     case SurvResp:
287         return RESPONDENT
288     case Bus:
289         return BUS
290     case Pair:
291         return PAIR
292     }
293     return PULL
294 }