zhangmeng
2019-08-27 058b21100da6f65c30ed24d56fe3d33819dd0df8
提交 | 用户 | age
9d4b12 1 package deliver
Z 2
3 import (
d23f54 4     "errors"
9d4b12 5     "fmt"
Z 6     "os"
7     "strings"
9e1301 8     "time"
9d4b12 9
Z 10     "nanomsg.org/go-mangos"
11     "nanomsg.org/go-mangos/protocol/bus"
12     "nanomsg.org/go-mangos/protocol/pair"
13     "nanomsg.org/go-mangos/protocol/pub"
14     "nanomsg.org/go-mangos/protocol/pull"
15     "nanomsg.org/go-mangos/protocol/push"
16     "nanomsg.org/go-mangos/protocol/rep"
17     "nanomsg.org/go-mangos/protocol/req"
18     "nanomsg.org/go-mangos/protocol/respondent"
19     "nanomsg.org/go-mangos/protocol/sub"
20     "nanomsg.org/go-mangos/protocol/surveyor"
21     "nanomsg.org/go-mangos/transport/all"
22 )
23
24 // NNG mangos wrap
25 type NNG struct {
9a89af 26     sock mangos.Socket
Z 27     typ  td
28     mode Mode
29     url  string
36766a 30
Z 31     arguments []interface{}
9d4b12 32 }
Z 33
34 // Send impl interface Diliver
35 func (n *NNG) Send(data []byte) error {
c2bbe3 36     if n == nil {
Z 37         return errors.New("please init NNG first")
38     }
36766a 39     var err error
9d4b12 40     if n.sock == nil {
d23f54 41         n.sock, err = n.makeNNG(agent)
36766a 42         if err != nil {
d23f54 43             fmt.Println("create nng sender error")
36766a 44             return err
Z 45         }
9d4b12 46     }
Z 47
c0dcd1 48     if surveyorTime > 0 {
Z 49         time.Sleep(time.Duration(surveyorTime*2) * time.Second)
50     }
51
36766a 52     msg := mangos.NewMessage(len(data))
Z 53     msg.Body = data
54     return n.sock.SendMsg(msg)
9e1301 55
9d4b12 56 }
Z 57
58 // Recv impl interface Diliver
59 func (n *NNG) Recv() ([]byte, error) {
c2bbe3 60     if n == nil {
Z 61         return nil, errors.New("please init NNG first")
62     }
63
36766a 64     var err error
Z 65
9d4b12 66     if n.sock == nil {
d23f54 67         n.sock, err = n.makeNNG(coactee)
36766a 68         if err != nil {
d23f54 69             fmt.Println("create nng reciever error")
5ff1f3 70             return nil, err
a6b23c 71         }
9d4b12 72     }
36766a 73
Z 74     var msg *mangos.Message
75     if msg, err = n.sock.RecvMsg(); err != nil {
76         return nil, err
77     }
78     return msg.Body, nil
79
9d4b12 80 }
Z 81
020e17 82 // Recv2 impl interface
058b21 83 func (n *NNG) Recv2(data []byte) (l int, err error) {
020e17 84     data, err = n.Recv()
058b21 85     l = len(data)
Z 86     return l, err
020e17 87 }
Z 88
aaae99 89 // Close impl interface Deliver
Z 90 func (n *NNG) Close() {
c2bbe3 91     if n != nil && n.sock != nil {
aaae99 92         n.sock.Close()
8e158e 93         n.sock = nil
aaae99 94     }
Z 95 }
96
6887a3 97 func nngServer(m Mode, url string, args ...interface{}) *NNG {
9d4b12 98
Z 99     rmExistedIpcName(url)
100
36766a 101     return &NNG{
9a89af 102         typ:       agent,
36766a 103         mode:      m,
Z 104         url:       url,
105         arguments: args,
106     }
9d4b12 107 }
Z 108
6887a3 109 func nngClient(m Mode, url string, args ...interface{}) *NNG {
9d4b12 110
36766a 111     return &NNG{
9a89af 112         typ:       coactee,
36766a 113         mode:      m,
Z 114         url:       url,
115         arguments: args,
9d4b12 116     }
Z 117
36766a 118 }
Z 119
9893ed 120 func proto(typ td, m Mode) protocol {
d23f54 121     if typ == agent {
Z 122         return protoAgent(m)
123     } else if typ == coactee {
124         return protoCoactee(m)
36766a 125     }
d23f54 126     return NONE
36766a 127 }
Z 128
9893ed 129 func (n *NNG) makeNNG(typ td) (mangos.Socket, error) {
36766a 130
Z 131     var sock mangos.Socket
132     var err error
133
d23f54 134     switch n.mode {
Z 135     case Bus:
136         sock, err = n.busMakeNNG(typ)
3e6484 137     case ReqRep, SurvResp:
d23f54 138         sock, err = n.rrMakeNNG(typ)
9e1301 139     default:
d23f54 140         sock, err = n.ppMakeNNG(typ)
9d4b12 141     }
Z 142
d23f54 143     return sock, err
9d4b12 144 }
Z 145
146 func rmExistedIpcName(url string) {
147     s := strings.Split(url, "://")
148
149     if s[0] == "ipc" {
150         if _, err := os.Stat(s[1]); err == nil {
151             os.Remove(s[1])
152         }
153     }
154 }
155
156 // newSocket allocates a new Socket.  The Socket is the handle used to
157 // access the underlying library.
158 func newSocket(p protocol) (mangos.Socket, error) {
159
d23f54 160     if p == NONE {
Z 161         return nil, errors.New("new socket protocol none")
162     }
9d4b12 163     var s mangos.Socket
Z 164     var err error
165
166     switch p {
167     case PUB:
168         s, err = pub.NewSocket()
169     case SUB:
170         s, err = sub.NewSocket()
171     case PUSH:
172         s, err = push.NewSocket()
173     case PULL:
174         s, err = pull.NewSocket()
175     case REQ:
176         s, err = req.NewSocket()
177     case REP:
178         s, err = rep.NewSocket()
179     case SURVEYOR:
180         s, err = surveyor.NewSocket()
181     case RESPONDENT:
182         s, err = respondent.NewSocket()
183     case PAIR:
184         s, err = pair.NewSocket()
185     case BUS:
186         s, err = bus.NewSocket()
187     default:
188         err = mangos.ErrBadProto
189     }
190
191     if err != nil {
192         return nil, err
193     }
194
195     all.AddTransports(s)
196
197     return s, nil
198 }
199
200 func die(format string, v ...interface{}) {
201     fmt.Fprintln(os.Stderr, fmt.Sprintf(format, v...))
202     os.Exit(1)
203 }
204
205 // Protocol is the numeric abstraction to the various protocols or patterns
206 // that Mangos supports.
207 type protocol int
208
209 // Constants for protocols.
210 const (
d23f54 211     NONE       = -1
9d4b12 212     PUSH       = protocol(mangos.ProtoPush)
Z 213     PULL       = protocol(mangos.ProtoPull)
214     PUB        = protocol(mangos.ProtoPub)
215     SUB        = protocol(mangos.ProtoSub)
216     REQ        = protocol(mangos.ProtoReq)
217     REP        = protocol(mangos.ProtoRep)
218     SURVEYOR   = protocol(mangos.ProtoSurveyor)
219     RESPONDENT = protocol(mangos.ProtoRespondent)
220     BUS        = protocol(mangos.ProtoBus)
221     PAIR       = protocol(mangos.ProtoPair)
222 )
223
d23f54 224 func protoAgent(m Mode) protocol {
9d4b12 225     switch m {
Z 226     case PushPull:
227         return PUSH
228     case PubSub:
229         return PUB
230     case Pair:
231         return PAIR
d23f54 232     case SurvResp:
Z 233         return SURVEYOR
234     case ReqRep:
235         return REQ
236     case Bus:
237         return BUS
9d4b12 238     }
d23f54 239     return NONE
9d4b12 240 }
Z 241
d23f54 242 func protoCoactee(m Mode) protocol {
9d4b12 243     switch m {
Z 244     case PushPull:
245         return PULL
246     case PubSub:
247         return SUB
248     case Pair:
249         return PAIR
d23f54 250     case SurvResp:
Z 251         return RESPONDENT
252     case ReqRep:
253         return REP
254     case Bus:
255         return BUS
9d4b12 256     }
d23f54 257     return NONE
9d4b12 258 }