zhangmeng
2019-05-20 9a89af693b9336633bcac2a652c294f782e6b3b1
提交 | 用户 | age
9d4b12 1 package deliver
Z 2
3 import (
d23f54 4     "errors"
9d4b12 5     "fmt"
Z 6     "os"
7     "strings"
9e1301 8     "time"
9d4b12 9
Z 10     "nanomsg.org/go-mangos"
11     "nanomsg.org/go-mangos/protocol/bus"
12     "nanomsg.org/go-mangos/protocol/pair"
13     "nanomsg.org/go-mangos/protocol/pub"
14     "nanomsg.org/go-mangos/protocol/pull"
15     "nanomsg.org/go-mangos/protocol/push"
16     "nanomsg.org/go-mangos/protocol/rep"
17     "nanomsg.org/go-mangos/protocol/req"
18     "nanomsg.org/go-mangos/protocol/respondent"
19     "nanomsg.org/go-mangos/protocol/sub"
20     "nanomsg.org/go-mangos/protocol/surveyor"
21     "nanomsg.org/go-mangos/transport/all"
22 )
23
24 // NNG mangos wrap
25 type NNG struct {
9a89af 26     sock mangos.Socket
Z 27     typ  td
28     mode Mode
29     url  string
36766a 30
Z 31     arguments []interface{}
9d4b12 32 }
Z 33
34 // Send impl interface Diliver
35 func (n *NNG) Send(data []byte) error {
36766a 36     var err error
9d4b12 37     if n.sock == nil {
d23f54 38         n.sock, err = n.makeNNG(agent)
36766a 39         if err != nil {
d23f54 40             fmt.Println("create nng sender error")
36766a 41             return err
Z 42         }
9d4b12 43     }
Z 44
c0dcd1 45     if surveyorTime > 0 {
Z 46         time.Sleep(time.Duration(surveyorTime*2) * time.Second)
47     }
48
36766a 49     msg := mangos.NewMessage(len(data))
Z 50     msg.Body = data
51     return n.sock.SendMsg(msg)
9e1301 52
9d4b12 53 }
Z 54
55 // Recv impl interface Diliver
56 func (n *NNG) Recv() ([]byte, error) {
36766a 57     var err error
Z 58
9d4b12 59     if n.sock == nil {
d23f54 60         n.sock, err = n.makeNNG(coactee)
36766a 61         if err != nil {
d23f54 62             fmt.Println("create nng reciever error")
5ff1f3 63             return nil, err
a6b23c 64         }
9d4b12 65     }
36766a 66
Z 67     var msg *mangos.Message
68     if msg, err = n.sock.RecvMsg(); err != nil {
69         return nil, err
70     }
71     return msg.Body, nil
72
9d4b12 73 }
Z 74
aaae99 75 // Close impl interface Deliver
Z 76 func (n *NNG) Close() {
77     if n.sock != nil {
78         n.sock.Close()
8e158e 79         n.sock = nil
aaae99 80     }
Z 81 }
82
6887a3 83 func nngServer(m Mode, url string, args ...interface{}) *NNG {
9d4b12 84
Z 85     rmExistedIpcName(url)
86
36766a 87     return &NNG{
9a89af 88         typ:       agent,
36766a 89         mode:      m,
Z 90         url:       url,
91         arguments: args,
92     }
9d4b12 93 }
Z 94
6887a3 95 func nngClient(m Mode, url string, args ...interface{}) *NNG {
9d4b12 96
36766a 97     return &NNG{
9a89af 98         typ:       coactee,
36766a 99         mode:      m,
Z 100         url:       url,
101         arguments: args,
9d4b12 102     }
Z 103
36766a 104 }
Z 105
9893ed 106 func proto(typ td, m Mode) protocol {
d23f54 107     if typ == agent {
Z 108         return protoAgent(m)
109     } else if typ == coactee {
110         return protoCoactee(m)
36766a 111     }
d23f54 112     return NONE
36766a 113 }
Z 114
9893ed 115 func (n *NNG) makeNNG(typ td) (mangos.Socket, error) {
36766a 116
Z 117     var sock mangos.Socket
118     var err error
119
d23f54 120     switch n.mode {
Z 121     case Bus:
122         sock, err = n.busMakeNNG(typ)
123     case ReqRep:
124         sock, err = n.rrMakeNNG(typ)
9e1301 125     default:
d23f54 126         sock, err = n.ppMakeNNG(typ)
9d4b12 127     }
Z 128
d23f54 129     return sock, err
9d4b12 130 }
Z 131
132 func rmExistedIpcName(url string) {
133     s := strings.Split(url, "://")
134
135     if s[0] == "ipc" {
136         if _, err := os.Stat(s[1]); err == nil {
137             os.Remove(s[1])
138         }
139     }
140 }
141
142 // newSocket allocates a new Socket.  The Socket is the handle used to
143 // access the underlying library.
144 func newSocket(p protocol) (mangos.Socket, error) {
145
d23f54 146     if p == NONE {
Z 147         return nil, errors.New("new socket protocol none")
148     }
9d4b12 149     var s mangos.Socket
Z 150     var err error
151
152     switch p {
153     case PUB:
154         s, err = pub.NewSocket()
155     case SUB:
156         s, err = sub.NewSocket()
157     case PUSH:
158         s, err = push.NewSocket()
159     case PULL:
160         s, err = pull.NewSocket()
161     case REQ:
162         s, err = req.NewSocket()
163     case REP:
164         s, err = rep.NewSocket()
165     case SURVEYOR:
166         s, err = surveyor.NewSocket()
167     case RESPONDENT:
168         s, err = respondent.NewSocket()
169     case PAIR:
170         s, err = pair.NewSocket()
171     case BUS:
172         s, err = bus.NewSocket()
173     default:
174         err = mangos.ErrBadProto
175     }
176
177     if err != nil {
178         return nil, err
179     }
180
181     all.AddTransports(s)
182
183     return s, nil
184 }
185
186 func die(format string, v ...interface{}) {
187     fmt.Fprintln(os.Stderr, fmt.Sprintf(format, v...))
188     os.Exit(1)
189 }
190
191 // Protocol is the numeric abstraction to the various protocols or patterns
192 // that Mangos supports.
193 type protocol int
194
195 // Constants for protocols.
196 const (
d23f54 197     NONE       = -1
9d4b12 198     PUSH       = protocol(mangos.ProtoPush)
Z 199     PULL       = protocol(mangos.ProtoPull)
200     PUB        = protocol(mangos.ProtoPub)
201     SUB        = protocol(mangos.ProtoSub)
202     REQ        = protocol(mangos.ProtoReq)
203     REP        = protocol(mangos.ProtoRep)
204     SURVEYOR   = protocol(mangos.ProtoSurveyor)
205     RESPONDENT = protocol(mangos.ProtoRespondent)
206     BUS        = protocol(mangos.ProtoBus)
207     PAIR       = protocol(mangos.ProtoPair)
208 )
209
d23f54 210 func protoAgent(m Mode) protocol {
9d4b12 211     switch m {
Z 212     case PushPull:
213         return PUSH
214     case PubSub:
215         return PUB
216     case Pair:
217         return PAIR
d23f54 218     case SurvResp:
Z 219         return SURVEYOR
220     case ReqRep:
221         return REQ
222     case Bus:
223         return BUS
9d4b12 224     }
d23f54 225     return NONE
9d4b12 226 }
Z 227
d23f54 228 func protoCoactee(m Mode) protocol {
9d4b12 229     switch m {
Z 230     case PushPull:
231         return PULL
232     case PubSub:
233         return SUB
234     case Pair:
235         return PAIR
d23f54 236     case SurvResp:
Z 237         return RESPONDENT
238     case ReqRep:
239         return REP
240     case Bus:
241         return BUS
9d4b12 242     }
d23f54 243     return NONE
9d4b12 244 }