zhangmeng
2019-05-15 4a091dce95fa50f8b0a055e3154ec2e7f4c276b3
提交 | 用户 | age
9d4b12 1 package deliver
Z 2
3 import (
4     "errors"
5     "fmt"
6     "os"
7     "strings"
9e1301 8     "time"
9d4b12 9
Z 10     "nanomsg.org/go-mangos"
11     "nanomsg.org/go-mangos/protocol/bus"
12     "nanomsg.org/go-mangos/protocol/pair"
13     "nanomsg.org/go-mangos/protocol/pub"
14     "nanomsg.org/go-mangos/protocol/pull"
15     "nanomsg.org/go-mangos/protocol/push"
16     "nanomsg.org/go-mangos/protocol/rep"
17     "nanomsg.org/go-mangos/protocol/req"
18     "nanomsg.org/go-mangos/protocol/respondent"
19     "nanomsg.org/go-mangos/protocol/sub"
20     "nanomsg.org/go-mangos/protocol/surveyor"
21     "nanomsg.org/go-mangos/transport/all"
22 )
23
24 // NNG mangos wrap
25 type NNG struct {
26     sock mangos.Socket
27 }
28
29 // Send impl interface Diliver
30 func (n *NNG) Send(data []byte) error {
31     if n.sock == nil {
32         return errors.New("please init NNG first")
33     }
34
9e1301 35     switch n.sock.GetProtocol().Number() {
Z 36     case mangos.ProtoSurveyor:
37         time.Sleep(surveyorTime * 2)
38     default:
39     }
9d4b12 40     if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil {
Z 41         msg := mangos.NewMessage(len(data))
42         msg.Body = data
43         return n.sock.SendMsg(msg)
44     }
9e1301 45
9d4b12 46     return n.sock.Send(data)
Z 47 }
48
49 // Recv impl interface Diliver
50 func (n *NNG) Recv() ([]byte, error) {
51     if n.sock == nil {
52         return nil, errors.New("please init NNG first")
53     }
54     if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil {
55         msg, err := n.sock.RecvMsg()
56         return msg.Body, err
57     }
58     return n.sock.Recv()
59 }
60
f5368c 61 // nngProducer create from deliver Mode
Z 62 func nngProducer(m Mode, url string, args ...interface{}) *NNG {
9d4b12 63
Z 64     rmExistedIpcName(url)
65     if sock, err := newSocket(protoProducer(m)); err == nil {
66         if err = setSocketOptions(sock, args); err != nil {
67             return nil
68         }
69         if err = sock.Listen(url); err != nil {
70             sock.Close()
71             return nil
72         }
73         return &NNG{
74             sock,
75         }
76     }
77
78     return nil
79 }
80
f5368c 81 // nngConsumer create from deliver Mode
Z 82 func nngConsumer(m Mode, url string, args ...interface{}) *NNG {
9d4b12 83
Z 84     if sock, err := newSocket(protoConsumer(m)); err == nil {
85         if err = setSocketOptions(sock, args); err != nil {
86             return nil
87         }
88
89         if err = sock.Dial(url); err != nil {
90             sock.Close()
91             return nil
92         }
93
94         return &NNG{
95             sock,
96         }
97     }
98
99     return nil
100 }
101
9e1301 102 // maxRecvSize max recv size
Z 103 var (
104     maxRecvSize  = 33 * 1024 * 1024
105     surveyorTime = time.Second / 2
106 )
9d4b12 107
Z 108 func defualtSocketOptions(sock mangos.Socket) error {
109     var err error
9e1301 110     if err = sock.SetOption(mangos.OptionMaxRecvSize, maxRecvSize); err != nil {
9d4b12 111         sock.Close()
Z 112         return err
113     }
114     if err = sock.SetOption(mangos.OptionWriteQLen, 0); err != nil {
115         sock.Close()
116         return err
117     }
118     if err = sock.SetOption(mangos.OptionReadQLen, 0); err != nil {
119         sock.Close()
120         return err
121     }
4a091d 122     // if err = sock.SetOption(mangos.OptionRecvDeadline, time.Second); err != nil {
Z 123     //     sock.Close()
124     //     return err
125     // }
126     // if err = sock.SetOption(mangos.OptionSendDeadline, time.Second); err != nil {
9d4b12 127     //     sock.Close()
Z 128     //     return err
129     // }
130     if err = sock.SetOption(mangos.OptionRaw, true); err != nil {
131         sock.Close()
132         return err
133     }
134
135     return nil
136 }
137
138 func setSocketOptions(sock mangos.Socket, args ...interface{}) error {
139
140     err := defualtSocketOptions(sock)
141     if err != nil {
142         return err
143     }
9e1301 144     switch sock.GetProtocol().Number() {
Z 145     case mangos.ProtoSub:
9d4b12 146         for _, arg := range args {
Z 147             switch arg.(type) {
148             case string:
149                 err = sock.SetOption(mangos.OptionSubscribe, []byte(arg.(string)))
150             default:
151                 err = sock.SetOption(mangos.OptionSubscribe, []byte(""))
152             }
153         }
9e1301 154     case mangos.ProtoSurveyor:
Z 155         for _, arg := range args {
156             switch arg.(type) {
157             case int:
158                 surveyorTime = time.Duration(arg.(int)/2) * time.Second
159             default:
160             }
161             err = sock.SetOption(mangos.OptionSurveyTime, surveyorTime)
162         }
163     default:
164         fmt.Println("no additional args")
9d4b12 165     }
Z 166
167     return nil
168 }
169
170 func rmExistedIpcName(url string) {
171     s := strings.Split(url, "://")
172
173     if s[0] == "ipc" {
174         if _, err := os.Stat(s[1]); err == nil {
175             os.Remove(s[1])
176         }
177     }
178 }
179
180 // newSocket allocates a new Socket.  The Socket is the handle used to
181 // access the underlying library.
182 func newSocket(p protocol) (mangos.Socket, error) {
183
184     var s mangos.Socket
185     var err error
186
187     switch p {
188     case PUB:
189         s, err = pub.NewSocket()
190     case SUB:
191         s, err = sub.NewSocket()
192     case PUSH:
193         s, err = push.NewSocket()
194     case PULL:
195         s, err = pull.NewSocket()
196     case REQ:
197         s, err = req.NewSocket()
198     case REP:
199         s, err = rep.NewSocket()
200     case SURVEYOR:
201         s, err = surveyor.NewSocket()
202     case RESPONDENT:
203         s, err = respondent.NewSocket()
204     case PAIR:
205         s, err = pair.NewSocket()
206     case BUS:
207         s, err = bus.NewSocket()
208     default:
209         err = mangos.ErrBadProto
210     }
211
212     if err != nil {
213         return nil, err
214     }
215
216     all.AddTransports(s)
217     // s.AddTransport(ipc.NewTransport())
218     // s.AddTransport(tcp.NewTransport())
219
220     return s, nil
221 }
222
223 func die(format string, v ...interface{}) {
224     fmt.Fprintln(os.Stderr, fmt.Sprintf(format, v...))
225     os.Exit(1)
226 }
227
228 // Protocol is the numeric abstraction to the various protocols or patterns
229 // that Mangos supports.
230 type protocol int
231
232 // Constants for protocols.
233 const (
234     PUSH       = protocol(mangos.ProtoPush)
235     PULL       = protocol(mangos.ProtoPull)
236     PUB        = protocol(mangos.ProtoPub)
237     SUB        = protocol(mangos.ProtoSub)
238     REQ        = protocol(mangos.ProtoReq)
239     REP        = protocol(mangos.ProtoRep)
240     SURVEYOR   = protocol(mangos.ProtoSurveyor)
241     RESPONDENT = protocol(mangos.ProtoRespondent)
242     BUS        = protocol(mangos.ProtoBus)
243     PAIR       = protocol(mangos.ProtoPair)
244 )
245
246 func protoProducer(m Mode) protocol {
247     switch m {
248     case PushPull:
249         return PUSH
250     case PubSub:
251         return PUB
252     case ReqRep:
253         return REP
254     case SurvResp:
255         return SURVEYOR
256     case Bus:
257         return BUS
258     case Pair:
259         return PAIR
260     }
261     return PUSH
262 }
263
264 func protoConsumer(m Mode) protocol {
265     switch m {
266     case PushPull:
267         return PULL
268     case PubSub:
269         return SUB
270     case ReqRep:
271         return REQ
272     case SurvResp:
273         return RESPONDENT
274     case Bus:
275         return BUS
276     case Pair:
277         return PAIR
278     }
279     return PULL
280 }