| 提交 | 用户 | age | ||
| 9d4b12 | 1 | package deliver |
| Z | 2 | |
| 3 | import ( | |
| d23f54 | 4 | "errors" |
| 9d4b12 | 5 | "fmt" |
| Z | 6 | "os" |
| 7 | "strings" | |
| 9e1301 | 8 | "time" |
| 9d4b12 | 9 | |
| Z | 10 | "nanomsg.org/go-mangos" |
| 11 | "nanomsg.org/go-mangos/protocol/bus" | |
| 12 | "nanomsg.org/go-mangos/protocol/pair" | |
| 13 | "nanomsg.org/go-mangos/protocol/pub" | |
| 14 | "nanomsg.org/go-mangos/protocol/pull" | |
| 15 | "nanomsg.org/go-mangos/protocol/push" | |
| 16 | "nanomsg.org/go-mangos/protocol/rep" | |
| 17 | "nanomsg.org/go-mangos/protocol/req" | |
| 18 | "nanomsg.org/go-mangos/protocol/respondent" | |
| 19 | "nanomsg.org/go-mangos/protocol/sub" | |
| 20 | "nanomsg.org/go-mangos/protocol/surveyor" | |
| 21 | "nanomsg.org/go-mangos/transport/all" | |
| 22 | ) | |
| 23 | ||
// NNG wraps a mangos socket together with the settings used to create it.
// The socket is not created by the constructors; Send and Recv build it via
// makeNNG the first time they find sock unset.
type NNG struct {
	sock mangos.Socket // underlying socket; nil until the first Send/Recv, reset to nil by Close
	typ  td            // role recorded by the constructor (agent in nngServer, coactee in nngClient)
	mode Mode          // messaging pattern; makeNNG switches on it to choose the socket builder
	url  string        // endpoint address exactly as passed to the constructor

	arguments []interface{} // extra constructor arguments, stored as given
}
| Z | 33 | |
| 34 | // Send impl interface Diliver | |
| 35 | func (n *NNG) Send(data []byte) error { | |
| c2bbe3 | 36 | if n == nil { |
| Z | 37 | return errors.New("please init NNG first") |
| 38 | } | |
| 36766a | 39 | var err error |
| 9d4b12 | 40 | if n.sock == nil { |
| d23f54 | 41 | n.sock, err = n.makeNNG(agent) |
| 36766a | 42 | if err != nil { |
| d23f54 | 43 | fmt.Println("create nng sender error") |
| 36766a | 44 | return err |
| Z | 45 | } |
| 9d4b12 | 46 | } |
| Z | 47 | |
| c0dcd1 | 48 | if surveyorTime > 0 { |
| Z | 49 | time.Sleep(time.Duration(surveyorTime*2) * time.Second) |
| 50 | } | |
| 51 | ||
| 971bd1 | 52 | msg := mangos.NewMessage(1) |
| Z | 53 | msg.Body = data |
| 54 | return n.sock.SendMsg(msg) | |
| 9d4b12 | 55 | } |
| Z | 56 | |
| 57 | // Recv impl interface Diliver | |
| 58 | func (n *NNG) Recv() ([]byte, error) { | |
| c2bbe3 | 59 | if n == nil { |
| Z | 60 | return nil, errors.New("please init NNG first") |
| 61 | } | |
| 62 | ||
| 36766a | 63 | var err error |
| Z | 64 | |
| 9d4b12 | 65 | if n.sock == nil { |
| d23f54 | 66 | n.sock, err = n.makeNNG(coactee) |
| 36766a | 67 | if err != nil { |
| d23f54 | 68 | fmt.Println("create nng reciever error") |
| 5ff1f3 | 69 | return nil, err |
| a6b23c | 70 | } |
| 9d4b12 | 71 | } |
| 36766a | 72 | |
| Z | 73 | var msg *mangos.Message |
| 74 | if msg, err = n.sock.RecvMsg(); err != nil { | |
| 75 | return nil, err | |
| 76 | } | |
| 77 | return msg.Body, nil | |
| 78 | ||
| 9d4b12 | 79 | } |
| Z | 80 | |
| 020e17 | 81 | // Recv2 impl interface |
| 058b21 | 82 | func (n *NNG) Recv2(data []byte) (l int, err error) { |
| 020e17 | 83 | data, err = n.Recv() |
| 058b21 | 84 | l = len(data) |
| Z | 85 | return l, err |
| 020e17 | 86 | } |
| Z | 87 | |
| aaae99 | 88 | // Close impl interface Deliver |
| Z | 89 | func (n *NNG) Close() { |
| c2bbe3 | 90 | if n != nil && n.sock != nil { |
| aaae99 | 91 | n.sock.Close() |
| 8e158e | 92 | n.sock = nil |
| aaae99 | 93 | } |
| Z | 94 | } |
| 95 | ||
| 6887a3 | 96 | func nngServer(m Mode, url string, args ...interface{}) *NNG { |
| 9d4b12 | 97 | |
| Z | 98 | rmExistedIpcName(url) |
| 99 | ||
| 36766a | 100 | return &NNG{ |
| 9a89af | 101 | typ: agent, |
| 36766a | 102 | mode: m, |
| Z | 103 | url: url, |
| 104 | arguments: args, | |
| 105 | } | |
| 9d4b12 | 106 | } |
| Z | 107 | |
| 6887a3 | 108 | func nngClient(m Mode, url string, args ...interface{}) *NNG { |
| 9d4b12 | 109 | |
| 36766a | 110 | return &NNG{ |
| 9a89af | 111 | typ: coactee, |
| 36766a | 112 | mode: m, |
| Z | 113 | url: url, |
| 114 | arguments: args, | |
| 9d4b12 | 115 | } |
| Z | 116 | |
| 36766a | 117 | } |
| Z | 118 | |
| 9893ed | 119 | func proto(typ td, m Mode) protocol { |
| d23f54 | 120 | if typ == agent { |
| Z | 121 | return protoAgent(m) |
| 122 | } else if typ == coactee { | |
| 123 | return protoCoactee(m) | |
| 36766a | 124 | } |
| d23f54 | 125 | return NONE |
| 36766a | 126 | } |
| Z | 127 | |
| 9893ed | 128 | func (n *NNG) makeNNG(typ td) (mangos.Socket, error) { |
| 36766a | 129 | |
| Z | 130 | var sock mangos.Socket |
| 131 | var err error | |
| 132 | ||
| d23f54 | 133 | switch n.mode { |
| Z | 134 | case Bus: |
| 135 | sock, err = n.busMakeNNG(typ) | |
| 3e6484 | 136 | case ReqRep, SurvResp: |
| d23f54 | 137 | sock, err = n.rrMakeNNG(typ) |
| 9e1301 | 138 | default: |
| d23f54 | 139 | sock, err = n.ppMakeNNG(typ) |
| 9d4b12 | 140 | } |
| Z | 141 | |
| d23f54 | 142 | return sock, err |
| 9d4b12 | 143 | } |
| Z | 144 | |
// rmExistedIpcName removes the file named by the path part of an "ipc://"
// URL, if that file exists. URLs with another scheme, or with no "://"
// separator at all, are ignored.
//
// The URL is split once, so everything after the first "://" is the path.
// Removal is best effort: the function has no error return, so a failed
// os.Remove is deliberately dropped.
func rmExistedIpcName(url string) {
	parts := strings.SplitN(url, "://", 2)
	// Guard the length: indexing parts[1] on a separator-less input such as
	// "ipc" used to panic.
	if len(parts) != 2 || parts[0] != "ipc" {
		return
	}

	path := parts[1]
	if _, err := os.Stat(path); err != nil {
		return
	}
	_ = os.Remove(path)
}
| 154 | ||
| 155 | // newSocket allocates a new Socket. The Socket is the handle used to | |
| 156 | // access the underlying library. | |
| 157 | func newSocket(p protocol) (mangos.Socket, error) { | |
| 158 | ||
| d23f54 | 159 | if p == NONE { |
| Z | 160 | return nil, errors.New("new socket protocol none") |
| 161 | } | |
| 9d4b12 | 162 | var s mangos.Socket |
| Z | 163 | var err error |
| 164 | ||
| 165 | switch p { | |
| 166 | case PUB: | |
| 167 | s, err = pub.NewSocket() | |
| 168 | case SUB: | |
| 169 | s, err = sub.NewSocket() | |
| 170 | case PUSH: | |
| 171 | s, err = push.NewSocket() | |
| 172 | case PULL: | |
| 173 | s, err = pull.NewSocket() | |
| 174 | case REQ: | |
| 175 | s, err = req.NewSocket() | |
| 176 | case REP: | |
| 177 | s, err = rep.NewSocket() | |
| 178 | case SURVEYOR: | |
| 179 | s, err = surveyor.NewSocket() | |
| 180 | case RESPONDENT: | |
| 181 | s, err = respondent.NewSocket() | |
| 182 | case PAIR: | |
| 183 | s, err = pair.NewSocket() | |
| 184 | case BUS: | |
| 185 | s, err = bus.NewSocket() | |
| 186 | default: | |
| 187 | err = mangos.ErrBadProto | |
| 188 | } | |
| 189 | ||
| 190 | if err != nil { | |
| 191 | return nil, err | |
| 192 | } | |
| 193 | ||
| 194 | all.AddTransports(s) | |
| 195 | ||
| 196 | return s, nil | |
| 197 | } | |
| 198 | ||
// die writes the formatted message followed by a newline to standard error
// and terminates the process with exit status 1.
func die(format string, v ...interface{}) {
	fmt.Fprintf(os.Stderr, format+"\n", v...)
	os.Exit(1)
}
| 203 | ||
// protocol is the numeric abstraction to the various protocols or patterns
// that Mangos supports.
type protocol int

