zhangmeng
2019-05-15 aaae9917c3df122e2bbcf591417c1b58021d79fc
提交 | 用户 | age
9d4b12 1 package deliver
Z 2
3 import (
4     "errors"
5     "fmt"
6     "os"
7     "strings"
9e1301 8     "time"
9d4b12 9
Z 10     "nanomsg.org/go-mangos"
11     "nanomsg.org/go-mangos/protocol/bus"
12     "nanomsg.org/go-mangos/protocol/pair"
13     "nanomsg.org/go-mangos/protocol/pub"
14     "nanomsg.org/go-mangos/protocol/pull"
15     "nanomsg.org/go-mangos/protocol/push"
16     "nanomsg.org/go-mangos/protocol/rep"
17     "nanomsg.org/go-mangos/protocol/req"
18     "nanomsg.org/go-mangos/protocol/respondent"
19     "nanomsg.org/go-mangos/protocol/sub"
20     "nanomsg.org/go-mangos/protocol/surveyor"
21     "nanomsg.org/go-mangos/transport/all"
22 )
23
24 // NNG mangos wrap
25 type NNG struct {
26     sock mangos.Socket
27 }
28
29 // Send impl interface Diliver
30 func (n *NNG) Send(data []byte) error {
31     if n.sock == nil {
32         return errors.New("please init NNG first")
33     }
34
9e1301 35     switch n.sock.GetProtocol().Number() {
Z 36     case mangos.ProtoSurveyor:
37         time.Sleep(surveyorTime * 2)
38     default:
39     }
9d4b12 40     if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil {
Z 41         msg := mangos.NewMessage(len(data))
42         msg.Body = data
43         return n.sock.SendMsg(msg)
44     }
9e1301 45
9d4b12 46     return n.sock.Send(data)
Z 47 }
48
49 // Recv impl interface Diliver
50 func (n *NNG) Recv() ([]byte, error) {
51     if n.sock == nil {
52         return nil, errors.New("please init NNG first")
53     }
54     if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil {
55         msg, err := n.sock.RecvMsg()
56         return msg.Body, err
57     }
58     return n.sock.Recv()
59 }
60
aaae99 61 // Close impl interface Deliver
Z 62 func (n *NNG) Close() {
63     if n.sock != nil {
64         n.sock.Close()
65     }
66 }
67
f5368c 68 // nngProducer create from deliver Mode
Z 69 func nngProducer(m Mode, url string, args ...interface{}) *NNG {
9d4b12 70
Z 71     rmExistedIpcName(url)
72     if sock, err := newSocket(protoProducer(m)); err == nil {
73         if err = setSocketOptions(sock, args); err != nil {
74             return nil
75         }
76         if err = sock.Listen(url); err != nil {
77             sock.Close()
78             return nil
79         }
80         return &NNG{
81             sock,
82         }
83     }
84
85     return nil
86 }
87
f5368c 88 // nngConsumer create from deliver Mode
Z 89 func nngConsumer(m Mode, url string, args ...interface{}) *NNG {
9d4b12 90
Z 91     if sock, err := newSocket(protoConsumer(m)); err == nil {
92         if err = setSocketOptions(sock, args); err != nil {
93             return nil
94         }
95
96         if err = sock.Dial(url); err != nil {
97             sock.Close()
98             return nil
99         }
100
101         return &NNG{
102             sock,
103         }
104     }
105
106     return nil
107 }
108
9e1301 109 // maxRecvSize max recv size
Z 110 var (
111     maxRecvSize  = 33 * 1024 * 1024
112     surveyorTime = time.Second / 2
113 )
9d4b12 114
Z 115 func defualtSocketOptions(sock mangos.Socket) error {
116     var err error
9e1301 117     if err = sock.SetOption(mangos.OptionMaxRecvSize, maxRecvSize); err != nil {
9d4b12 118         sock.Close()
Z 119         return err
120     }
121     if err = sock.SetOption(mangos.OptionWriteQLen, 0); err != nil {
122         sock.Close()
123         return err
124     }
125     if err = sock.SetOption(mangos.OptionReadQLen, 0); err != nil {
126         sock.Close()
127         return err
128     }
4a091d 129     // if err = sock.SetOption(mangos.OptionRecvDeadline, time.Second); err != nil {
Z 130     //     sock.Close()
131     //     return err
132     // }
133     // if err = sock.SetOption(mangos.OptionSendDeadline, time.Second); err != nil {
9d4b12 134     //     sock.Close()
Z 135     //     return err
136     // }
137     if err = sock.SetOption(mangos.OptionRaw, true); err != nil {
138         sock.Close()
139         return err
140     }
141
142     return nil
143 }
144
145 func setSocketOptions(sock mangos.Socket, args ...interface{}) error {
146
147     err := defualtSocketOptions(sock)
148     if err != nil {
149         return err
150     }
9e1301 151     switch sock.GetProtocol().Number() {
Z 152     case mangos.ProtoSub:
9d4b12 153         for _, arg := range args {
Z 154             switch arg.(type) {
155             case string:
156                 err = sock.SetOption(mangos.OptionSubscribe, []byte(arg.(string)))
157             default:
158                 err = sock.SetOption(mangos.OptionSubscribe, []byte(""))
159             }
160         }
9e1301 161     case mangos.ProtoSurveyor:
Z 162         for _, arg := range args {
163             switch arg.(type) {
164             case int:
165                 surveyorTime = time.Duration(arg.(int)/2) * time.Second
166             default:
167             }
168             err = sock.SetOption(mangos.OptionSurveyTime, surveyorTime)
169         }
170     default:
171         fmt.Println("no additional args")
9d4b12 172     }
Z 173
174     return nil
175 }
176
177 func rmExistedIpcName(url string) {
178     s := strings.Split(url, "://")
179
180     if s[0] == "ipc" {
181         if _, err := os.Stat(s[1]); err == nil {
182             os.Remove(s[1])
183         }
184     }
185 }
186
187 // newSocket allocates a new Socket.  The Socket is the handle used to
188 // access the underlying library.
189 func newSocket(p protocol) (mangos.Socket, error) {
190
191     var s mangos.Socket
192     var err error
193
194     switch p {
195     case PUB:
196         s, err = pub.NewSocket()
197     case SUB:
198         s, err = sub.NewSocket()
199     case PUSH:
200         s, err = push.NewSocket()
201     case PULL:
202         s, err = pull.NewSocket()
203     case REQ:
204         s, err = req.NewSocket()
205     case REP:
206         s, err = rep.NewSocket()
207     case SURVEYOR:
208         s, err = surveyor.NewSocket()
209     case RESPONDENT:
210         s, err = respondent.NewSocket()
211     case PAIR:
212         s, err = pair.NewSocket()
213     case BUS:
214         s, err = bus.NewSocket()
215     default:
216         err = mangos.ErrBadProto
217     }
218
219     if err != nil {
220         return nil, err
221     }
222
223     all.AddTransports(s)
224     // s.AddTransport(ipc.NewTransport())
225     // s.AddTransport(tcp.NewTransport())
226
227     return s, nil
228 }
229
230 func die(format string, v ...interface{}) {
231     fmt.Fprintln(os.Stderr, fmt.Sprintf(format, v...))
232     os.Exit(1)
233 }
234
235 // Protocol is the numeric abstraction to the various protocols or patterns
236 // that Mangos supports.
237 type protocol int
238
239 // Constants for protocols.
240 const (
241     PUSH       = protocol(mangos.ProtoPush)
242     PULL       = protocol(mangos.ProtoPull)
243     PUB        = protocol(mangos.ProtoPub)
244     SUB        = protocol(mangos.ProtoSub)
245     REQ        = protocol(mangos.ProtoReq)
246     REP        = protocol(mangos.ProtoRep)
247     SURVEYOR   = protocol(mangos.ProtoSurveyor)
248     RESPONDENT = protocol(mangos.ProtoRespondent)
249     BUS        = protocol(mangos.ProtoBus)
250     PAIR       = protocol(mangos.ProtoPair)
251 )
252
253 func protoProducer(m Mode) protocol {
254     switch m {
255     case PushPull:
256         return PUSH
257     case PubSub:
258         return PUB
259     case ReqRep:
260         return REP
261     case SurvResp:
262         return SURVEYOR
263     case Bus:
264         return BUS
265     case Pair:
266         return PAIR
267     }
268     return PUSH
269 }
270
271 func protoConsumer(m Mode) protocol {
272     switch m {
273     case PushPull:
274         return PULL
275     case PubSub:
276         return SUB
277     case ReqRep:
278         return REQ
279     case SurvResp:
280         return RESPONDENT
281     case Bus:
282         return BUS
283     case Pair:
284         return PAIR
285     }
286     return PULL
287 }