zhangmeng
2019-05-17 d23f54e337d12fb4e6d5a0a5e1f041a51005e10c
提交 | 用户 | age
9d4b12 1 package deliver
Z 2
3 import (
d23f54 4     "errors"
9d4b12 5     "fmt"
Z 6     "os"
7     "strings"
9e1301 8     "time"
9d4b12 9
Z 10     "nanomsg.org/go-mangos"
11     "nanomsg.org/go-mangos/protocol/bus"
12     "nanomsg.org/go-mangos/protocol/pair"
13     "nanomsg.org/go-mangos/protocol/pub"
14     "nanomsg.org/go-mangos/protocol/pull"
15     "nanomsg.org/go-mangos/protocol/push"
16     "nanomsg.org/go-mangos/protocol/rep"
17     "nanomsg.org/go-mangos/protocol/req"
18     "nanomsg.org/go-mangos/protocol/respondent"
19     "nanomsg.org/go-mangos/protocol/sub"
20     "nanomsg.org/go-mangos/protocol/surveyor"
21     "nanomsg.org/go-mangos/transport/all"
22 )
23
9893ed 24 // type deliver
Z 25 type td int
26
27 const (
d23f54 28     agent = td(iota)
Z 29     coactee
9893ed 30 )
Z 31
9d4b12 32 // NNG mangos wrap
Z 33 type NNG struct {
36766a 34     sock   mangos.Socket
Z 35     server bool
36     mode   Mode
37     url    string
38
39     arguments []interface{}
9d4b12 40 }
Z 41
42 // Send impl interface Diliver
43 func (n *NNG) Send(data []byte) error {
36766a 44     var err error
9d4b12 45     if n.sock == nil {
d23f54 46         n.sock, err = n.makeNNG(agent)
36766a 47         if err != nil {
d23f54 48             fmt.Println("create nng sender error")
36766a 49             return err
Z 50         }
9d4b12 51     }
Z 52
c0dcd1 53     if surveyorTime > 0 {
Z 54         time.Sleep(time.Duration(surveyorTime*2) * time.Second)
55     }
56
36766a 57     msg := mangos.NewMessage(len(data))
Z 58     msg.Body = data
59     return n.sock.SendMsg(msg)
9e1301 60
9d4b12 61 }
Z 62
63 // Recv impl interface Diliver
64 func (n *NNG) Recv() ([]byte, error) {
36766a 65     var err error
Z 66
9d4b12 67     if n.sock == nil {
d23f54 68         n.sock, err = n.makeNNG(coactee)
36766a 69         if err != nil {
d23f54 70             fmt.Println("create nng reciever error")
5ff1f3 71             return nil, err
a6b23c 72         }
9d4b12 73     }
36766a 74
Z 75     var msg *mangos.Message
76     if msg, err = n.sock.RecvMsg(); err != nil {
77         return nil, err
78     }
79     return msg.Body, nil
80
9d4b12 81 }
Z 82
aaae99 83 // Close impl interface Deliver
Z 84 func (n *NNG) Close() {
85     if n.sock != nil {
86         n.sock.Close()
8e158e 87         n.sock = nil
aaae99 88     }
Z 89 }
90
6887a3 91 func nngServer(m Mode, url string, args ...interface{}) *NNG {
9d4b12 92
Z 93     rmExistedIpcName(url)
94
36766a 95     return &NNG{
Z 96         server:    true,
97         mode:      m,
98         url:       url,
99         arguments: args,
100     }
9d4b12 101 }
Z 102
6887a3 103 func nngClient(m Mode, url string, args ...interface{}) *NNG {
9d4b12 104
36766a 105     return &NNG{
Z 106         server:    false,
107         mode:      m,
108         url:       url,
109         arguments: args,
9d4b12 110     }
Z 111
36766a 112 }
Z 113
9893ed 114 func proto(typ td, m Mode) protocol {
d23f54 115     if typ == agent {
Z 116         return protoAgent(m)
117     } else if typ == coactee {
118         return protoCoactee(m)
36766a 119     }
d23f54 120     return NONE
36766a 121 }
Z 122
9893ed 123 func (n *NNG) makeNNG(typ td) (mangos.Socket, error) {
36766a 124
Z 125     var sock mangos.Socket
126     var err error
127
d23f54 128     switch n.mode {
Z 129     case Bus:
130         sock, err = n.busMakeNNG(typ)
131     case ReqRep:
132         sock, err = n.rrMakeNNG(typ)
9e1301 133     default:
d23f54 134         sock, err = n.ppMakeNNG(typ)
9d4b12 135     }
Z 136
d23f54 137     return sock, err
9d4b12 138 }
Z 139
140 func rmExistedIpcName(url string) {
141     s := strings.Split(url, "://")
142
143     if s[0] == "ipc" {
144         if _, err := os.Stat(s[1]); err == nil {
145             os.Remove(s[1])
146         }
147     }
148 }
149
150 // newSocket allocates a new Socket.  The Socket is the handle used to
151 // access the underlying library.
152 func newSocket(p protocol) (mangos.Socket, error) {
153
d23f54 154     if p == NONE {
Z 155         return nil, errors.New("new socket protocol none")
156     }
9d4b12 157     var s mangos.Socket
Z 158     var err error
159
160     switch p {
161     case PUB:
162         s, err = pub.NewSocket()
163     case SUB:
164         s, err = sub.NewSocket()
165     case PUSH:
166         s, err = push.NewSocket()
167     case PULL:
168         s, err = pull.NewSocket()
169     case REQ:
170         s, err = req.NewSocket()
171     case REP:
172         s, err = rep.NewSocket()
173     case SURVEYOR:
174         s, err = surveyor.NewSocket()
175     case RESPONDENT:
176         s, err = respondent.NewSocket()
177     case PAIR:
178         s, err = pair.NewSocket()
179     case BUS:
180         s, err = bus.NewSocket()
181     default:
182         err = mangos.ErrBadProto
183     }
184
185     if err != nil {
186         return nil, err
187     }
188
189     all.AddTransports(s)
190
191     return s, nil
192 }
193
194 func die(format string, v ...interface{}) {
195     fmt.Fprintln(os.Stderr, fmt.Sprintf(format, v...))
196     os.Exit(1)
197 }
198
199 // Protocol is the numeric abstraction to the various protocols or patterns
200 // that Mangos supports.
201 type protocol int
202
203 // Constants for protocols.
204 const (
d23f54 205     NONE       = -1
9d4b12 206     PUSH       = protocol(mangos.ProtoPush)
Z 207     PULL       = protocol(mangos.ProtoPull)
208     PUB        = protocol(mangos.ProtoPub)
209     SUB        = protocol(mangos.ProtoSub)
210     REQ        = protocol(mangos.ProtoReq)
211     REP        = protocol(mangos.ProtoRep)
212     SURVEYOR   = protocol(mangos.ProtoSurveyor)
213     RESPONDENT = protocol(mangos.ProtoRespondent)
214     BUS        = protocol(mangos.ProtoBus)
215     PAIR       = protocol(mangos.ProtoPair)
216 )
217
d23f54 218 func protoAgent(m Mode) protocol {
9d4b12 219     switch m {
Z 220     case PushPull:
221         return PUSH
222     case PubSub:
223         return PUB
224     case Pair:
225         return PAIR
d23f54 226     case SurvResp:
Z 227         return SURVEYOR
228     case ReqRep:
229         return REQ
230     case Bus:
231         return BUS
9d4b12 232     }
d23f54 233     return NONE
9d4b12 234 }
Z 235
d23f54 236 func protoCoactee(m Mode) protocol {
9d4b12 237     switch m {
Z 238     case PushPull:
239         return PULL
240     case PubSub:
241         return SUB
242     case Pair:
243         return PAIR
d23f54 244     case SurvResp:
Z 245         return RESPONDENT
246     case ReqRep:
247         return REP
248     case Bus:
249         return BUS
9d4b12 250     }
d23f54 251     return NONE
9d4b12 252 }