| 提交 | 用户 | age | ||
| 9d4b12 | 1 | package deliver |
| Z | 2 | |
| 3 | import ( | |
| d23f54 | 4 | "errors" |
| 9d4b12 | 5 | "fmt" |
| Z | 6 | "os" |
| 7 | "strings" | |
| 9e1301 | 8 | "time" |
| 9d4b12 | 9 | |
| Z | 10 | "nanomsg.org/go-mangos" |
| 11 | "nanomsg.org/go-mangos/protocol/bus" | |
| 12 | "nanomsg.org/go-mangos/protocol/pair" | |
| 13 | "nanomsg.org/go-mangos/protocol/pub" | |
| 14 | "nanomsg.org/go-mangos/protocol/pull" | |
| 15 | "nanomsg.org/go-mangos/protocol/push" | |
| 16 | "nanomsg.org/go-mangos/protocol/rep" | |
| 17 | "nanomsg.org/go-mangos/protocol/req" | |
| 18 | "nanomsg.org/go-mangos/protocol/respondent" | |
| 19 | "nanomsg.org/go-mangos/protocol/sub" | |
| 20 | "nanomsg.org/go-mangos/protocol/surveyor" | |
| 21 | "nanomsg.org/go-mangos/transport/all" | |
| 22 | ) | |
| 23 | ||
| 24 | // NNG mangos wrap | |
| 25 | type NNG struct { | |
| 9a89af | 26 | sock mangos.Socket |
| Z | 27 | typ td |
| 28 | mode Mode | |
| 29 | url string | |
| 36766a | 30 | |
| Z | 31 | arguments []interface{} |
| 9d4b12 | 32 | } |
| Z | 33 | |
| 34 | // Send impl interface Diliver | |
| 35 | func (n *NNG) Send(data []byte) error { | |
| c2bbe3 | 36 | if n == nil { |
| Z | 37 | return errors.New("please init NNG first") |
| 38 | } | |
| 36766a | 39 | var err error |
| 9d4b12 | 40 | if n.sock == nil { |
| d23f54 | 41 | n.sock, err = n.makeNNG(agent) |
| 36766a | 42 | if err != nil { |
| d23f54 | 43 | fmt.Println("create nng sender error") |
| 36766a | 44 | return err |
| Z | 45 | } |
| 9d4b12 | 46 | } |
| Z | 47 | |
| c0dcd1 | 48 | if surveyorTime > 0 { |
| Z | 49 | time.Sleep(time.Duration(surveyorTime*2) * time.Second) |
| 50 | } | |
| 51 | ||
| 36766a | 52 | msg := mangos.NewMessage(len(data)) |
| Z | 53 | msg.Body = data |
| 54 | return n.sock.SendMsg(msg) | |
| 9e1301 | 55 | |
| 9d4b12 | 56 | } |
| Z | 57 | |
| 58 | // Recv impl interface Diliver | |
| 59 | func (n *NNG) Recv() ([]byte, error) { | |
| c2bbe3 | 60 | if n == nil { |
| Z | 61 | return nil, errors.New("please init NNG first") |
| 62 | } | |
| 63 | ||
| 36766a | 64 | var err error |
| Z | 65 | |
| 9d4b12 | 66 | if n.sock == nil { |
| d23f54 | 67 | n.sock, err = n.makeNNG(coactee) |
| 36766a | 68 | if err != nil { |
| d23f54 | 69 | fmt.Println("create nng reciever error") |
| 5ff1f3 | 70 | return nil, err |
| a6b23c | 71 | } |
| 9d4b12 | 72 | } |
| 36766a | 73 | |
| Z | 74 | var msg *mangos.Message |
| 75 | if msg, err = n.sock.RecvMsg(); err != nil { | |
| 76 | return nil, err | |
| 77 | } | |
| 78 | return msg.Body, nil | |
| 79 | ||
| 9d4b12 | 80 | } |
| Z | 81 | |
| aaae99 | 82 | // Close impl interface Deliver |
| Z | 83 | func (n *NNG) Close() { |
| c2bbe3 | 84 | if n != nil && n.sock != nil { |
| aaae99 | 85 | n.sock.Close() |
| 8e158e | 86 | n.sock = nil |
| aaae99 | 87 | } |
| Z | 88 | } |
| 89 | ||
| 6887a3 | 90 | func nngServer(m Mode, url string, args ...interface{}) *NNG { |
| 9d4b12 | 91 | |
| Z | 92 | rmExistedIpcName(url) |
| 93 | ||
| 36766a | 94 | return &NNG{ |
| 9a89af | 95 | typ: agent, |
| 36766a | 96 | mode: m, |
| Z | 97 | url: url, |
| 98 | arguments: args, | |
| 99 | } | |
| 9d4b12 | 100 | } |
| Z | 101 | |
| 6887a3 | 102 | func nngClient(m Mode, url string, args ...interface{}) *NNG { |
| 9d4b12 | 103 | |
| 36766a | 104 | return &NNG{ |
| 9a89af | 105 | typ: coactee, |
| 36766a | 106 | mode: m, |
| Z | 107 | url: url, |
| 108 | arguments: args, | |
| 9d4b12 | 109 | } |
| Z | 110 | |
| 36766a | 111 | } |
| Z | 112 | |
| 9893ed | 113 | func proto(typ td, m Mode) protocol { |
| d23f54 | 114 | if typ == agent { |
| Z | 115 | return protoAgent(m) |
| 116 | } else if typ == coactee { | |
| 117 | return protoCoactee(m) | |
| 36766a | 118 | } |
| d23f54 | 119 | return NONE |
| 36766a | 120 | } |
| Z | 121 | |
| 9893ed | 122 | func (n *NNG) makeNNG(typ td) (mangos.Socket, error) { |
| 36766a | 123 | |
| Z | 124 | var sock mangos.Socket |
| 125 | var err error | |
| 126 | ||
| d23f54 | 127 | switch n.mode { |
| Z | 128 | case Bus: |
| 129 | sock, err = n.busMakeNNG(typ) | |
| 3e6484 | 130 | case ReqRep, SurvResp: |
| d23f54 | 131 | sock, err = n.rrMakeNNG(typ) |
| 9e1301 | 132 | default: |
| d23f54 | 133 | sock, err = n.ppMakeNNG(typ) |
| 9d4b12 | 134 | } |
| Z | 135 | |
| d23f54 | 136 | return sock, err |
| 9d4b12 | 137 | } |
| Z | 138 | |
// rmExistedIpcName removes the file that an "ipc://<path>" url points to, so
// a stale IPC endpoint left on disk does not get in the way of a new
// listener. Urls with any other scheme, urls without a "://" separator, and
// urls with an empty path are ignored.
//
// Removal is best effort: a missing file or a failed removal is not reported.
func rmExistedIpcName(url string) {
	// Split only on the first "://" so that a path which itself contains
	// "://" is kept whole, and check the length so that a bare "ipc" (no
	// separator) cannot cause an index-out-of-range panic.
	parts := strings.SplitN(url, "://", 2)
	if len(parts) != 2 || parts[0] != "ipc" || parts[1] == "" {
		return
	}

	// Best effort by design; any real problem with the path will surface
	// when the socket later tries to listen on it.
	_ = os.Remove(parts[1])
}
| 148 | ||
| 149 | // newSocket allocates a new Socket. The Socket is the handle used to | |
| 150 | // access the underlying library. | |
| 151 | func newSocket(p protocol) (mangos.Socket, error) { | |
| 152 | ||
| d23f54 | 153 | if p == NONE { |
| Z | 154 | return nil, errors.New("new socket protocol none") |
| 155 | } | |
| 9d4b12 | 156 | var s mangos.Socket |
| Z | 157 | var err error |
| 158 | ||
| 159 | switch p { | |
| 160 | case PUB: | |
| 161 | s, err = pub.NewSocket() | |
| 162 | case SUB: | |
| 163 | s, err = sub.NewSocket() | |
| 164 | case PUSH: | |
| 165 | s, err = push.NewSocket() | |
| 166 | case PULL: | |
| 167 | s, err = pull.NewSocket() | |
| 168 | case REQ: | |
| 169 | s, err = req.NewSocket() | |
| 170 | case REP: | |
| 171 | s, err = rep.NewSocket() | |
| 172 | case SURVEYOR: | |
| 173 | s, err = surveyor.NewSocket() | |
| 174 | case RESPONDENT: | |
| 175 | s, err = respondent.NewSocket() | |
| 176 | case PAIR: | |
| 177 | s, err = pair.NewSocket() | |
| 178 | case BUS: | |
| 179 | s, err = bus.NewSocket() | |
| 180 | default: | |
| 181 | err = mangos.ErrBadProto | |
| 182 | } | |
| 183 | ||
| 184 | if err != nil { | |
| 185 | return nil, err | |
| 186 | } | |
| 187 | ||
| 188 | all.AddTransports(s) | |
| 189 | ||
| 190 | return s, nil | |
| 191 | } | |
| 192 | ||
// die formats a message in the manner of fmt.Sprintf, writes it to standard
// error followed by a newline, and terminates the process with exit status 1.
func die(format string, v ...interface{}) {
	line := fmt.Sprintf(format, v...)
	fmt.Fprintln(os.Stderr, line)
	os.Exit(1)
}
| 197 | ||
| 198 | // Protocol is the numeric abstraction to the various protocols or patterns | |
| 199 | // that Mangos supports. | |
| 200 | type protocol int | |
| 201 | ||
| 202 | // Constants for protocols. | |
| 203 | const ( | |
| d23f54 | 204 | NONE = -1 |
| 9d4b12 | 205 | PUSH = protocol(mangos.ProtoPush) |
| Z | 206 | PULL = protocol(mangos.ProtoPull) |
| 207 | PUB = protocol(mangos.ProtoPub) | |
| 208 | SUB = protocol(mangos.ProtoSub) | |
| 209 | REQ = protocol(mangos.ProtoReq) | |
| 210 | REP = protocol(mangos.ProtoRep) | |
| 211 | SURVEYOR = protocol(mangos.ProtoSurveyor) | |
| 212 | RESPONDENT = protocol(mangos.ProtoRespondent) | |
| 213 | BUS = protocol(mangos.ProtoBus) | |
| 214 | PAIR = protocol(mangos.ProtoPair) | |
| 215 | ) | |
| 216 | ||
| d23f54 | 217 | func protoAgent(m Mode) protocol { |
| 9d4b12 | 218 | switch m { |
| Z | 219 | case PushPull: |
| 220 | return PUSH | |
| 221 | case PubSub: | |
| 222 | return PUB | |
| 223 | case Pair: | |
| 224 | return PAIR | |
| d23f54 | 225 | case SurvResp: |
| Z | 226 | return SURVEYOR |
| 227 | case ReqRep: | |
| 228 | return REQ | |
| 229 | case Bus: | |
| 230 | return BUS | |
| 9d4b12 | 231 | } |
| d23f54 | 232 | return NONE |
| 9d4b12 | 233 | } |
| Z | 234 | |
| d23f54 | 235 | func protoCoactee(m Mode) protocol { |
| 9d4b12 | 236 | switch m { |
| Z | 237 | case PushPull: |
| 238 | return PULL | |
| 239 | case PubSub: | |
| 240 | return SUB | |
| 241 | case Pair: | |
| 242 | return PAIR | |
| d23f54 | 243 | case SurvResp: |
| Z | 244 | return RESPONDENT |
| 245 | case ReqRep: | |
| 246 | return REP | |
| 247 | case Bus: | |
| 248 | return BUS | |
| 9d4b12 | 249 | } |
| d23f54 | 250 | return NONE |
| 9d4b12 | 251 | } |