zhangmeng
2019-08-30 ca8a257c65842a4b140c4371e5365769c7553004
提交 | 用户 | age
9d4b12 1 package deliver
Z 2
3 import (
d23f54 4     "errors"
9d4b12 5     "fmt"
Z 6     "os"
7     "strings"
9e1301 8     "time"
9d4b12 9
Z 10     "nanomsg.org/go-mangos"
11     "nanomsg.org/go-mangos/protocol/bus"
12     "nanomsg.org/go-mangos/protocol/pair"
13     "nanomsg.org/go-mangos/protocol/pub"
14     "nanomsg.org/go-mangos/protocol/pull"
15     "nanomsg.org/go-mangos/protocol/push"
16     "nanomsg.org/go-mangos/protocol/rep"
17     "nanomsg.org/go-mangos/protocol/req"
18     "nanomsg.org/go-mangos/protocol/respondent"
19     "nanomsg.org/go-mangos/protocol/sub"
20     "nanomsg.org/go-mangos/protocol/surveyor"
21     "nanomsg.org/go-mangos/transport/all"
22 )
23
24 // NNG mangos wrap
25 type NNG struct {
9a89af 26     sock mangos.Socket
Z 27     typ  td
28     mode Mode
29     url  string
36766a 30
383557 31     sendMsg *mangos.Message
Z 32
36766a 33     arguments []interface{}
9d4b12 34 }
Z 35
36 // Send impl interface Diliver
37 func (n *NNG) Send(data []byte) error {
c2bbe3 38     if n == nil {
Z 39         return errors.New("please init NNG first")
40     }
36766a 41     var err error
9d4b12 42     if n.sock == nil {
d23f54 43         n.sock, err = n.makeNNG(agent)
36766a 44         if err != nil {
d23f54 45             fmt.Println("create nng sender error")
36766a 46             return err
Z 47         }
9d4b12 48     }
Z 49
c0dcd1 50     if surveyorTime > 0 {
Z 51         time.Sleep(time.Duration(surveyorTime*2) * time.Second)
52     }
53
383557 54     // msg := mangos.NewMessage(len(data))
Z 55     // msg.Body = data
56     // return n.sock.SendMsg(msg)
57
58     if n.sendMsg == nil {
ca8a25 59         n.sendMsg = mangos.NewMessage(1)
383557 60     }
Z 61     n.sendMsg.Body = data
62     return n.sock.SendMsg(n.sendMsg)
9e1301 63
9d4b12 64 }
Z 65
66 // Recv impl interface Diliver
67 func (n *NNG) Recv() ([]byte, error) {
c2bbe3 68     if n == nil {
Z 69         return nil, errors.New("please init NNG first")
70     }
71
36766a 72     var err error
Z 73
9d4b12 74     if n.sock == nil {
d23f54 75         n.sock, err = n.makeNNG(coactee)
36766a 76         if err != nil {
d23f54 77             fmt.Println("create nng reciever error")
5ff1f3 78             return nil, err
a6b23c 79         }
9d4b12 80     }
36766a 81
Z 82     var msg *mangos.Message
83     if msg, err = n.sock.RecvMsg(); err != nil {
84         return nil, err
85     }
86     return msg.Body, nil
87
9d4b12 88 }
Z 89
020e17 90 // Recv2 impl interface
058b21 91 func (n *NNG) Recv2(data []byte) (l int, err error) {
020e17 92     data, err = n.Recv()
058b21 93     l = len(data)
Z 94     return l, err
020e17 95 }
Z 96
aaae99 97 // Close impl interface Deliver
Z 98 func (n *NNG) Close() {
c2bbe3 99     if n != nil && n.sock != nil {
aaae99 100         n.sock.Close()
8e158e 101         n.sock = nil
aaae99 102     }
Z 103 }
104
6887a3 105 func nngServer(m Mode, url string, args ...interface{}) *NNG {
9d4b12 106
Z 107     rmExistedIpcName(url)
108
36766a 109     return &NNG{
9a89af 110         typ:       agent,
36766a 111         mode:      m,
Z 112         url:       url,
113         arguments: args,
114     }
9d4b12 115 }
Z 116
6887a3 117 func nngClient(m Mode, url string, args ...interface{}) *NNG {
9d4b12 118
36766a 119     return &NNG{
9a89af 120         typ:       coactee,
36766a 121         mode:      m,
Z 122         url:       url,
123         arguments: args,
9d4b12 124     }
Z 125
36766a 126 }
Z 127
9893ed 128 func proto(typ td, m Mode) protocol {
d23f54 129     if typ == agent {
Z 130         return protoAgent(m)
131     } else if typ == coactee {
132         return protoCoactee(m)
36766a 133     }
d23f54 134     return NONE
36766a 135 }
Z 136
9893ed 137 func (n *NNG) makeNNG(typ td) (mangos.Socket, error) {
36766a 138
Z 139     var sock mangos.Socket
140     var err error
141
d23f54 142     switch n.mode {
Z 143     case Bus:
144         sock, err = n.busMakeNNG(typ)
3e6484 145     case ReqRep, SurvResp:
d23f54 146         sock, err = n.rrMakeNNG(typ)
9e1301 147     default:
d23f54 148         sock, err = n.ppMakeNNG(typ)
9d4b12 149     }
Z 150
d23f54 151     return sock, err
9d4b12 152 }
Z 153
154 func rmExistedIpcName(url string) {
155     s := strings.Split(url, "://")
156
157     if s[0] == "ipc" {
158         if _, err := os.Stat(s[1]); err == nil {
159             os.Remove(s[1])
160         }
161     }
162 }
163
164 // newSocket allocates a new Socket.  The Socket is the handle used to
165 // access the underlying library.
166 func newSocket(p protocol) (mangos.Socket, error) {
167
d23f54 168     if p == NONE {
Z 169         return nil, errors.New("new socket protocol none")
170     }
9d4b12 171     var s mangos.Socket
Z 172     var err error
173
174     switch p {
175     case PUB:
176         s, err = pub.NewSocket()
177     case SUB:
178         s, err = sub.NewSocket()
179     case PUSH:
180         s, err = push.NewSocket()
181     case PULL:
182         s, err = pull.NewSocket()
183     case REQ:
184         s, err = req.NewSocket()
185     case REP:
186         s, err = rep.NewSocket()
187     case SURVEYOR:
188         s, err = surveyor.NewSocket()
189     case RESPONDENT:
190         s, err = respondent.NewSocket()
191     case PAIR:
192         s, err = pair.NewSocket()
193     case BUS:
194         s, err = bus.NewSocket()
195     default:
196         err = mangos.ErrBadProto
197     }
198
199     if err != nil {
200         return nil, err
201     }
202
203     all.AddTransports(s)
204
205     return s, nil
206 }
207
208 func die(format string, v ...interface{}) {
209     fmt.Fprintln(os.Stderr, fmt.Sprintf(format, v...))
210     os.Exit(1)
211 }
212
213 // Protocol is the numeric abstraction to the various protocols or patterns
214 // that Mangos supports.
215 type protocol int
216
217 // Constants for protocols.
218 const (
d23f54 219     NONE       = -1
9d4b12 220     PUSH       = protocol(mangos.ProtoPush)
Z 221     PULL       = protocol(mangos.ProtoPull)
222     PUB        = protocol(mangos.ProtoPub)
223     SUB        = protocol(mangos.ProtoSub)
224     REQ        = protocol(mangos.ProtoReq)
225     REP        = protocol(mangos.ProtoRep)
226     SURVEYOR   = protocol(mangos.ProtoSurveyor)
227     RESPONDENT = protocol(mangos.ProtoRespondent)
228     BUS        = protocol(mangos.ProtoBus)
229     PAIR       = protocol(mangos.ProtoPair)
230 )
231
d23f54 232 func protoAgent(m Mode) protocol {
9d4b12 233     switch m {
Z 234     case PushPull:
235         return PUSH
236     case PubSub:
237         return PUB
238     case Pair:
239         return PAIR
d23f54 240     case SurvResp:
Z 241         return SURVEYOR
242     case ReqRep:
243         return REQ
244     case Bus:
245         return BUS
9d4b12 246     }
d23f54 247     return NONE
9d4b12 248 }
Z 249
d23f54 250 func protoCoactee(m Mode) protocol {
9d4b12 251     switch m {
Z 252     case PushPull:
253         return PULL
254     case PubSub:
255         return SUB
256     case Pair:
257         return PAIR
d23f54 258     case SurvResp:
Z 259         return RESPONDENT
260     case ReqRep:
261         return REP
262     case Bus:
263         return BUS
9d4b12 264     }
d23f54 265     return NONE
9d4b12 266 }