zhangmeng
2019-05-16 e6cfa59401d45f74a2815d33b62e22e02765d94e
提交 | 用户 | age
9d4b12 1 package deliver
Z 2
3 import (
4     "errors"
5     "fmt"
6     "os"
7     "strings"
9e1301 8     "time"
9d4b12 9
Z 10     "nanomsg.org/go-mangos"
11     "nanomsg.org/go-mangos/protocol/bus"
12     "nanomsg.org/go-mangos/protocol/pair"
13     "nanomsg.org/go-mangos/protocol/pub"
14     "nanomsg.org/go-mangos/protocol/pull"
15     "nanomsg.org/go-mangos/protocol/push"
16     "nanomsg.org/go-mangos/protocol/rep"
17     "nanomsg.org/go-mangos/protocol/req"
18     "nanomsg.org/go-mangos/protocol/respondent"
19     "nanomsg.org/go-mangos/protocol/sub"
20     "nanomsg.org/go-mangos/protocol/surveyor"
21     "nanomsg.org/go-mangos/transport/all"
22 )
23
24 // NNG mangos wrap
25 type NNG struct {
26     sock mangos.Socket
5ff1f3 27     raw  bool
9d4b12 28 }
Z 29
30 // Send impl interface Diliver
31 func (n *NNG) Send(data []byte) error {
32     if n.sock == nil {
33         return errors.New("please init NNG first")
34     }
35
c0dcd1 36     if surveyorTime > 0 {
Z 37         time.Sleep(time.Duration(surveyorTime*2) * time.Second)
38     }
39
5ff1f3 40     if n.raw {
9d4b12 41         msg := mangos.NewMessage(len(data))
Z 42         msg.Body = data
43         return n.sock.SendMsg(msg)
44     }
9e1301 45
9d4b12 46     return n.sock.Send(data)
Z 47 }
48
49 // Recv impl interface Diliver
50 func (n *NNG) Recv() ([]byte, error) {
51     if n.sock == nil {
52         return nil, errors.New("please init NNG first")
53     }
5ff1f3 54     if n.raw {
Z 55         var msg *mangos.Message
56         var err error
57         if msg, err = n.sock.RecvMsg(); err != nil {
58             return nil, err
a6b23c 59         }
5ff1f3 60         return msg.Body, nil
9d4b12 61     }
Z 62     return n.sock.Recv()
63 }
64
aaae99 65 // Close impl interface Deliver
Z 66 func (n *NNG) Close() {
67     if n.sock != nil {
68         n.sock.Close()
8e158e 69         n.sock = nil
aaae99 70     }
Z 71 }
72
e6cfa5 73 func nngPush(m Mode, url string, args ...interface{}) *NNG {
9d4b12 74
Z 75     rmExistedIpcName(url)
76     if sock, err := newSocket(protoProducer(m)); err == nil {
77         if err = setSocketOptions(sock, args); err != nil {
162fe9 78             sock.Close()
Z 79             sock = nil
9d4b12 80             return nil
Z 81         }
82         if err = sock.Listen(url); err != nil {
83             sock.Close()
8e158e 84             sock = nil
9d4b12 85             return nil
Z 86         }
87         return &NNG{
88             sock,
5ff1f3 89             true,
9d4b12 90         }
Z 91     }
92
93     return nil
94 }
95
e6cfa5 96 func nngPull(m Mode, url string, args ...interface{}) *NNG {
9d4b12 97
Z 98     if sock, err := newSocket(protoConsumer(m)); err == nil {
99         if err = setSocketOptions(sock, args); err != nil {
162fe9 100             sock.Close()
Z 101             sock = nil
9d4b12 102             return nil
Z 103         }
104
105         if err = sock.Dial(url); err != nil {
106             sock.Close()
8e158e 107             sock = nil
9d4b12 108             return nil
Z 109         }
110
111         return &NNG{
112             sock,
5ff1f3 113             true,
9d4b12 114         }
Z 115     }
116
117     return nil
118 }
119
9e1301 120 // maxRecvSize max recv size
Z 121 var (
122     maxRecvSize  = 33 * 1024 * 1024
c0dcd1 123     surveyorTime = -1
9e1301 124 )
9d4b12 125
8e158e 126 func defualtSocketOptions() map[string]interface{} {
9d4b12 127
8e158e 128     options := make(map[string]interface{})
Z 129
130     options[mangos.OptionMaxRecvSize] = maxRecvSize
131     options[mangos.OptionWriteQLen] = 0
132     options[mangos.OptionReadQLen] = 0
133     options[mangos.OptionRecvDeadline] = time.Second
134     options[mangos.OptionSendDeadline] = time.Second
135     options[mangos.OptionRaw] = true
136
137     return options
9d4b12 138 }
Z 139
140 func setSocketOptions(sock mangos.Socket, args ...interface{}) error {
141
8e158e 142     options := defualtSocketOptions()
Z 143
9e1301 144     switch sock.GetProtocol().Number() {
Z 145     case mangos.ProtoSub:
9d4b12 146         for _, arg := range args {
Z 147             switch arg.(type) {
148             case string:
8e158e 149                 options[mangos.OptionSubscribe] = []byte(arg.(string))
9d4b12 150             default:
8e158e 151                 options[mangos.OptionSubscribe] = []byte("")
9d4b12 152             }
Z 153         }
9e1301 154     case mangos.ProtoSurveyor:
Z 155         for _, arg := range args {
156             switch arg.(type) {
157             case int:
c0dcd1 158                 if arg.(int) < 2 {
Z 159                     surveyorTime = 1
160                 } else {
161                     surveyorTime = arg.(int) / 2
162                 }
9e1301 163             default:
Z 164             }
c0dcd1 165             options[mangos.OptionSurveyTime] = time.Duration(surveyorTime) * time.Second
9e1301 166         }
Z 167     default:
168         fmt.Println("no additional args")
9d4b12 169     }
Z 170
8e158e 171     for k, v := range options {
Z 172         if err := sock.SetOption(k, v); err != nil {
173             return err
174         }
175     }
9d4b12 176     return nil
Z 177 }
178
179 func rmExistedIpcName(url string) {
180     s := strings.Split(url, "://")
181
182     if s[0] == "ipc" {
183         if _, err := os.Stat(s[1]); err == nil {
184             os.Remove(s[1])
185         }
186     }
187 }
188
189 // newSocket allocates a new Socket.  The Socket is the handle used to
190 // access the underlying library.
191 func newSocket(p protocol) (mangos.Socket, error) {
192
193     var s mangos.Socket
194     var err error
195
196     switch p {
197     case PUB:
198         s, err = pub.NewSocket()
199     case SUB:
200         s, err = sub.NewSocket()
201     case PUSH:
202         s, err = push.NewSocket()
203     case PULL:
204         s, err = pull.NewSocket()
205     case REQ:
206         s, err = req.NewSocket()
207     case REP:
208         s, err = rep.NewSocket()
209     case SURVEYOR:
210         s, err = surveyor.NewSocket()
211     case RESPONDENT:
212         s, err = respondent.NewSocket()
213     case PAIR:
214         s, err = pair.NewSocket()
215     case BUS:
216         s, err = bus.NewSocket()
217     default:
218         err = mangos.ErrBadProto
219     }
220
221     if err != nil {
222         return nil, err
223     }
224
225     all.AddTransports(s)
226
227     return s, nil
228 }
229
230 func die(format string, v ...interface{}) {
231     fmt.Fprintln(os.Stderr, fmt.Sprintf(format, v...))
232     os.Exit(1)
233 }
234
235 // Protocol is the numeric abstraction to the various protocols or patterns
236 // that Mangos supports.
237 type protocol int
238
239 // Constants for protocols.
240 const (
241     PUSH       = protocol(mangos.ProtoPush)
242     PULL       = protocol(mangos.ProtoPull)
243     PUB        = protocol(mangos.ProtoPub)
244     SUB        = protocol(mangos.ProtoSub)
245     REQ        = protocol(mangos.ProtoReq)
246     REP        = protocol(mangos.ProtoRep)
247     SURVEYOR   = protocol(mangos.ProtoSurveyor)
248     RESPONDENT = protocol(mangos.ProtoRespondent)
249     BUS        = protocol(mangos.ProtoBus)
250     PAIR       = protocol(mangos.ProtoPair)
251 )
252
253 func protoProducer(m Mode) protocol {
254     switch m {
255     case PushPull:
256         return PUSH
257     case PubSub:
258         return PUB
259     case ReqRep:
260         return REP
261     case SurvResp:
262         return SURVEYOR
263     case Bus:
264         return BUS
265     case Pair:
266         return PAIR
267     }
268     return PUSH
269 }
270
271 func protoConsumer(m Mode) protocol {
272     switch m {
273     case PushPull:
274         return PULL
275     case PubSub:
276         return SUB
277     case ReqRep:
278         return REQ
279     case SurvResp:
280         return RESPONDENT
281     case Bus:
282         return BUS
283     case Pair:
284         return PAIR
285     }
286     return PULL
287 }