zhangmeng
2019-08-30 383557064fd77c028fd71facf8ec1cf6b278d3e6
提交 | 用户 | age
9d4b12 1 package deliver
Z 2
3 import (
d23f54 4     "errors"
9d4b12 5     "fmt"
Z 6     "os"
7     "strings"
9e1301 8     "time"
9d4b12 9
Z 10     "nanomsg.org/go-mangos"
11     "nanomsg.org/go-mangos/protocol/bus"
12     "nanomsg.org/go-mangos/protocol/pair"
13     "nanomsg.org/go-mangos/protocol/pub"
14     "nanomsg.org/go-mangos/protocol/pull"
15     "nanomsg.org/go-mangos/protocol/push"
16     "nanomsg.org/go-mangos/protocol/rep"
17     "nanomsg.org/go-mangos/protocol/req"
18     "nanomsg.org/go-mangos/protocol/respondent"
19     "nanomsg.org/go-mangos/protocol/sub"
20     "nanomsg.org/go-mangos/protocol/surveyor"
21     "nanomsg.org/go-mangos/transport/all"
22 )
23
24 // NNG mangos wrap
25 type NNG struct {
9a89af 26     sock mangos.Socket
Z 27     typ  td
28     mode Mode
29     url  string
36766a 30
383557 31     sendMsg *mangos.Message
Z 32
36766a 33     arguments []interface{}
9d4b12 34 }
Z 35
36 // Send impl interface Diliver
37 func (n *NNG) Send(data []byte) error {
c2bbe3 38     if n == nil {
Z 39         return errors.New("please init NNG first")
40     }
36766a 41     var err error
9d4b12 42     if n.sock == nil {
d23f54 43         n.sock, err = n.makeNNG(agent)
36766a 44         if err != nil {
d23f54 45             fmt.Println("create nng sender error")
36766a 46             return err
Z 47         }
9d4b12 48     }
Z 49
c0dcd1 50     if surveyorTime > 0 {
Z 51         time.Sleep(time.Duration(surveyorTime*2) * time.Second)
52     }
53
383557 54     // msg := mangos.NewMessage(len(data))
Z 55     // msg.Body = data
56     // return n.sock.SendMsg(msg)
57
58     if n.sendMsg == nil {
59         n.sendMsg = &mangos.Message{}
60         n.sendMsg.Header = make([]byte, 32)
61     }
62     n.sendMsg.Body = data
63     return n.sock.SendMsg(n.sendMsg)
9e1301 64
9d4b12 65 }
Z 66
67 // Recv impl interface Diliver
68 func (n *NNG) Recv() ([]byte, error) {
c2bbe3 69     if n == nil {
Z 70         return nil, errors.New("please init NNG first")
71     }
72
36766a 73     var err error
Z 74
9d4b12 75     if n.sock == nil {
d23f54 76         n.sock, err = n.makeNNG(coactee)
36766a 77         if err != nil {
d23f54 78             fmt.Println("create nng reciever error")
5ff1f3 79             return nil, err
a6b23c 80         }
9d4b12 81     }
36766a 82
Z 83     var msg *mangos.Message
84     if msg, err = n.sock.RecvMsg(); err != nil {
85         return nil, err
86     }
87     return msg.Body, nil
88
9d4b12 89 }
Z 90
020e17 91 // Recv2 impl interface
058b21 92 func (n *NNG) Recv2(data []byte) (l int, err error) {
020e17 93     data, err = n.Recv()
058b21 94     l = len(data)
Z 95     return l, err
020e17 96 }
Z 97
aaae99 98 // Close impl interface Deliver
Z 99 func (n *NNG) Close() {
c2bbe3 100     if n != nil && n.sock != nil {
aaae99 101         n.sock.Close()
8e158e 102         n.sock = nil
aaae99 103     }
Z 104 }
105
6887a3 106 func nngServer(m Mode, url string, args ...interface{}) *NNG {
9d4b12 107
Z 108     rmExistedIpcName(url)
109
36766a 110     return &NNG{
9a89af 111         typ:       agent,
36766a 112         mode:      m,
Z 113         url:       url,
114         arguments: args,
115     }
9d4b12 116 }
Z 117
6887a3 118 func nngClient(m Mode, url string, args ...interface{}) *NNG {
9d4b12 119
36766a 120     return &NNG{
9a89af 121         typ:       coactee,
36766a 122         mode:      m,
Z 123         url:       url,
124         arguments: args,
9d4b12 125     }
Z 126
36766a 127 }
Z 128
9893ed 129 func proto(typ td, m Mode) protocol {
d23f54 130     if typ == agent {
Z 131         return protoAgent(m)
132     } else if typ == coactee {
133         return protoCoactee(m)
36766a 134     }
d23f54 135     return NONE
36766a 136 }
Z 137
9893ed 138 func (n *NNG) makeNNG(typ td) (mangos.Socket, error) {
36766a 139
Z 140     var sock mangos.Socket
141     var err error
142
d23f54 143     switch n.mode {
Z 144     case Bus:
145         sock, err = n.busMakeNNG(typ)
3e6484 146     case ReqRep, SurvResp:
d23f54 147         sock, err = n.rrMakeNNG(typ)
9e1301 148     default:
d23f54 149         sock, err = n.ppMakeNNG(typ)
9d4b12 150     }
Z 151
d23f54 152     return sock, err
9d4b12 153 }
Z 154
155 func rmExistedIpcName(url string) {
156     s := strings.Split(url, "://")
157
158     if s[0] == "ipc" {
159         if _, err := os.Stat(s[1]); err == nil {
160             os.Remove(s[1])
161         }
162     }
163 }
164
165 // newSocket allocates a new Socket.  The Socket is the handle used to
166 // access the underlying library.
167 func newSocket(p protocol) (mangos.Socket, error) {
168
d23f54 169     if p == NONE {
Z 170         return nil, errors.New("new socket protocol none")
171     }
9d4b12 172     var s mangos.Socket
Z 173     var err error
174
175     switch p {
176     case PUB:
177         s, err = pub.NewSocket()
178     case SUB:
179         s, err = sub.NewSocket()
180     case PUSH:
181         s, err = push.NewSocket()
182     case PULL:
183         s, err = pull.NewSocket()
184     case REQ:
185         s, err = req.NewSocket()
186     case REP:
187         s, err = rep.NewSocket()
188     case SURVEYOR:
189         s, err = surveyor.NewSocket()
190     case RESPONDENT:
191         s, err = respondent.NewSocket()
192     case PAIR:
193         s, err = pair.NewSocket()
194     case BUS:
195         s, err = bus.NewSocket()
196     default:
197         err = mangos.ErrBadProto
198     }
199
200     if err != nil {
201         return nil, err
202     }
203
204     all.AddTransports(s)
205
206     return s, nil
207 }
208
209 func die(format string, v ...interface{}) {
210     fmt.Fprintln(os.Stderr, fmt.Sprintf(format, v...))
211     os.Exit(1)
212 }
213
214 // Protocol is the numeric abstraction to the various protocols or patterns
215 // that Mangos supports.
216 type protocol int
217
218 // Constants for protocols.
219 const (
d23f54 220     NONE       = -1
9d4b12 221     PUSH       = protocol(mangos.ProtoPush)
Z 222     PULL       = protocol(mangos.ProtoPull)
223     PUB        = protocol(mangos.ProtoPub)
224     SUB        = protocol(mangos.ProtoSub)
225     REQ        = protocol(mangos.ProtoReq)
226     REP        = protocol(mangos.ProtoRep)
227     SURVEYOR   = protocol(mangos.ProtoSurveyor)
228     RESPONDENT = protocol(mangos.ProtoRespondent)
229     BUS        = protocol(mangos.ProtoBus)
230     PAIR       = protocol(mangos.ProtoPair)
231 )
232
d23f54 233 func protoAgent(m Mode) protocol {
9d4b12 234     switch m {
Z 235     case PushPull:
236         return PUSH
237     case PubSub:
238         return PUB
239     case Pair:
240         return PAIR
d23f54 241     case SurvResp:
Z 242         return SURVEYOR
243     case ReqRep:
244         return REQ
245     case Bus:
246         return BUS
9d4b12 247     }
d23f54 248     return NONE
9d4b12 249 }
Z 250
d23f54 251 func protoCoactee(m Mode) protocol {
9d4b12 252     switch m {
Z 253     case PushPull:
254         return PULL
255     case PubSub:
256         return SUB
257     case Pair:
258         return PAIR
d23f54 259     case SurvResp:
Z 260         return RESPONDENT
261     case ReqRep:
262         return REP
263     case Bus:
264         return BUS
9d4b12 265     }
d23f54 266     return NONE
9d4b12 267 }