// Constants for protocols. Every value except NONE is the corresponding
// mangos.Proto* number converted to protocol.
const (
	// NONE is an untyped constant (-1), unlike the typed values below; the
	// proto* helpers return it when no protocol matches.
	NONE       = -1
	PUSH       = protocol(mangos.ProtoPush)
	PULL       = protocol(mangos.ProtoPull)
	PUB        = protocol(mangos.ProtoPub)
	SUB        = protocol(mangos.ProtoSub)
	REQ        = protocol(mangos.ProtoReq)
	REP        = protocol(mangos.ProtoRep)
	SURVEYOR   = protocol(mangos.ProtoSurveyor)
	RESPONDENT = protocol(mangos.ProtoRespondent)
	BUS        = protocol(mangos.ProtoBus)
	PAIR       = protocol(mangos.ProtoPair)
)
| 222 | ||
| d23f54 | 223 | func protoAgent(m Mode) protocol { |
| 9d4b12 | 224 | switch m { |
| Z | 225 | case PushPull: |
| 226 | return PUSH | |
| 227 | case PubSub: | |
| 228 | return PUB | |
| 229 | case Pair: | |
| 230 | return PAIR | |
| d23f54 | 231 | case SurvResp: |
| Z | 232 | return SURVEYOR |
| 233 | case ReqRep: | |
| 234 | return REQ | |
| 235 | case Bus: | |
| 236 | return BUS | |
| 9d4b12 | 237 | } |
| d23f54 | 238 | return NONE |
| 9d4b12 | 239 | } |
| Z | 240 | |
| d23f54 | 241 | func protoCoactee(m Mode) protocol { |
| 9d4b12 | 242 | switch m { |
| Z | 243 | case PushPull: |
| 244 | return PULL | |
| 245 | case PubSub: | |
| 246 | return SUB | |
| 247 | case Pair: | |
| 248 | return PAIR | |
| d23f54 | 249 | case SurvResp: |
| Z | 250 | return RESPONDENT |
| 251 | case ReqRep: | |
| 252 | return REP | |
| 253 | case Bus: | |
| 254 | return BUS | |
| 9d4b12 | 255 | } |
| d23f54 | 256 | return NONE |
| 9d4b12 | 257 | } |