zhangmeng
2019-05-29 631409e7b39ab73f89b076804459cec1f4fbafbe
提交 | 用户 | age
9d4b12 1 package deliver
Z 2
3 import (
d23f54 4     "errors"
9d4b12 5     "fmt"
Z 6     "os"
7     "strings"
9e1301 8     "time"
9d4b12 9
Z 10     "nanomsg.org/go-mangos"
11     "nanomsg.org/go-mangos/protocol/bus"
12     "nanomsg.org/go-mangos/protocol/pair"
13     "nanomsg.org/go-mangos/protocol/pub"
14     "nanomsg.org/go-mangos/protocol/pull"
15     "nanomsg.org/go-mangos/protocol/push"
16     "nanomsg.org/go-mangos/protocol/rep"
17     "nanomsg.org/go-mangos/protocol/req"
18     "nanomsg.org/go-mangos/protocol/respondent"
19     "nanomsg.org/go-mangos/protocol/sub"
20     "nanomsg.org/go-mangos/protocol/surveyor"
21     "nanomsg.org/go-mangos/transport/all"
22 )
23
24 // NNG mangos wrap
25 type NNG struct {
9a89af 26     sock mangos.Socket
Z 27     typ  td
28     mode Mode
29     url  string
36766a 30
Z 31     arguments []interface{}
9d4b12 32 }
Z 33
34 // Send impl interface Diliver
35 func (n *NNG) Send(data []byte) error {
c2bbe3 36     if n == nil {
Z 37         return errors.New("please init NNG first")
38     }
36766a 39     var err error
9d4b12 40     if n.sock == nil {
d23f54 41         n.sock, err = n.makeNNG(agent)
36766a 42         if err != nil {
d23f54 43             fmt.Println("create nng sender error")
36766a 44             return err
Z 45         }
9d4b12 46     }
Z 47
c0dcd1 48     if surveyorTime > 0 {
Z 49         time.Sleep(time.Duration(surveyorTime*2) * time.Second)
50     }
51
36766a 52     msg := mangos.NewMessage(len(data))
Z 53     msg.Body = data
54     return n.sock.SendMsg(msg)
9e1301 55
9d4b12 56 }
Z 57
58 // Recv impl interface Diliver
59 func (n *NNG) Recv() ([]byte, error) {
c2bbe3 60     if n == nil {
Z 61         return nil, errors.New("please init NNG first")
62     }
63
36766a 64     var err error
Z 65
9d4b12 66     if n.sock == nil {
d23f54 67         n.sock, err = n.makeNNG(coactee)
36766a 68         if err != nil {
d23f54 69             fmt.Println("create nng reciever error")
5ff1f3 70             return nil, err
a6b23c 71         }
9d4b12 72     }
36766a 73
Z 74     var msg *mangos.Message
75     if msg, err = n.sock.RecvMsg(); err != nil {
76         return nil, err
77     }
78     return msg.Body, nil
79
9d4b12 80 }
Z 81
aaae99 82 // Close impl interface Deliver
Z 83 func (n *NNG) Close() {
c2bbe3 84     if n != nil && n.sock != nil {
aaae99 85         n.sock.Close()
8e158e 86         n.sock = nil
aaae99 87     }
Z 88 }
89
6887a3 90 func nngServer(m Mode, url string, args ...interface{}) *NNG {
9d4b12 91
Z 92     rmExistedIpcName(url)
93
36766a 94     return &NNG{
9a89af 95         typ:       agent,
36766a 96         mode:      m,
Z 97         url:       url,
98         arguments: args,
99     }
9d4b12 100 }
Z 101
6887a3 102 func nngClient(m Mode, url string, args ...interface{}) *NNG {
9d4b12 103
36766a 104     return &NNG{
9a89af 105         typ:       coactee,
36766a 106         mode:      m,
Z 107         url:       url,
108         arguments: args,
9d4b12 109     }
Z 110
36766a 111 }
Z 112
9893ed 113 func proto(typ td, m Mode) protocol {
d23f54 114     if typ == agent {
Z 115         return protoAgent(m)
116     } else if typ == coactee {
117         return protoCoactee(m)
36766a 118     }
d23f54 119     return NONE
36766a 120 }
Z 121
9893ed 122 func (n *NNG) makeNNG(typ td) (mangos.Socket, error) {
36766a 123
Z 124     var sock mangos.Socket
125     var err error
126
d23f54 127     switch n.mode {
Z 128     case Bus:
129         sock, err = n.busMakeNNG(typ)
130     case ReqRep:
631409 131     case SurvResp:
d23f54 132         sock, err = n.rrMakeNNG(typ)
9e1301 133     default:
d23f54 134         sock, err = n.ppMakeNNG(typ)
9d4b12 135     }
Z 136
d23f54 137     return sock, err
9d4b12 138 }
Z 139
140 func rmExistedIpcName(url string) {
141     s := strings.Split(url, "://")
142
143     if s[0] == "ipc" {
144         if _, err := os.Stat(s[1]); err == nil {
145             os.Remove(s[1])
146         }
147     }
148 }
149
150 // newSocket allocates a new Socket.  The Socket is the handle used to
151 // access the underlying library.
152 func newSocket(p protocol) (mangos.Socket, error) {
153
d23f54 154     if p == NONE {
Z 155         return nil, errors.New("new socket protocol none")
156     }
9d4b12 157     var s mangos.Socket
Z 158     var err error
159
160     switch p {
161     case PUB:
162         s, err = pub.NewSocket()
163     case SUB:
164         s, err = sub.NewSocket()
165     case PUSH:
166         s, err = push.NewSocket()
167     case PULL:
168         s, err = pull.NewSocket()
169     case REQ:
170         s, err = req.NewSocket()
171     case REP:
172         s, err = rep.NewSocket()
173     case SURVEYOR:
174         s, err = surveyor.NewSocket()
175     case RESPONDENT:
176         s, err = respondent.NewSocket()
177     case PAIR:
178         s, err = pair.NewSocket()
179     case BUS:
180         s, err = bus.NewSocket()
181     default:
182         err = mangos.ErrBadProto
183     }
184
185     if err != nil {
186         return nil, err
187     }
188
189     all.AddTransports(s)
190
191     return s, nil
192 }
193
194 func die(format string, v ...interface{}) {
195     fmt.Fprintln(os.Stderr, fmt.Sprintf(format, v...))
196     os.Exit(1)
197 }
198
199 // Protocol is the numeric abstraction to the various protocols or patterns
200 // that Mangos supports.
201 type protocol int
202
203 // Constants for protocols.
204 const (
d23f54 205     NONE       = -1
9d4b12 206     PUSH       = protocol(mangos.ProtoPush)
Z 207     PULL       = protocol(mangos.ProtoPull)
208     PUB        = protocol(mangos.ProtoPub)
209     SUB        = protocol(mangos.ProtoSub)
210     REQ        = protocol(mangos.ProtoReq)
211     REP        = protocol(mangos.ProtoRep)
212     SURVEYOR   = protocol(mangos.ProtoSurveyor)
213     RESPONDENT = protocol(mangos.ProtoRespondent)
214     BUS        = protocol(mangos.ProtoBus)
215     PAIR       = protocol(mangos.ProtoPair)
216 )
217
d23f54 218 func protoAgent(m Mode) protocol {
9d4b12 219     switch m {
Z 220     case PushPull:
221         return PUSH
222     case PubSub:
223         return PUB
224     case Pair:
225         return PAIR
d23f54 226     case SurvResp:
Z 227         return SURVEYOR
228     case ReqRep:
229         return REQ
230     case Bus:
231         return BUS
9d4b12 232     }
d23f54 233     return NONE
9d4b12 234 }
Z 235
d23f54 236 func protoCoactee(m Mode) protocol {
9d4b12 237     switch m {
Z 238     case PushPull:
239         return PULL
240     case PubSub:
241         return SUB
242     case Pair:
243         return PAIR
d23f54 244     case SurvResp:
Z 245         return RESPONDENT
246     case ReqRep:
247         return REP
248     case Bus:
249         return BUS
9d4b12 250     }
d23f54 251     return NONE
9d4b12 252 }