| 提交 | 用户 | age | ||
| 9d4b12 | 1 | package deliver |
| Z | 2 | |
| 3 | import ( | |
| 4 | "errors" | |
| 5 | "fmt" | |
| 6 | "os" | |
| 7 | "strings" | |
| 9e1301 | 8 | "time" |
| 9d4b12 | 9 | |
| Z | 10 | "nanomsg.org/go-mangos" |
| 11 | "nanomsg.org/go-mangos/protocol/bus" | |
| 12 | "nanomsg.org/go-mangos/protocol/pair" | |
| 13 | "nanomsg.org/go-mangos/protocol/pub" | |
| 14 | "nanomsg.org/go-mangos/protocol/pull" | |
| 15 | "nanomsg.org/go-mangos/protocol/push" | |
| 16 | "nanomsg.org/go-mangos/protocol/rep" | |
| 17 | "nanomsg.org/go-mangos/protocol/req" | |
| 18 | "nanomsg.org/go-mangos/protocol/respondent" | |
| 19 | "nanomsg.org/go-mangos/protocol/sub" | |
| 20 | "nanomsg.org/go-mangos/protocol/surveyor" | |
| 21 | "nanomsg.org/go-mangos/transport/all" | |
| 22 | ) | |
| 23 | ||
| 24 | // NNG mangos wrap | |
| 25 | type NNG struct { | |
| 26 | sock mangos.Socket | |
| 27 | } | |
| 28 | ||
| 29 | // Send impl interface Diliver | |
| 30 | func (n *NNG) Send(data []byte) error { | |
| 31 | if n.sock == nil { | |
| 32 | return errors.New("please init NNG first") | |
| 33 | } | |
| 34 | ||
| 9e1301 | 35 | switch n.sock.GetProtocol().Number() { |
| Z | 36 | case mangos.ProtoSurveyor: |
| 37 | time.Sleep(surveyorTime * 2) | |
| 38 | default: | |
| 39 | } | |
| 9d4b12 | 40 | if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil { |
| Z | 41 | msg := mangos.NewMessage(len(data)) |
| 42 | msg.Body = data | |
| 43 | return n.sock.SendMsg(msg) | |
| 44 | } | |
| 9e1301 | 45 | |
| 9d4b12 | 46 | return n.sock.Send(data) |
| Z | 47 | } |
| 48 | ||
| 49 | // Recv impl interface Diliver | |
| 50 | func (n *NNG) Recv() ([]byte, error) { | |
| 51 | if n.sock == nil { | |
| 52 | return nil, errors.New("please init NNG first") | |
| 53 | } | |
| 54 | if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil { | |
| 55 | msg, err := n.sock.RecvMsg() | |
| 56 | return msg.Body, err | |
| 57 | } | |
| 58 | return n.sock.Recv() | |
| 59 | } | |
| 60 | ||
| f5368c | 61 | // nngProducer create from deliver Mode |
| Z | 62 | func nngProducer(m Mode, url string, args ...interface{}) *NNG { |
| 9d4b12 | 63 | |
| Z | 64 | rmExistedIpcName(url) |
| 65 | if sock, err := newSocket(protoProducer(m)); err == nil { | |
| 66 | if err = setSocketOptions(sock, args); err != nil { | |
| 67 | return nil | |
| 68 | } | |
| 69 | if err = sock.Listen(url); err != nil { | |
| 70 | sock.Close() | |
| 71 | return nil | |
| 72 | } | |
| 73 | return &NNG{ | |
| 74 | sock, | |
| 75 | } | |
| 76 | } | |
| 77 | ||
| 78 | return nil | |
| 79 | } | |
| 80 | ||
| f5368c | 81 | // nngConsumer create from deliver Mode |
| Z | 82 | func nngConsumer(m Mode, url string, args ...interface{}) *NNG { |
| 9d4b12 | 83 | |
| Z | 84 | if sock, err := newSocket(protoConsumer(m)); err == nil { |
| 85 | if err = setSocketOptions(sock, args); err != nil { | |
| 86 | return nil | |
| 87 | } | |
| 88 | ||
| 89 | if err = sock.Dial(url); err != nil { | |
| 90 | sock.Close() | |
| 91 | return nil | |
| 92 | } | |
| 93 | ||
| 94 | return &NNG{ | |
| 95 | sock, | |
| 96 | } | |
| 97 | } | |
| 98 | ||
| 99 | return nil | |
| 100 | } | |
| 101 | ||
// Package-level socket tunables.
var (
	// maxRecvSize is the value applied to mangos.OptionMaxRecvSize (33 MiB).
	maxRecvSize = 33 << 20
	// surveyorTime is the survey time applied to surveyor sockets; Send
	// sleeps for twice this long before each surveyor send.
	surveyorTime = 500 * time.Millisecond
)
| 9d4b12 | 107 | |
| Z | 108 | func defualtSocketOptions(sock mangos.Socket) error { |
| 109 | var err error | |
| 9e1301 | 110 | if err = sock.SetOption(mangos.OptionMaxRecvSize, maxRecvSize); err != nil { |
| 9d4b12 | 111 | sock.Close() |
| Z | 112 | return err |
| 113 | } | |
| 114 | if err = sock.SetOption(mangos.OptionWriteQLen, 0); err != nil { | |
| 115 | sock.Close() | |
| 116 | return err | |
| 117 | } | |
| 118 | if err = sock.SetOption(mangos.OptionReadQLen, 0); err != nil { | |
| 119 | sock.Close() | |
| 120 | return err | |
| 121 | } | |
| 4a091d | 122 | // if err = sock.SetOption(mangos.OptionRecvDeadline, time.Second); err != nil { |
| Z | 123 | // sock.Close() |
| 124 | // return err | |
| 125 | // } | |
| 126 | // if err = sock.SetOption(mangos.OptionSendDeadline, time.Second); err != nil { | |
| 9d4b12 | 127 | // sock.Close() |
| Z | 128 | // return err |
| 129 | // } | |
| 130 | if err = sock.SetOption(mangos.OptionRaw, true); err != nil { | |
| 131 | sock.Close() | |
| 132 | return err | |
| 133 | } | |
| 134 | ||
| 135 | return nil | |
| 136 | } | |
| 137 | ||
| 138 | func setSocketOptions(sock mangos.Socket, args ...interface{}) error { | |
| 139 | ||
| 140 | err := defualtSocketOptions(sock) | |
| 141 | if err != nil { | |
| 142 | return err | |
| 143 | } | |
| 9e1301 | 144 | switch sock.GetProtocol().Number() { |
| Z | 145 | case mangos.ProtoSub: |
| 9d4b12 | 146 | for _, arg := range args { |
| Z | 147 | switch arg.(type) { |
| 148 | case string: | |
| 149 | err = sock.SetOption(mangos.OptionSubscribe, []byte(arg.(string))) | |
| 150 | default: | |
| 151 | err = sock.SetOption(mangos.OptionSubscribe, []byte("")) | |
| 152 | } | |
| 153 | } | |
| 9e1301 | 154 | case mangos.ProtoSurveyor: |
| Z | 155 | for _, arg := range args { |
| 156 | switch arg.(type) { | |
| 157 | case int: | |
| 158 | surveyorTime = time.Duration(arg.(int)/2) * time.Second | |
| 159 | default: | |
| 160 | } | |
| 161 | err = sock.SetOption(mangos.OptionSurveyTime, surveyorTime) | |
| 162 | } | |
| 163 | default: | |
| 164 | fmt.Println("no additional args") | |
| 9d4b12 | 165 | } |
| Z | 166 | |
| 167 | return nil | |
| 168 | } | |
| 169 | ||
// rmExistedIpcName removes the file behind an "ipc://<path>" url if that
// file currently exists, so a new listener can bind to the same path.
//
// URLs with another scheme, without a "://" separator, or with an empty
// path are ignored. Only the first "://" separates scheme from path, so a
// path that itself contains "://" is kept intact. Removal is best effort:
// a failure to remove the file is not reported.
func rmExistedIpcName(url string) {
	parts := strings.SplitN(url, "://", 2)
	if len(parts) != 2 || parts[0] != "ipc" || parts[1] == "" {
		return
	}

	path := parts[1]
	if _, err := os.Stat(path); err == nil {
		// Best effort: a subsequent Listen reports the real problem if
		// the stale file could not be removed.
		_ = os.Remove(path)
	}
}
| 179 | ||
| 180 | // newSocket allocates a new Socket. The Socket is the handle used to | |
| 181 | // access the underlying library. | |
| 182 | func newSocket(p protocol) (mangos.Socket, error) { | |
| 183 | ||
| 184 | var s mangos.Socket | |
| 185 | var err error | |
| 186 | ||
| 187 | switch p { | |
| 188 | case PUB: | |
| 189 | s, err = pub.NewSocket() | |
| 190 | case SUB: | |
| 191 | s, err = sub.NewSocket() | |
| 192 | case PUSH: | |
| 193 | s, err = push.NewSocket() | |
| 194 | case PULL: | |
| 195 | s, err = pull.NewSocket() | |
| 196 | case REQ: | |
| 197 | s, err = req.NewSocket() | |
| 198 | case REP: | |
| 199 | s, err = rep.NewSocket() | |
| 200 | case SURVEYOR: | |
| 201 | s, err = surveyor.NewSocket() | |
| 202 | case RESPONDENT: | |
| 203 | s, err = respondent.NewSocket() | |
| 204 | case PAIR: | |
| 205 | s, err = pair.NewSocket() | |
| 206 | case BUS: | |
| 207 | s, err = bus.NewSocket() | |
| 208 | default: | |
| 209 | err = mangos.ErrBadProto | |
| 210 | } | |
| 211 | ||
| 212 | if err != nil { | |
| 213 | return nil, err | |
| 214 | } | |
| 215 | ||
| 216 | all.AddTransports(s) | |
| 217 | // s.AddTransport(ipc.NewTransport()) | |
| 218 | // s.AddTransport(tcp.NewTransport()) | |
| 219 | ||
| 220 | return s, nil | |
| 221 | } | |
| 222 | ||
// die formats a message from format and v, writes it as one line to
// standard error, and terminates the process with exit status 1.
func die(format string, v ...interface{}) {
	msg := fmt.Sprintf(format, v...)
	fmt.Fprintln(os.Stderr, msg)
	os.Exit(1)
}
| 227 | ||
| 228 | // Protocol is the numeric abstraction to the various protocols or patterns | |
| 229 | // that Mangos supports. | |
| 230 | type protocol int | |
| 231 | ||
| 232 | // Constants for protocols. | |
| 233 | const ( | |
| 234 | PUSH = protocol(mangos.ProtoPush) | |
| 235 | PULL = protocol(mangos.ProtoPull) | |
| 236 | PUB = protocol(mangos.ProtoPub) | |
| 237 | SUB = protocol(mangos.ProtoSub) | |
| 238 | REQ = protocol(mangos.ProtoReq) | |
| 239 | REP = protocol(mangos.ProtoRep) | |
| 240 | SURVEYOR = protocol(mangos.ProtoSurveyor) | |
| 241 | RESPONDENT = protocol(mangos.ProtoRespondent) | |
| 242 | BUS = protocol(mangos.ProtoBus) | |
| 243 | PAIR = protocol(mangos.ProtoPair) | |
| 244 | ) | |
| 245 | ||
| 246 | func protoProducer(m Mode) protocol { | |
| 247 | switch m { | |
| 248 | case PushPull: | |
| 249 | return PUSH | |
| 250 | case PubSub: | |
| 251 | return PUB | |
| 252 | case ReqRep: | |
| 253 | return REP | |
| 254 | case SurvResp: | |
| 255 | return SURVEYOR | |
| 256 | case Bus: | |
| 257 | return BUS | |
| 258 | case Pair: | |
| 259 | return PAIR | |
| 260 | } | |
| 261 | return PUSH | |
| 262 | } | |
| 263 | ||
| 264 | func protoConsumer(m Mode) protocol { | |
| 265 | switch m { | |
| 266 | case PushPull: | |
| 267 | return PULL | |
| 268 | case PubSub: | |
| 269 | return SUB | |
| 270 | case ReqRep: | |
| 271 | return REQ | |
| 272 | case SurvResp: | |
| 273 | return RESPONDENT | |
| 274 | case Bus: | |
| 275 | return BUS | |
| 276 | case Pair: | |
| 277 | return PAIR | |
| 278 | } | |
| 279 | return PULL | |
| 280 | } | |