zhangmeng
2019-05-15 9e1301abf98fb4e04fabba535c1bb5ad161a66a4
提交 | 用户 | age
9d4b12 1 package deliver
Z 2
3 import (
4     "errors"
5     "fmt"
6     "os"
7     "strings"
9e1301 8     "time"
9d4b12 9
Z 10     "nanomsg.org/go-mangos"
11     "nanomsg.org/go-mangos/protocol/bus"
12     "nanomsg.org/go-mangos/protocol/pair"
13     "nanomsg.org/go-mangos/protocol/pub"
14     "nanomsg.org/go-mangos/protocol/pull"
15     "nanomsg.org/go-mangos/protocol/push"
16     "nanomsg.org/go-mangos/protocol/rep"
17     "nanomsg.org/go-mangos/protocol/req"
18     "nanomsg.org/go-mangos/protocol/respondent"
19     "nanomsg.org/go-mangos/protocol/sub"
20     "nanomsg.org/go-mangos/protocol/surveyor"
21     "nanomsg.org/go-mangos/transport/all"
22 )
23
24 // NNG mangos wrap
25 type NNG struct {
26     sock mangos.Socket
27 }
28
29 // Send impl interface Diliver
30 func (n *NNG) Send(data []byte) error {
31     if n.sock == nil {
32         return errors.New("please init NNG first")
33     }
34
9e1301 35     switch n.sock.GetProtocol().Number() {
Z 36     case mangos.ProtoSurveyor:
37         time.Sleep(surveyorTime * 2)
38     default:
39     }
9d4b12 40     if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil {
Z 41         msg := mangos.NewMessage(len(data))
42         msg.Body = data
43         return n.sock.SendMsg(msg)
44     }
9e1301 45
9d4b12 46     return n.sock.Send(data)
Z 47 }
48
49 // Recv impl interface Diliver
50 func (n *NNG) Recv() ([]byte, error) {
51     if n.sock == nil {
52         return nil, errors.New("please init NNG first")
53     }
54     if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil {
55         msg, err := n.sock.RecvMsg()
56         return msg.Body, err
57     }
58     return n.sock.Recv()
59 }
60
f5368c 61 // nngProducer create from deliver Mode
Z 62 func nngProducer(m Mode, url string, args ...interface{}) *NNG {
9d4b12 63
Z 64     rmExistedIpcName(url)
65     if sock, err := newSocket(protoProducer(m)); err == nil {
66         if err = setSocketOptions(sock, args); err != nil {
67             return nil
68         }
69         if err = sock.Listen(url); err != nil {
70             sock.Close()
71             return nil
72         }
73         return &NNG{
74             sock,
75         }
76     }
77
78     return nil
79 }
80
f5368c 81 // nngConsumer create from deliver Mode
Z 82 func nngConsumer(m Mode, url string, args ...interface{}) *NNG {
9d4b12 83
Z 84     if sock, err := newSocket(protoConsumer(m)); err == nil {
85         if err = setSocketOptions(sock, args); err != nil {
86             return nil
87         }
88
89         if err = sock.Dial(url); err != nil {
90             sock.Close()
91             return nil
92         }
93
94         return &NNG{
95             sock,
96         }
97     }
98
99     return nil
100 }
101
9e1301 102 // maxRecvSize max recv size
Z 103 var (
104     maxRecvSize  = 33 * 1024 * 1024
105     surveyorTime = time.Second / 2
106 )
9d4b12 107
Z 108 func defualtSocketOptions(sock mangos.Socket) error {
109     var err error
9e1301 110     if err = sock.SetOption(mangos.OptionMaxRecvSize, maxRecvSize); err != nil {
9d4b12 111         sock.Close()
Z 112         return err
113     }
114     if err = sock.SetOption(mangos.OptionWriteQLen, 0); err != nil {
115         sock.Close()
116         return err
117     }
118     if err = sock.SetOption(mangos.OptionReadQLen, 0); err != nil {
119         sock.Close()
120         return err
121     }
122     // if err = sock.SetOption(mangos.OptionNoDelay, true); err != nil {
123     //     sock.Close()
124     //     return err
125     // }
126     if err = sock.SetOption(mangos.OptionRaw, true); err != nil {
127         sock.Close()
128         return err
129     }
130
131     return nil
132 }
133
134 func setSocketOptions(sock mangos.Socket, args ...interface{}) error {
135
136     err := defualtSocketOptions(sock)
137     if err != nil {
138         return err
139     }
9e1301 140     switch sock.GetProtocol().Number() {
Z 141     case mangos.ProtoSub:
9d4b12 142         for _, arg := range args {
Z 143             switch arg.(type) {
144             case string:
145                 err = sock.SetOption(mangos.OptionSubscribe, []byte(arg.(string)))
146             default:
147                 err = sock.SetOption(mangos.OptionSubscribe, []byte(""))
148             }
149         }
9e1301 150     case mangos.ProtoSurveyor:
Z 151         for _, arg := range args {
152             switch arg.(type) {
153             case int:
154                 surveyorTime = time.Duration(arg.(int)/2) * time.Second
155             default:
156             }
157             err = sock.SetOption(mangos.OptionSurveyTime, surveyorTime)
158         }
159     default:
160         fmt.Println("no additional args")
9d4b12 161     }
Z 162
163     return nil
164 }
165
166 func rmExistedIpcName(url string) {
167     s := strings.Split(url, "://")
168
169     if s[0] == "ipc" {
170         if _, err := os.Stat(s[1]); err == nil {
171             os.Remove(s[1])
172         }
173     }
174 }
175
176 // newSocket allocates a new Socket.  The Socket is the handle used to
177 // access the underlying library.
178 func newSocket(p protocol) (mangos.Socket, error) {
179
180     var s mangos.Socket
181     var err error
182
183     switch p {
184     case PUB:
185         s, err = pub.NewSocket()
186     case SUB:
187         s, err = sub.NewSocket()
188     case PUSH:
189         s, err = push.NewSocket()
190     case PULL:
191         s, err = pull.NewSocket()
192     case REQ:
193         s, err = req.NewSocket()
194     case REP:
195         s, err = rep.NewSocket()
196     case SURVEYOR:
197         s, err = surveyor.NewSocket()
198     case RESPONDENT:
199         s, err = respondent.NewSocket()
200     case PAIR:
201         s, err = pair.NewSocket()
202     case BUS:
203         s, err = bus.NewSocket()
204     default:
205         err = mangos.ErrBadProto
206     }
207
208     if err != nil {
209         return nil, err
210     }
211
212     all.AddTransports(s)
213     // s.AddTransport(ipc.NewTransport())
214     // s.AddTransport(tcp.NewTransport())
215
216     return s, nil
217 }
218
219 func die(format string, v ...interface{}) {
220     fmt.Fprintln(os.Stderr, fmt.Sprintf(format, v...))
221     os.Exit(1)
222 }
223
224 // Protocol is the numeric abstraction to the various protocols or patterns
225 // that Mangos supports.
226 type protocol int
227
228 // Constants for protocols.
229 const (
230     PUSH       = protocol(mangos.ProtoPush)
231     PULL       = protocol(mangos.ProtoPull)
232     PUB        = protocol(mangos.ProtoPub)
233     SUB        = protocol(mangos.ProtoSub)
234     REQ        = protocol(mangos.ProtoReq)
235     REP        = protocol(mangos.ProtoRep)
236     SURVEYOR   = protocol(mangos.ProtoSurveyor)
237     RESPONDENT = protocol(mangos.ProtoRespondent)
238     BUS        = protocol(mangos.ProtoBus)
239     PAIR       = protocol(mangos.ProtoPair)
240 )
241
242 func protoProducer(m Mode) protocol {
243     switch m {
244     case PushPull:
245         return PUSH
246     case PubSub:
247         return PUB
248     case ReqRep:
249         return REP
250     case SurvResp:
251         return SURVEYOR
252     case Bus:
253         return BUS
254     case Pair:
255         return PAIR
256     }
257     return PUSH
258 }
259
260 func protoConsumer(m Mode) protocol {
261     switch m {
262     case PushPull:
263         return PULL
264     case PubSub:
265         return SUB
266     case ReqRep:
267         return REQ
268     case SurvResp:
269         return RESPONDENT
270     case Bus:
271         return BUS
272     case Pair:
273         return PAIR
274     }
275     return PULL
276 }