zhangmeng
2019-05-16 a6b23c5eaf5a8b9265e13d3db370d45cfcd5bc21
提交 | 用户 | age
9d4b12 1 package deliver
Z 2
3 import (
4     "errors"
5     "fmt"
6     "os"
7     "strings"
9e1301 8     "time"
9d4b12 9
Z 10     "nanomsg.org/go-mangos"
11     "nanomsg.org/go-mangos/protocol/bus"
12     "nanomsg.org/go-mangos/protocol/pair"
13     "nanomsg.org/go-mangos/protocol/pub"
14     "nanomsg.org/go-mangos/protocol/pull"
15     "nanomsg.org/go-mangos/protocol/push"
16     "nanomsg.org/go-mangos/protocol/rep"
17     "nanomsg.org/go-mangos/protocol/req"
18     "nanomsg.org/go-mangos/protocol/respondent"
19     "nanomsg.org/go-mangos/protocol/sub"
20     "nanomsg.org/go-mangos/protocol/surveyor"
21     "nanomsg.org/go-mangos/transport/all"
22 )
23
24 // NNG mangos wrap
25 type NNG struct {
26     sock mangos.Socket
27 }
28
29 // Send impl interface Diliver
30 func (n *NNG) Send(data []byte) error {
31     if n.sock == nil {
32         return errors.New("please init NNG first")
33     }
34
9e1301 35     switch n.sock.GetProtocol().Number() {
Z 36     case mangos.ProtoSurveyor:
37         time.Sleep(surveyorTime * 2)
38     default:
39     }
9d4b12 40     if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil {
Z 41         msg := mangos.NewMessage(len(data))
42         msg.Body = data
43         return n.sock.SendMsg(msg)
44     }
9e1301 45
9d4b12 46     return n.sock.Send(data)
Z 47 }
48
49 // Recv impl interface Diliver
50 func (n *NNG) Recv() ([]byte, error) {
51     if n.sock == nil {
52         return nil, errors.New("please init NNG first")
53     }
54     if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil {
a6b23c 55         if msg, err := n.sock.RecvMsg(); err == nil {
Z 56             return msg.Body, err
57         }
58         return nil, err
9d4b12 59     }
Z 60     return n.sock.Recv()
61 }
62
aaae99 63 // Close impl interface Deliver
Z 64 func (n *NNG) Close() {
65     if n.sock != nil {
66         n.sock.Close()
67     }
68 }
69
f5368c 70 // nngProducer create from deliver Mode
Z 71 func nngProducer(m Mode, url string, args ...interface{}) *NNG {
9d4b12 72
Z 73     rmExistedIpcName(url)
74     if sock, err := newSocket(protoProducer(m)); err == nil {
75         if err = setSocketOptions(sock, args); err != nil {
76             return nil
77         }
78         if err = sock.Listen(url); err != nil {
79             sock.Close()
80             return nil
81         }
82         return &NNG{
83             sock,
84         }
85     }
86
87     return nil
88 }
89
f5368c 90 // nngConsumer create from deliver Mode
Z 91 func nngConsumer(m Mode, url string, args ...interface{}) *NNG {
9d4b12 92
Z 93     if sock, err := newSocket(protoConsumer(m)); err == nil {
94         if err = setSocketOptions(sock, args); err != nil {
95             return nil
96         }
97
98         if err = sock.Dial(url); err != nil {
99             sock.Close()
100             return nil
101         }
102
103         return &NNG{
104             sock,
105         }
106     }
107
108     return nil
109 }
110
9e1301 111 // maxRecvSize max recv size
Z 112 var (
113     maxRecvSize  = 33 * 1024 * 1024
114     surveyorTime = time.Second / 2
115 )
9d4b12 116
Z 117 func defualtSocketOptions(sock mangos.Socket) error {
118     var err error
9e1301 119     if err = sock.SetOption(mangos.OptionMaxRecvSize, maxRecvSize); err != nil {
9d4b12 120         sock.Close()
Z 121         return err
122     }
123     if err = sock.SetOption(mangos.OptionWriteQLen, 0); err != nil {
124         sock.Close()
125         return err
126     }
127     if err = sock.SetOption(mangos.OptionReadQLen, 0); err != nil {
128         sock.Close()
129         return err
130     }
a6b23c 131     if err = sock.SetOption(mangos.OptionRecvDeadline, time.Second); err != nil {
Z 132         sock.Close()
133         return err
134     }
135     if err = sock.SetOption(mangos.OptionSendDeadline, time.Second); err != nil {
136         sock.Close()
137         return err
138     }
9d4b12 139     if err = sock.SetOption(mangos.OptionRaw, true); err != nil {
Z 140         sock.Close()
141         return err
142     }
143
144     return nil
145 }
146
147 func setSocketOptions(sock mangos.Socket, args ...interface{}) error {
148
149     err := defualtSocketOptions(sock)
150     if err != nil {
151         return err
152     }
9e1301 153     switch sock.GetProtocol().Number() {
Z 154     case mangos.ProtoSub:
9d4b12 155         for _, arg := range args {
Z 156             switch arg.(type) {
157             case string:
158                 err = sock.SetOption(mangos.OptionSubscribe, []byte(arg.(string)))
159             default:
160                 err = sock.SetOption(mangos.OptionSubscribe, []byte(""))
161             }
162         }
9e1301 163     case mangos.ProtoSurveyor:
Z 164         for _, arg := range args {
165             switch arg.(type) {
166             case int:
167                 surveyorTime = time.Duration(arg.(int)/2) * time.Second
168             default:
169             }
170             err = sock.SetOption(mangos.OptionSurveyTime, surveyorTime)
171         }
172     default:
173         fmt.Println("no additional args")
9d4b12 174     }
Z 175
176     return nil
177 }
178
179 func rmExistedIpcName(url string) {
180     s := strings.Split(url, "://")
181
182     if s[0] == "ipc" {
183         if _, err := os.Stat(s[1]); err == nil {
184             os.Remove(s[1])
185         }
186     }
187 }
188
189 // newSocket allocates a new Socket.  The Socket is the handle used to
190 // access the underlying library.
191 func newSocket(p protocol) (mangos.Socket, error) {
192
193     var s mangos.Socket
194     var err error
195
196     switch p {
197     case PUB:
198         s, err = pub.NewSocket()
199     case SUB:
200         s, err = sub.NewSocket()
201     case PUSH:
202         s, err = push.NewSocket()
203     case PULL:
204         s, err = pull.NewSocket()
205     case REQ:
206         s, err = req.NewSocket()
207     case REP:
208         s, err = rep.NewSocket()
209     case SURVEYOR:
210         s, err = surveyor.NewSocket()
211     case RESPONDENT:
212         s, err = respondent.NewSocket()
213     case PAIR:
214         s, err = pair.NewSocket()
215     case BUS:
216         s, err = bus.NewSocket()
217     default:
218         err = mangos.ErrBadProto
219     }
220
221     if err != nil {
222         return nil, err
223     }
224
225     all.AddTransports(s)
226     // s.AddTransport(ipc.NewTransport())
227     // s.AddTransport(tcp.NewTransport())
228
229     return s, nil
230 }
231
232 func die(format string, v ...interface{}) {
233     fmt.Fprintln(os.Stderr, fmt.Sprintf(format, v...))
234     os.Exit(1)
235 }
236
237 // Protocol is the numeric abstraction to the various protocols or patterns
238 // that Mangos supports.
239 type protocol int
240
241 // Constants for protocols.
242 const (
243     PUSH       = protocol(mangos.ProtoPush)
244     PULL       = protocol(mangos.ProtoPull)
245     PUB        = protocol(mangos.ProtoPub)
246     SUB        = protocol(mangos.ProtoSub)
247     REQ        = protocol(mangos.ProtoReq)
248     REP        = protocol(mangos.ProtoRep)
249     SURVEYOR   = protocol(mangos.ProtoSurveyor)
250     RESPONDENT = protocol(mangos.ProtoRespondent)
251     BUS        = protocol(mangos.ProtoBus)
252     PAIR       = protocol(mangos.ProtoPair)
253 )
254
255 func protoProducer(m Mode) protocol {
256     switch m {
257     case PushPull:
258         return PUSH
259     case PubSub:
260         return PUB
261     case ReqRep:
262         return REP
263     case SurvResp:
264         return SURVEYOR
265     case Bus:
266         return BUS
267     case Pair:
268         return PAIR
269     }
270     return PUSH
271 }
272
273 func protoConsumer(m Mode) protocol {
274     switch m {
275     case PushPull:
276         return PULL
277     case PubSub:
278         return SUB
279     case ReqRep:
280         return REQ
281     case SurvResp:
282         return RESPONDENT
283     case Bus:
284         return BUS
285     case Pair:
286         return PAIR
287     }
288     return PULL
289 }