New file |
| | |
| | | package deliver |
| | | |
| | | import ( |
| | | "errors" |
| | | "fmt" |
| | | "os" |
| | | "strings" |
| | | |
| | | "nanomsg.org/go-mangos" |
| | | "nanomsg.org/go-mangos/protocol/bus" |
| | | "nanomsg.org/go-mangos/protocol/pair" |
| | | "nanomsg.org/go-mangos/protocol/pub" |
| | | "nanomsg.org/go-mangos/protocol/pull" |
| | | "nanomsg.org/go-mangos/protocol/push" |
| | | "nanomsg.org/go-mangos/protocol/rep" |
| | | "nanomsg.org/go-mangos/protocol/req" |
| | | "nanomsg.org/go-mangos/protocol/respondent" |
| | | "nanomsg.org/go-mangos/protocol/sub" |
| | | "nanomsg.org/go-mangos/protocol/surveyor" |
| | | "nanomsg.org/go-mangos/transport/all" |
| | | ) |
| | | |
| | | // NNG mangos wrap |
| | | type NNG struct { |
| | | sock mangos.Socket |
| | | } |
| | | |
| | | // Send impl interface Diliver |
| | | func (n *NNG) Send(data []byte) error { |
| | | if n.sock == nil { |
| | | return errors.New("please init NNG first") |
| | | } |
| | | |
| | | if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil { |
| | | msg := mangos.NewMessage(len(data)) |
| | | msg.Body = data |
| | | return n.sock.SendMsg(msg) |
| | | } |
| | | return n.sock.Send(data) |
| | | } |
| | | |
| | | // Recv impl interface Diliver |
| | | func (n *NNG) Recv() ([]byte, error) { |
| | | if n.sock == nil { |
| | | return nil, errors.New("please init NNG first") |
| | | } |
| | | if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil { |
| | | msg, err := n.sock.RecvMsg() |
| | | return msg.Body, err |
| | | } |
| | | return n.sock.Recv() |
| | | } |
| | | |
| | | // NewNNGProducer create from deliver Mode |
| | | func NewNNGProducer(m Mode, url string, args ...interface{}) *NNG { |
| | | |
| | | rmExistedIpcName(url) |
| | | if sock, err := newSocket(protoProducer(m)); err == nil { |
| | | if err = setSocketOptions(sock, args); err != nil { |
| | | return nil |
| | | } |
| | | if err = sock.Listen(url); err != nil { |
| | | sock.Close() |
| | | return nil |
| | | } |
| | | return &NNG{ |
| | | sock, |
| | | } |
| | | } |
| | | |
| | | return nil |
| | | } |
| | | |
| | | // NewNNGConsumer create from deliver Mode |
| | | func NewNNGConsumer(m Mode, url string, args ...interface{}) *NNG { |
| | | |
| | | if sock, err := newSocket(protoConsumer(m)); err == nil { |
| | | if err = setSocketOptions(sock, args); err != nil { |
| | | return nil |
| | | } |
| | | |
| | | if err = sock.Dial(url); err != nil { |
| | | sock.Close() |
| | | return nil |
| | | } |
| | | |
| | | return &NNG{ |
| | | sock, |
| | | } |
| | | } |
| | | |
| | | return nil |
| | | } |
| | | |
| | | // MaxRecvSize max recv size |
| | | var MaxRecvSize = 33 * 1024 * 1024 |
| | | |
| | | func defualtSocketOptions(sock mangos.Socket) error { |
| | | var err error |
| | | if err = sock.SetOption(mangos.OptionMaxRecvSize, MaxRecvSize); err != nil { |
| | | sock.Close() |
| | | return err |
| | | } |
| | | if err = sock.SetOption(mangos.OptionWriteQLen, 0); err != nil { |
| | | sock.Close() |
| | | return err |
| | | } |
| | | if err = sock.SetOption(mangos.OptionReadQLen, 0); err != nil { |
| | | sock.Close() |
| | | return err |
| | | } |
| | | // if err = sock.SetOption(mangos.OptionNoDelay, true); err != nil { |
| | | // sock.Close() |
| | | // return err |
| | | // } |
| | | if err = sock.SetOption(mangos.OptionRaw, true); err != nil { |
| | | sock.Close() |
| | | return err |
| | | } |
| | | |
| | | return nil |
| | | } |
| | | |
| | | func setSocketOptions(sock mangos.Socket, args ...interface{}) error { |
| | | |
| | | err := defualtSocketOptions(sock) |
| | | if err != nil { |
| | | return err |
| | | } |
| | | if sock.GetProtocol().Number() == mangos.ProtoSub { |
| | | for _, arg := range args { |
| | | switch arg.(type) { |
| | | case string: |
| | | err = sock.SetOption(mangos.OptionSubscribe, []byte(arg.(string))) |
| | | default: |
| | | err = sock.SetOption(mangos.OptionSubscribe, []byte("")) |
| | | } |
| | | } |
| | | } |
| | | |
| | | return nil |
| | | } |
| | | |
| | | func rmExistedIpcName(url string) { |
| | | s := strings.Split(url, "://") |
| | | |
| | | if s[0] == "ipc" { |
| | | if _, err := os.Stat(s[1]); err == nil { |
| | | os.Remove(s[1]) |
| | | } |
| | | } |
| | | } |
| | | |
| | | // newSocket allocates a new Socket. The Socket is the handle used to |
| | | // access the underlying library. |
| | | func newSocket(p protocol) (mangos.Socket, error) { |
| | | |
| | | var s mangos.Socket |
| | | var err error |
| | | |
| | | switch p { |
| | | case PUB: |
| | | s, err = pub.NewSocket() |
| | | case SUB: |
| | | s, err = sub.NewSocket() |
| | | case PUSH: |
| | | s, err = push.NewSocket() |
| | | case PULL: |
| | | s, err = pull.NewSocket() |
| | | case REQ: |
| | | s, err = req.NewSocket() |
| | | case REP: |
| | | s, err = rep.NewSocket() |
| | | case SURVEYOR: |
| | | s, err = surveyor.NewSocket() |
| | | case RESPONDENT: |
| | | s, err = respondent.NewSocket() |
| | | case PAIR: |
| | | s, err = pair.NewSocket() |
| | | case BUS: |
| | | s, err = bus.NewSocket() |
| | | default: |
| | | err = mangos.ErrBadProto |
| | | } |
| | | |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | |
| | | all.AddTransports(s) |
| | | // s.AddTransport(ipc.NewTransport()) |
| | | // s.AddTransport(tcp.NewTransport()) |
| | | |
| | | return s, nil |
| | | } |
| | | |
| | | func die(format string, v ...interface{}) { |
| | | fmt.Fprintln(os.Stderr, fmt.Sprintf(format, v...)) |
| | | os.Exit(1) |
| | | } |
| | | |
| | | // Protocol is the numeric abstraction to the various protocols or patterns |
| | | // that Mangos supports. |
| | | type protocol int |
| | | |
| | | // Constants for protocols. |
| | | const ( |
| | | PUSH = protocol(mangos.ProtoPush) |
| | | PULL = protocol(mangos.ProtoPull) |
| | | PUB = protocol(mangos.ProtoPub) |
| | | SUB = protocol(mangos.ProtoSub) |
| | | REQ = protocol(mangos.ProtoReq) |
| | | REP = protocol(mangos.ProtoRep) |
| | | SURVEYOR = protocol(mangos.ProtoSurveyor) |
| | | RESPONDENT = protocol(mangos.ProtoRespondent) |
| | | BUS = protocol(mangos.ProtoBus) |
| | | PAIR = protocol(mangos.ProtoPair) |
| | | ) |
| | | |
| | | func protoProducer(m Mode) protocol { |
| | | switch m { |
| | | case PushPull: |
| | | return PUSH |
| | | case PubSub: |
| | | return PUB |
| | | case ReqRep: |
| | | return REP |
| | | case SurvResp: |
| | | return SURVEYOR |
| | | case Bus: |
| | | return BUS |
| | | case Pair: |
| | | return PAIR |
| | | } |
| | | return PUSH |
| | | } |
| | | |
| | | func protoConsumer(m Mode) protocol { |
| | | switch m { |
| | | case PushPull: |
| | | return PULL |
| | | case PubSub: |
| | | return SUB |
| | | case ReqRep: |
| | | return REQ |
| | | case SurvResp: |
| | | return RESPONDENT |
| | | case Bus: |
| | | return BUS |
| | | case Pair: |
| | | return PAIR |
| | | } |
| | | return PULL |
| | | } |