| 提交 | 用户 | age | ||
| 9d4b12 | 1 | package deliver |
| Z | 2 | |
| 3 | import ( | |
| d23f54 | 4 | "errors" |
| 9d4b12 | 5 | "fmt" |
| Z | 6 | "os" |
| 7 | "strings" | |
| 9e1301 | 8 | "time" |
| 9d4b12 | 9 | |
| Z | 10 | "nanomsg.org/go-mangos" |
| 11 | "nanomsg.org/go-mangos/protocol/bus" | |
| 12 | "nanomsg.org/go-mangos/protocol/pair" | |
| 13 | "nanomsg.org/go-mangos/protocol/pub" | |
| 14 | "nanomsg.org/go-mangos/protocol/pull" | |
| 15 | "nanomsg.org/go-mangos/protocol/push" | |
| 16 | "nanomsg.org/go-mangos/protocol/rep" | |
| 17 | "nanomsg.org/go-mangos/protocol/req" | |
| 18 | "nanomsg.org/go-mangos/protocol/respondent" | |
| 19 | "nanomsg.org/go-mangos/protocol/sub" | |
| 20 | "nanomsg.org/go-mangos/protocol/surveyor" | |
| 21 | "nanomsg.org/go-mangos/transport/all" | |
| 22 | ) | |
| 23 | ||
| 24 | // NNG mangos wrap | |
| 25 | type NNG struct { | |
| 9a89af | 26 | sock mangos.Socket |
| Z | 27 | typ td |
| 28 | mode Mode | |
| 29 | url string | |
| 36766a | 30 | |
| Z | 31 | arguments []interface{} |
| 9d4b12 | 32 | } |
| Z | 33 | |
| 34 | // Send impl interface Diliver | |
| 35 | func (n *NNG) Send(data []byte) error { | |
| 36766a | 36 | var err error |
| 9d4b12 | 37 | if n.sock == nil { |
| d23f54 | 38 | n.sock, err = n.makeNNG(agent) |
| 36766a | 39 | if err != nil { |
| d23f54 | 40 | fmt.Println("create nng sender error") |
| 36766a | 41 | return err |
| Z | 42 | } |
| 9d4b12 | 43 | } |
| Z | 44 | |
| c0dcd1 | 45 | if surveyorTime > 0 { |
| Z | 46 | time.Sleep(time.Duration(surveyorTime*2) * time.Second) |
| 47 | } | |
| 48 | ||
| 36766a | 49 | msg := mangos.NewMessage(len(data)) |
| Z | 50 | msg.Body = data |
| 51 | return n.sock.SendMsg(msg) | |
| 9e1301 | 52 | |
| 9d4b12 | 53 | } |
| Z | 54 | |
| 55 | // Recv impl interface Diliver | |
| 56 | func (n *NNG) Recv() ([]byte, error) { | |
| 36766a | 57 | var err error |
| Z | 58 | |
| 9d4b12 | 59 | if n.sock == nil { |
| d23f54 | 60 | n.sock, err = n.makeNNG(coactee) |
| 36766a | 61 | if err != nil { |
| d23f54 | 62 | fmt.Println("create nng reciever error") |
| 5ff1f3 | 63 | return nil, err |
| a6b23c | 64 | } |
| 9d4b12 | 65 | } |
| 36766a | 66 | |
| Z | 67 | var msg *mangos.Message |
| 68 | if msg, err = n.sock.RecvMsg(); err != nil { | |
| 69 | return nil, err | |
| 70 | } | |
| 71 | return msg.Body, nil | |
| 72 | ||
| 9d4b12 | 73 | } |
| Z | 74 | |
| aaae99 | 75 | // Close impl interface Deliver |
| Z | 76 | func (n *NNG) Close() { |
| 77 | if n.sock != nil { | |
| 78 | n.sock.Close() | |
| 8e158e | 79 | n.sock = nil |
| aaae99 | 80 | } |
| Z | 81 | } |
| 82 | ||
| 6887a3 | 83 | func nngServer(m Mode, url string, args ...interface{}) *NNG { |
| 9d4b12 | 84 | |
| Z | 85 | rmExistedIpcName(url) |
| 86 | ||
| 36766a | 87 | return &NNG{ |
| 9a89af | 88 | typ: agent, |
| 36766a | 89 | mode: m, |
| Z | 90 | url: url, |
| 91 | arguments: args, | |
| 92 | } | |
| 9d4b12 | 93 | } |
| Z | 94 | |
| 6887a3 | 95 | func nngClient(m Mode, url string, args ...interface{}) *NNG { |
| 9d4b12 | 96 | |
| 36766a | 97 | return &NNG{ |
| 9a89af | 98 | typ: coactee, |
| 36766a | 99 | mode: m, |
| Z | 100 | url: url, |
| 101 | arguments: args, | |
| 9d4b12 | 102 | } |
| Z | 103 | |
| 36766a | 104 | } |
| Z | 105 | |
| 9893ed | 106 | func proto(typ td, m Mode) protocol { |
| d23f54 | 107 | if typ == agent { |
| Z | 108 | return protoAgent(m) |
| 109 | } else if typ == coactee { | |
| 110 | return protoCoactee(m) | |
| 36766a | 111 | } |
| d23f54 | 112 | return NONE |
| 36766a | 113 | } |
| Z | 114 | |
| 9893ed | 115 | func (n *NNG) makeNNG(typ td) (mangos.Socket, error) { |
| 36766a | 116 | |
| Z | 117 | var sock mangos.Socket |
| 118 | var err error | |
| 119 | ||
| d23f54 | 120 | switch n.mode { |
| Z | 121 | case Bus: |
| 122 | sock, err = n.busMakeNNG(typ) | |
| 123 | case ReqRep: | |
| 124 | sock, err = n.rrMakeNNG(typ) | |
| 9e1301 | 125 | default: |
| d23f54 | 126 | sock, err = n.ppMakeNNG(typ) |
| 9d4b12 | 127 | } |
| Z | 128 | |
| d23f54 | 129 | return sock, err |
| 9d4b12 | 130 | } |
| Z | 131 | |
// rmExistedIpcName removes the file named by an "ipc://<path>" url so that a
// leftover endpoint file does not get in the way of a new listener. URLs
// with any other scheme, URLs without a "://" separator, and an empty path
// are ignored.
//
// Everything after the "ipc://" prefix is treated as the path, so paths that
// themselves contain "://" are handled intact.
func rmExistedIpcName(url string) {
	const scheme = "ipc://"

	if !strings.HasPrefix(url, scheme) {
		return
	}
	path := url[len(scheme):]
	if path == "" {
		return
	}

	// Best effort: a missing file is the desired end state, and this
	// function has no way to report any other failure, so the error is
	// deliberately dropped.
	_ = os.Remove(path)
}
| 141 | ||
| 142 | // newSocket allocates a new Socket. The Socket is the handle used to | |
| 143 | // access the underlying library. | |
| 144 | func newSocket(p protocol) (mangos.Socket, error) { | |
| 145 | ||
| d23f54 | 146 | if p == NONE { |
| Z | 147 | return nil, errors.New("new socket protocol none") |
| 148 | } | |
| 9d4b12 | 149 | var s mangos.Socket |
| Z | 150 | var err error |
| 151 | ||
| 152 | switch p { | |
| 153 | case PUB: | |
| 154 | s, err = pub.NewSocket() | |
| 155 | case SUB: | |
| 156 | s, err = sub.NewSocket() | |
| 157 | case PUSH: | |
| 158 | s, err = push.NewSocket() | |
| 159 | case PULL: | |
| 160 | s, err = pull.NewSocket() | |
| 161 | case REQ: | |
| 162 | s, err = req.NewSocket() | |
| 163 | case REP: | |
| 164 | s, err = rep.NewSocket() | |
| 165 | case SURVEYOR: | |
| 166 | s, err = surveyor.NewSocket() | |
| 167 | case RESPONDENT: | |
| 168 | s, err = respondent.NewSocket() | |
| 169 | case PAIR: | |
| 170 | s, err = pair.NewSocket() | |
| 171 | case BUS: | |
| 172 | s, err = bus.NewSocket() | |
| 173 | default: | |
| 174 | err = mangos.ErrBadProto | |
| 175 | } | |
| 176 | ||
| 177 | if err != nil { | |
| 178 | return nil, err | |
| 179 | } | |
| 180 | ||
| 181 | all.AddTransports(s) | |
| 182 | ||
| 183 | return s, nil | |
| 184 | } | |
| 185 | ||
// die formats a message from format and v, writes it to standard error
// followed by a newline, and terminates the process with exit status 1.
func die(format string, v ...interface{}) {
	msg := fmt.Sprintf(format, v...)
	fmt.Fprintln(os.Stderr, msg)
	os.Exit(1)
}
| 190 | ||
// protocol is the numeric identifier of a messaging protocol (pattern)
// supported by mangos. Its valid values are the constants declared alongside
// it, which are derived from the mangos.ProtoXxx numbers.
type protocol int
| 194 | ||
| 195 | // Constants for protocols. | |
| 196 | const ( | |
| d23f54 | 197 | NONE = -1 |
| 9d4b12 | 198 | PUSH = protocol(mangos.ProtoPush) |
| Z | 199 | PULL = protocol(mangos.ProtoPull) |
| 200 | PUB = protocol(mangos.ProtoPub) | |
| 201 | SUB = protocol(mangos.ProtoSub) | |
| 202 | REQ = protocol(mangos.ProtoReq) | |
| 203 | REP = protocol(mangos.ProtoRep) | |
| 204 | SURVEYOR = protocol(mangos.ProtoSurveyor) | |
| 205 | RESPONDENT = protocol(mangos.ProtoRespondent) | |
| 206 | BUS = protocol(mangos.ProtoBus) | |
| 207 | PAIR = protocol(mangos.ProtoPair) | |
| 208 | ) | |
| 209 | ||
| d23f54 | 210 | func protoAgent(m Mode) protocol { |
| 9d4b12 | 211 | switch m { |
| Z | 212 | case PushPull: |
| 213 | return PUSH | |
| 214 | case PubSub: | |
| 215 | return PUB | |
| 216 | case Pair: | |
| 217 | return PAIR | |
| d23f54 | 218 | case SurvResp: |
| Z | 219 | return SURVEYOR |
| 220 | case ReqRep: | |
| 221 | return REQ | |
| 222 | case Bus: | |
| 223 | return BUS | |
| 9d4b12 | 224 | } |
| d23f54 | 225 | return NONE |
| 9d4b12 | 226 | } |
| Z | 227 | |
| d23f54 | 228 | func protoCoactee(m Mode) protocol { |
| 9d4b12 | 229 | switch m { |
| Z | 230 | case PushPull: |
| 231 | return PULL | |
| 232 | case PubSub: | |
| 233 | return SUB | |
| 234 | case Pair: | |
| 235 | return PAIR | |
| d23f54 | 236 | case SurvResp: |
| Z | 237 | return RESPONDENT |
| 238 | case ReqRep: | |
| 239 | return REP | |
| 240 | case Bus: | |
| 241 | return BUS | |
| 9d4b12 | 242 | } |
| d23f54 | 243 | return NONE |
| 9d4b12 | 244 | } |