zhangmeng
2019-05-22 c2bbe31e7c6c9f83f7bcce26dea98bc18ed8f39c
提交 | 用户 | age
9d4b12 1 package deliver
Z 2
3 import (
d23f54 4     "errors"
9d4b12 5     "fmt"
Z 6     "os"
7     "strings"
9e1301 8     "time"
9d4b12 9
Z 10     "nanomsg.org/go-mangos"
11     "nanomsg.org/go-mangos/protocol/bus"
12     "nanomsg.org/go-mangos/protocol/pair"
13     "nanomsg.org/go-mangos/protocol/pub"
14     "nanomsg.org/go-mangos/protocol/pull"
15     "nanomsg.org/go-mangos/protocol/push"
16     "nanomsg.org/go-mangos/protocol/rep"
17     "nanomsg.org/go-mangos/protocol/req"
18     "nanomsg.org/go-mangos/protocol/respondent"
19     "nanomsg.org/go-mangos/protocol/sub"
20     "nanomsg.org/go-mangos/protocol/surveyor"
21     "nanomsg.org/go-mangos/transport/all"
22 )
23
24 // NNG mangos wrap
25 type NNG struct {
9a89af 26     sock mangos.Socket
Z 27     typ  td
28     mode Mode
29     url  string
36766a 30
Z 31     arguments []interface{}
9d4b12 32 }
Z 33
34 // Send impl interface Diliver
35 func (n *NNG) Send(data []byte) error {
c2bbe3 36     if n == nil {
Z 37         return errors.New("please init NNG first")
38     }
36766a 39     var err error
9d4b12 40     if n.sock == nil {
d23f54 41         n.sock, err = n.makeNNG(agent)
36766a 42         if err != nil {
d23f54 43             fmt.Println("create nng sender error")
36766a 44             return err
Z 45         }
9d4b12 46     }
Z 47
c0dcd1 48     if surveyorTime > 0 {
Z 49         time.Sleep(time.Duration(surveyorTime*2) * time.Second)
50     }
51
36766a 52     msg := mangos.NewMessage(len(data))
Z 53     msg.Body = data
54     return n.sock.SendMsg(msg)
9e1301 55
9d4b12 56 }
Z 57
58 // Recv impl interface Diliver
59 func (n *NNG) Recv() ([]byte, error) {
c2bbe3 60     if n == nil {
Z 61         return nil, errors.New("please init NNG first")
62     }
63
36766a 64     var err error
Z 65
9d4b12 66     if n.sock == nil {
d23f54 67         n.sock, err = n.makeNNG(coactee)
36766a 68         if err != nil {
d23f54 69             fmt.Println("create nng reciever error")
5ff1f3 70             return nil, err
a6b23c 71         }
9d4b12 72     }
36766a 73
Z 74     var msg *mangos.Message
75     if msg, err = n.sock.RecvMsg(); err != nil {
76         return nil, err
77     }
78     return msg.Body, nil
79
9d4b12 80 }
Z 81
aaae99 82 // Close impl interface Deliver
Z 83 func (n *NNG) Close() {
c2bbe3 84     if n != nil && n.sock != nil {
aaae99 85         n.sock.Close()
8e158e 86         n.sock = nil
aaae99 87     }
Z 88 }
89
6887a3 90 func nngServer(m Mode, url string, args ...interface{}) *NNG {
9d4b12 91
Z 92     rmExistedIpcName(url)
93
36766a 94     return &NNG{
9a89af 95         typ:       agent,
36766a 96         mode:      m,
Z 97         url:       url,
98         arguments: args,
99     }
9d4b12 100 }
Z 101
6887a3 102 func nngClient(m Mode, url string, args ...interface{}) *NNG {
9d4b12 103
36766a 104     return &NNG{
9a89af 105         typ:       coactee,
36766a 106         mode:      m,
Z 107         url:       url,
108         arguments: args,
9d4b12 109     }
Z 110
36766a 111 }
Z 112
9893ed 113 func proto(typ td, m Mode) protocol {
d23f54 114     if typ == agent {
Z 115         return protoAgent(m)
116     } else if typ == coactee {
117         return protoCoactee(m)
36766a 118     }
d23f54 119     return NONE
36766a 120 }
Z 121
9893ed 122 func (n *NNG) makeNNG(typ td) (mangos.Socket, error) {
36766a 123
Z 124     var sock mangos.Socket
125     var err error
126
d23f54 127     switch n.mode {
Z 128     case Bus:
129         sock, err = n.busMakeNNG(typ)
130     case ReqRep:
131         sock, err = n.rrMakeNNG(typ)
9e1301 132     default:
d23f54 133         sock, err = n.ppMakeNNG(typ)
9d4b12 134     }
Z 135
d23f54 136     return sock, err
9d4b12 137 }
Z 138
139 func rmExistedIpcName(url string) {
140     s := strings.Split(url, "://")
141
142     if s[0] == "ipc" {
143         if _, err := os.Stat(s[1]); err == nil {
144             os.Remove(s[1])
145         }
146     }
147 }
148
149 // newSocket allocates a new Socket.  The Socket is the handle used to
150 // access the underlying library.
151 func newSocket(p protocol) (mangos.Socket, error) {
152
d23f54 153     if p == NONE {
Z 154         return nil, errors.New("new socket protocol none")
155     }
9d4b12 156     var s mangos.Socket
Z 157     var err error
158
159     switch p {
160     case PUB:
161         s, err = pub.NewSocket()
162     case SUB:
163         s, err = sub.NewSocket()
164     case PUSH:
165         s, err = push.NewSocket()
166     case PULL:
167         s, err = pull.NewSocket()
168     case REQ:
169         s, err = req.NewSocket()
170     case REP:
171         s, err = rep.NewSocket()
172     case SURVEYOR:
173         s, err = surveyor.NewSocket()
174     case RESPONDENT:
175         s, err = respondent.NewSocket()
176     case PAIR:
177         s, err = pair.NewSocket()
178     case BUS:
179         s, err = bus.NewSocket()
180     default:
181         err = mangos.ErrBadProto
182     }
183
184     if err != nil {
185         return nil, err
186     }
187
188     all.AddTransports(s)
189
190     return s, nil
191 }
192
193 func die(format string, v ...interface{}) {
194     fmt.Fprintln(os.Stderr, fmt.Sprintf(format, v...))
195     os.Exit(1)
196 }
197
198 // Protocol is the numeric abstraction to the various protocols or patterns
199 // that Mangos supports.
200 type protocol int
201
202 // Constants for protocols.
203 const (
d23f54 204     NONE       = -1
9d4b12 205     PUSH       = protocol(mangos.ProtoPush)
Z 206     PULL       = protocol(mangos.ProtoPull)
207     PUB        = protocol(mangos.ProtoPub)
208     SUB        = protocol(mangos.ProtoSub)
209     REQ        = protocol(mangos.ProtoReq)
210     REP        = protocol(mangos.ProtoRep)
211     SURVEYOR   = protocol(mangos.ProtoSurveyor)
212     RESPONDENT = protocol(mangos.ProtoRespondent)
213     BUS        = protocol(mangos.ProtoBus)
214     PAIR       = protocol(mangos.ProtoPair)
215 )
216
d23f54 217 func protoAgent(m Mode) protocol {
9d4b12 218     switch m {
Z 219     case PushPull:
220         return PUSH
221     case PubSub:
222         return PUB
223     case Pair:
224         return PAIR
d23f54 225     case SurvResp:
Z 226         return SURVEYOR
227     case ReqRep:
228         return REQ
229     case Bus:
230         return BUS
9d4b12 231     }
d23f54 232     return NONE
9d4b12 233 }
Z 234
d23f54 235 func protoCoactee(m Mode) protocol {
9d4b12 236     switch m {
Z 237     case PushPull:
238         return PULL
239     case PubSub:
240         return SUB
241     case Pair:
242         return PAIR
d23f54 243     case SurvResp:
Z 244         return RESPONDENT
245     case ReqRep:
246         return REP
247     case Bus:
248         return BUS
9d4b12 249     }
d23f54 250     return NONE
9d4b12 251 }