zhangmeng
2019-09-27 2d390df9ede39c9d7c34bd8190b9329cfc371325
提交 | 用户 | age
9d4b12 1 package deliver
Z 2
3 import (
d23f54 4     "errors"
9d4b12 5     "fmt"
Z 6     "os"
7     "strings"
9e1301 8     "time"
9d4b12 9
Z 10     "nanomsg.org/go-mangos"
11     "nanomsg.org/go-mangos/protocol/bus"
12     "nanomsg.org/go-mangos/protocol/pair"
13     "nanomsg.org/go-mangos/protocol/pub"
14     "nanomsg.org/go-mangos/protocol/pull"
15     "nanomsg.org/go-mangos/protocol/push"
16     "nanomsg.org/go-mangos/protocol/rep"
17     "nanomsg.org/go-mangos/protocol/req"
18     "nanomsg.org/go-mangos/protocol/respondent"
19     "nanomsg.org/go-mangos/protocol/sub"
20     "nanomsg.org/go-mangos/protocol/surveyor"
21     "nanomsg.org/go-mangos/transport/all"
22 )
23
24 // NNG mangos wrap
25 type NNG struct {
9a89af 26     sock mangos.Socket
Z 27     typ  td
28     mode Mode
29     url  string
36766a 30
Z 31     arguments []interface{}
9d4b12 32 }
Z 33
34 // Send impl interface Diliver
35 func (n *NNG) Send(data []byte) error {
c2bbe3 36     if n == nil {
Z 37         return errors.New("please init NNG first")
38     }
36766a 39     var err error
9d4b12 40     if n.sock == nil {
d23f54 41         n.sock, err = n.makeNNG(agent)
36766a 42         if err != nil {
d23f54 43             fmt.Println("create nng sender error")
36766a 44             return err
Z 45         }
9d4b12 46     }
Z 47
c0dcd1 48     if surveyorTime > 0 {
Z 49         time.Sleep(time.Duration(surveyorTime*2) * time.Second)
50     }
51
971bd1 52     msg := mangos.NewMessage(1)
Z 53     msg.Body = data
54     return n.sock.SendMsg(msg)
9d4b12 55 }
Z 56
57 // Recv impl interface Diliver
58 func (n *NNG) Recv() ([]byte, error) {
c2bbe3 59     if n == nil {
Z 60         return nil, errors.New("please init NNG first")
61     }
62
36766a 63     var err error
Z 64
9d4b12 65     if n.sock == nil {
d23f54 66         n.sock, err = n.makeNNG(coactee)
36766a 67         if err != nil {
d23f54 68             fmt.Println("create nng reciever error")
5ff1f3 69             return nil, err
a6b23c 70         }
9d4b12 71     }
36766a 72
Z 73     var msg *mangos.Message
74     if msg, err = n.sock.RecvMsg(); err != nil {
75         return nil, err
76     }
77     return msg.Body, nil
78
9d4b12 79 }
Z 80
020e17 81 // Recv2 impl interface
058b21 82 func (n *NNG) Recv2(data []byte) (l int, err error) {
020e17 83     data, err = n.Recv()
058b21 84     l = len(data)
Z 85     return l, err
020e17 86 }
Z 87
aaae99 88 // Close impl interface Deliver
Z 89 func (n *NNG) Close() {
c2bbe3 90     if n != nil && n.sock != nil {
aaae99 91         n.sock.Close()
8e158e 92         n.sock = nil
aaae99 93     }
Z 94 }
95
6887a3 96 func nngServer(m Mode, url string, args ...interface{}) *NNG {
9d4b12 97
Z 98     rmExistedIpcName(url)
99
36766a 100     return &NNG{
9a89af 101         typ:       agent,
36766a 102         mode:      m,
Z 103         url:       url,
104         arguments: args,
105     }
9d4b12 106 }
Z 107
6887a3 108 func nngClient(m Mode, url string, args ...interface{}) *NNG {
9d4b12 109
36766a 110     return &NNG{
9a89af 111         typ:       coactee,
36766a 112         mode:      m,
Z 113         url:       url,
114         arguments: args,
9d4b12 115     }
Z 116
36766a 117 }
Z 118
9893ed 119 func proto(typ td, m Mode) protocol {
d23f54 120     if typ == agent {
Z 121         return protoAgent(m)
122     } else if typ == coactee {
123         return protoCoactee(m)
36766a 124     }
d23f54 125     return NONE
36766a 126 }
Z 127
9893ed 128 func (n *NNG) makeNNG(typ td) (mangos.Socket, error) {
36766a 129
Z 130     var sock mangos.Socket
131     var err error
132
d23f54 133     switch n.mode {
Z 134     case Bus:
135         sock, err = n.busMakeNNG(typ)
3e6484 136     case ReqRep, SurvResp:
d23f54 137         sock, err = n.rrMakeNNG(typ)
9e1301 138     default:
d23f54 139         sock, err = n.ppMakeNNG(typ)
9d4b12 140     }
Z 141
d23f54 142     return sock, err
9d4b12 143 }
Z 144
145 func rmExistedIpcName(url string) {
146     s := strings.Split(url, "://")
147
148     if s[0] == "ipc" {
149         if _, err := os.Stat(s[1]); err == nil {
150             os.Remove(s[1])
2d390d 151         } else if !os.IsNotExist(err) {
Z 152             os.Remove(s[1])
9d4b12 153         }
Z 154     }
155 }
156
157 // newSocket allocates a new Socket.  The Socket is the handle used to
158 // access the underlying library.
159 func newSocket(p protocol) (mangos.Socket, error) {
160
d23f54 161     if p == NONE {
Z 162         return nil, errors.New("new socket protocol none")
163     }
9d4b12 164     var s mangos.Socket
Z 165     var err error
166
167     switch p {
168     case PUB:
169         s, err = pub.NewSocket()
170     case SUB:
171         s, err = sub.NewSocket()
172     case PUSH:
173         s, err = push.NewSocket()
174     case PULL:
175         s, err = pull.NewSocket()
176     case REQ:
177         s, err = req.NewSocket()
178     case REP:
179         s, err = rep.NewSocket()
180     case SURVEYOR:
181         s, err = surveyor.NewSocket()
182     case RESPONDENT:
183         s, err = respondent.NewSocket()
184     case PAIR:
185         s, err = pair.NewSocket()
186     case BUS:
187         s, err = bus.NewSocket()
188     default:
189         err = mangos.ErrBadProto
190     }
191
192     if err != nil {
193         return nil, err
194     }
195
196     all.AddTransports(s)
197
198     return s, nil
199 }
200
201 func die(format string, v ...interface{}) {
202     fmt.Fprintln(os.Stderr, fmt.Sprintf(format, v...))
203     os.Exit(1)
204 }
205
206 // Protocol is the numeric abstraction to the various protocols or patterns
207 // that Mangos supports.
208 type protocol int
209
210 // Constants for protocols.
211 const (
d23f54 212     NONE       = -1
9d4b12 213     PUSH       = protocol(mangos.ProtoPush)
Z 214     PULL       = protocol(mangos.ProtoPull)
215     PUB        = protocol(mangos.ProtoPub)
216     SUB        = protocol(mangos.ProtoSub)
217     REQ        = protocol(mangos.ProtoReq)
218     REP        = protocol(mangos.ProtoRep)
219     SURVEYOR   = protocol(mangos.ProtoSurveyor)
220     RESPONDENT = protocol(mangos.ProtoRespondent)
221     BUS        = protocol(mangos.ProtoBus)
222     PAIR       = protocol(mangos.ProtoPair)
223 )
224
d23f54 225 func protoAgent(m Mode) protocol {
9d4b12 226     switch m {
Z 227     case PushPull:
228         return PUSH
229     case PubSub:
230         return PUB
231     case Pair:
232         return PAIR
d23f54 233     case SurvResp:
Z 234         return SURVEYOR
235     case ReqRep:
236         return REQ
237     case Bus:
238         return BUS
9d4b12 239     }
d23f54 240     return NONE
9d4b12 241 }
Z 242
d23f54 243 func protoCoactee(m Mode) protocol {
9d4b12 244     switch m {
Z 245     case PushPull:
246         return PULL
247     case PubSub:
248         return SUB
249     case Pair:
250         return PAIR
d23f54 251     case SurvResp:
Z 252         return RESPONDENT
253     case ReqRep:
254         return REP
255     case Bus:
256         return BUS
9d4b12 257     }
d23f54 258     return NONE
9d4b12 259 }