zhangmeng
2019-08-27 fa924718b1e8d5f566f7655807e77df1bcfe8f86
提交 | 用户 | age
9d4b12 1 package deliver
Z 2
3 import (
d23f54 4     "errors"
9d4b12 5     "fmt"
Z 6     "os"
7     "strings"
9e1301 8     "time"
9d4b12 9
Z 10     "nanomsg.org/go-mangos"
11     "nanomsg.org/go-mangos/protocol/bus"
12     "nanomsg.org/go-mangos/protocol/pair"
13     "nanomsg.org/go-mangos/protocol/pub"
14     "nanomsg.org/go-mangos/protocol/pull"
15     "nanomsg.org/go-mangos/protocol/push"
16     "nanomsg.org/go-mangos/protocol/rep"
17     "nanomsg.org/go-mangos/protocol/req"
18     "nanomsg.org/go-mangos/protocol/respondent"
19     "nanomsg.org/go-mangos/protocol/sub"
20     "nanomsg.org/go-mangos/protocol/surveyor"
21     "nanomsg.org/go-mangos/transport/all"
22 )
23
24 // NNG mangos wrap
25 type NNG struct {
9a89af 26     sock mangos.Socket
Z 27     typ  td
28     mode Mode
29     url  string
36766a 30
Z 31     arguments []interface{}
9d4b12 32 }
Z 33
34 // Send impl interface Diliver
35 func (n *NNG) Send(data []byte) error {
c2bbe3 36     if n == nil {
Z 37         return errors.New("please init NNG first")
38     }
36766a 39     var err error
9d4b12 40     if n.sock == nil {
d23f54 41         n.sock, err = n.makeNNG(agent)
36766a 42         if err != nil {
d23f54 43             fmt.Println("create nng sender error")
36766a 44             return err
Z 45         }
9d4b12 46     }
Z 47
c0dcd1 48     if surveyorTime > 0 {
Z 49         time.Sleep(time.Duration(surveyorTime*2) * time.Second)
50     }
51
36766a 52     msg := mangos.NewMessage(len(data))
Z 53     msg.Body = data
54     return n.sock.SendMsg(msg)
9e1301 55
9d4b12 56 }
Z 57
58 // Recv impl interface Diliver
59 func (n *NNG) Recv() ([]byte, error) {
c2bbe3 60     if n == nil {
Z 61         return nil, errors.New("please init NNG first")
62     }
63
36766a 64     var err error
Z 65
9d4b12 66     if n.sock == nil {
d23f54 67         n.sock, err = n.makeNNG(coactee)
36766a 68         if err != nil {
d23f54 69             fmt.Println("create nng reciever error")
5ff1f3 70             return nil, err
a6b23c 71         }
9d4b12 72     }
36766a 73
Z 74     var msg *mangos.Message
75     if msg, err = n.sock.RecvMsg(); err != nil {
76         return nil, err
77     }
78     return msg.Body, nil
79
9d4b12 80 }
Z 81
020e17 82 // Recv2 impl interface
fa9247 83 func (n *NNG) Recv2(data []byte) (l int, err error) {
020e17 84     data, err = n.Recv()
fa9247 85     return len(data), err
020e17 86 }
Z 87
aaae99 88 // Close impl interface Deliver
Z 89 func (n *NNG) Close() {
c2bbe3 90     if n != nil && n.sock != nil {
aaae99 91         n.sock.Close()
8e158e 92         n.sock = nil
aaae99 93     }
Z 94 }
95
6887a3 96 func nngServer(m Mode, url string, args ...interface{}) *NNG {
9d4b12 97
Z 98     rmExistedIpcName(url)
99
36766a 100     return &NNG{
9a89af 101         typ:       agent,
36766a 102         mode:      m,
Z 103         url:       url,
104         arguments: args,
105     }
9d4b12 106 }
Z 107
6887a3 108 func nngClient(m Mode, url string, args ...interface{}) *NNG {
9d4b12 109
36766a 110     return &NNG{
9a89af 111         typ:       coactee,
36766a 112         mode:      m,
Z 113         url:       url,
114         arguments: args,
9d4b12 115     }
Z 116
36766a 117 }
Z 118
9893ed 119 func proto(typ td, m Mode) protocol {
d23f54 120     if typ == agent {
Z 121         return protoAgent(m)
122     } else if typ == coactee {
123         return protoCoactee(m)
36766a 124     }
d23f54 125     return NONE
36766a 126 }
Z 127
9893ed 128 func (n *NNG) makeNNG(typ td) (mangos.Socket, error) {
36766a 129
Z 130     var sock mangos.Socket
131     var err error
132
d23f54 133     switch n.mode {
Z 134     case Bus:
135         sock, err = n.busMakeNNG(typ)
3e6484 136     case ReqRep, SurvResp:
d23f54 137         sock, err = n.rrMakeNNG(typ)
9e1301 138     default:
d23f54 139         sock, err = n.ppMakeNNG(typ)
9d4b12 140     }
Z 141
d23f54 142     return sock, err
9d4b12 143 }
Z 144
145 func rmExistedIpcName(url string) {
146     s := strings.Split(url, "://")
147
148     if s[0] == "ipc" {
149         if _, err := os.Stat(s[1]); err == nil {
150             os.Remove(s[1])
151         }
152     }
153 }
154
155 // newSocket allocates a new Socket.  The Socket is the handle used to
156 // access the underlying library.
157 func newSocket(p protocol) (mangos.Socket, error) {
158
d23f54 159     if p == NONE {
Z 160         return nil, errors.New("new socket protocol none")
161     }
9d4b12 162     var s mangos.Socket
Z 163     var err error
164
165     switch p {
166     case PUB:
167         s, err = pub.NewSocket()
168     case SUB:
169         s, err = sub.NewSocket()
170     case PUSH:
171         s, err = push.NewSocket()
172     case PULL:
173         s, err = pull.NewSocket()
174     case REQ:
175         s, err = req.NewSocket()
176     case REP:
177         s, err = rep.NewSocket()
178     case SURVEYOR:
179         s, err = surveyor.NewSocket()
180     case RESPONDENT:
181         s, err = respondent.NewSocket()
182     case PAIR:
183         s, err = pair.NewSocket()
184     case BUS:
185         s, err = bus.NewSocket()
186     default:
187         err = mangos.ErrBadProto
188     }
189
190     if err != nil {
191         return nil, err
192     }
193
194     all.AddTransports(s)
195
196     return s, nil
197 }
198
199 func die(format string, v ...interface{}) {
200     fmt.Fprintln(os.Stderr, fmt.Sprintf(format, v...))
201     os.Exit(1)
202 }
203
204 // Protocol is the numeric abstraction to the various protocols or patterns
205 // that Mangos supports.
206 type protocol int
207
208 // Constants for protocols.
209 const (
d23f54 210     NONE       = -1
9d4b12 211     PUSH       = protocol(mangos.ProtoPush)
Z 212     PULL       = protocol(mangos.ProtoPull)
213     PUB        = protocol(mangos.ProtoPub)
214     SUB        = protocol(mangos.ProtoSub)
215     REQ        = protocol(mangos.ProtoReq)
216     REP        = protocol(mangos.ProtoRep)
217     SURVEYOR   = protocol(mangos.ProtoSurveyor)
218     RESPONDENT = protocol(mangos.ProtoRespondent)
219     BUS        = protocol(mangos.ProtoBus)
220     PAIR       = protocol(mangos.ProtoPair)
221 )
222
d23f54 223 func protoAgent(m Mode) protocol {
9d4b12 224     switch m {
Z 225     case PushPull:
226         return PUSH
227     case PubSub:
228         return PUB
229     case Pair:
230         return PAIR
d23f54 231     case SurvResp:
Z 232         return SURVEYOR
233     case ReqRep:
234         return REQ
235     case Bus:
236         return BUS
9d4b12 237     }
d23f54 238     return NONE
9d4b12 239 }
Z 240
d23f54 241 func protoCoactee(m Mode) protocol {
9d4b12 242     switch m {
Z 243     case PushPull:
244         return PULL
245     case PubSub:
246         return SUB
247     case Pair:
248         return PAIR
d23f54 249     case SurvResp:
Z 250         return RESPONDENT
251     case ReqRep:
252         return REP
253     case Bus:
254         return BUS
9d4b12 255     }
d23f54 256     return NONE
9d4b12 257 }