zhangmeng
2019-05-15 9d4b12ffee1c25de247568c3f9a51be4996da09b
提交 | 用户 | age
9d4b12 1 package deliver
Z 2
3 import (
4     "errors"
5     "fmt"
6     "os"
7     "strings"
8
9     "nanomsg.org/go-mangos"
10     "nanomsg.org/go-mangos/protocol/bus"
11     "nanomsg.org/go-mangos/protocol/pair"
12     "nanomsg.org/go-mangos/protocol/pub"
13     "nanomsg.org/go-mangos/protocol/pull"
14     "nanomsg.org/go-mangos/protocol/push"
15     "nanomsg.org/go-mangos/protocol/rep"
16     "nanomsg.org/go-mangos/protocol/req"
17     "nanomsg.org/go-mangos/protocol/respondent"
18     "nanomsg.org/go-mangos/protocol/sub"
19     "nanomsg.org/go-mangos/protocol/surveyor"
20     "nanomsg.org/go-mangos/transport/all"
21 )
22
23 // NNG mangos wrap
24 type NNG struct {
25     sock mangos.Socket
26 }
27
28 // Send impl interface Diliver
29 func (n *NNG) Send(data []byte) error {
30     if n.sock == nil {
31         return errors.New("please init NNG first")
32     }
33
34     if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil {
35         msg := mangos.NewMessage(len(data))
36         msg.Body = data
37         return n.sock.SendMsg(msg)
38     }
39     return n.sock.Send(data)
40 }
41
42 // Recv impl interface Diliver
43 func (n *NNG) Recv() ([]byte, error) {
44     if n.sock == nil {
45         return nil, errors.New("please init NNG first")
46     }
47     if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil {
48         msg, err := n.sock.RecvMsg()
49         return msg.Body, err
50     }
51     return n.sock.Recv()
52 }
53
54 // NewNNGProducer create from deliver Mode
55 func NewNNGProducer(m Mode, url string, args ...interface{}) *NNG {
56
57     rmExistedIpcName(url)
58     if sock, err := newSocket(protoProducer(m)); err == nil {
59         if err = setSocketOptions(sock, args); err != nil {
60             return nil
61         }
62         if err = sock.Listen(url); err != nil {
63             sock.Close()
64             return nil
65         }
66         return &NNG{
67             sock,
68         }
69     }
70
71     return nil
72 }
73
74 // NewNNGConsumer create from deliver Mode
75 func NewNNGConsumer(m Mode, url string, args ...interface{}) *NNG {
76
77     if sock, err := newSocket(protoConsumer(m)); err == nil {
78         if err = setSocketOptions(sock, args); err != nil {
79             return nil
80         }
81
82         if err = sock.Dial(url); err != nil {
83             sock.Close()
84             return nil
85         }
86
87         return &NNG{
88             sock,
89         }
90     }
91
92     return nil
93 }
94
95 // MaxRecvSize max recv size
96 var MaxRecvSize = 33 * 1024 * 1024
97
98 func defualtSocketOptions(sock mangos.Socket) error {
99     var err error
100     if err = sock.SetOption(mangos.OptionMaxRecvSize, MaxRecvSize); err != nil {
101         sock.Close()
102         return err
103     }
104     if err = sock.SetOption(mangos.OptionWriteQLen, 0); err != nil {
105         sock.Close()
106         return err
107     }
108     if err = sock.SetOption(mangos.OptionReadQLen, 0); err != nil {
109         sock.Close()
110         return err
111     }
112     // if err = sock.SetOption(mangos.OptionNoDelay, true); err != nil {
113     //     sock.Close()
114     //     return err
115     // }
116     if err = sock.SetOption(mangos.OptionRaw, true); err != nil {
117         sock.Close()
118         return err
119     }
120
121     return nil
122 }
123
124 func setSocketOptions(sock mangos.Socket, args ...interface{}) error {
125
126     err := defualtSocketOptions(sock)
127     if err != nil {
128         return err
129     }
130     if sock.GetProtocol().Number() == mangos.ProtoSub {
131         for _, arg := range args {
132             switch arg.(type) {
133             case string:
134                 err = sock.SetOption(mangos.OptionSubscribe, []byte(arg.(string)))
135             default:
136                 err = sock.SetOption(mangos.OptionSubscribe, []byte(""))
137             }
138         }
139     }
140
141     return nil
142 }
143
144 func rmExistedIpcName(url string) {
145     s := strings.Split(url, "://")
146
147     if s[0] == "ipc" {
148         if _, err := os.Stat(s[1]); err == nil {
149             os.Remove(s[1])
150         }
151     }
152 }
153
154 // newSocket allocates a new Socket.  The Socket is the handle used to
155 // access the underlying library.
156 func newSocket(p protocol) (mangos.Socket, error) {
157
158     var s mangos.Socket
159     var err error
160
161     switch p {
162     case PUB:
163         s, err = pub.NewSocket()
164     case SUB:
165         s, err = sub.NewSocket()
166     case PUSH:
167         s, err = push.NewSocket()
168     case PULL:
169         s, err = pull.NewSocket()
170     case REQ:
171         s, err = req.NewSocket()
172     case REP:
173         s, err = rep.NewSocket()
174     case SURVEYOR:
175         s, err = surveyor.NewSocket()
176     case RESPONDENT:
177         s, err = respondent.NewSocket()
178     case PAIR:
179         s, err = pair.NewSocket()
180     case BUS:
181         s, err = bus.NewSocket()
182     default:
183         err = mangos.ErrBadProto
184     }
185
186     if err != nil {
187         return nil, err
188     }
189
190     all.AddTransports(s)
191     // s.AddTransport(ipc.NewTransport())
192     // s.AddTransport(tcp.NewTransport())
193
194     return s, nil
195 }
196
197 func die(format string, v ...interface{}) {
198     fmt.Fprintln(os.Stderr, fmt.Sprintf(format, v...))
199     os.Exit(1)
200 }
201
202 // Protocol is the numeric abstraction to the various protocols or patterns
203 // that Mangos supports.
204 type protocol int
205
206 // Constants for protocols.
207 const (
208     PUSH       = protocol(mangos.ProtoPush)
209     PULL       = protocol(mangos.ProtoPull)
210     PUB        = protocol(mangos.ProtoPub)
211     SUB        = protocol(mangos.ProtoSub)
212     REQ        = protocol(mangos.ProtoReq)
213     REP        = protocol(mangos.ProtoRep)
214     SURVEYOR   = protocol(mangos.ProtoSurveyor)
215     RESPONDENT = protocol(mangos.ProtoRespondent)
216     BUS        = protocol(mangos.ProtoBus)
217     PAIR       = protocol(mangos.ProtoPair)
218 )
219
220 func protoProducer(m Mode) protocol {
221     switch m {
222     case PushPull:
223         return PUSH
224     case PubSub:
225         return PUB
226     case ReqRep:
227         return REP
228     case SurvResp:
229         return SURVEYOR
230     case Bus:
231         return BUS
232     case Pair:
233         return PAIR
234     }
235     return PUSH
236 }
237
238 func protoConsumer(m Mode) protocol {
239     switch m {
240     case PushPull:
241         return PULL
242     case PubSub:
243         return SUB
244     case ReqRep:
245         return REQ
246     case SurvResp:
247         return RESPONDENT
248     case Bus:
249         return BUS
250     case Pair:
251         return PAIR
252     }
253     return PULL
254 }