package deliver

import (
	"fmt"
	"os"
	"strings"
	"time"

	"nanomsg.org/go-mangos"
	"nanomsg.org/go-mangos/protocol/bus"
	"nanomsg.org/go-mangos/protocol/pair"
	"nanomsg.org/go-mangos/protocol/pub"
	"nanomsg.org/go-mangos/protocol/pull"
	"nanomsg.org/go-mangos/protocol/push"
	"nanomsg.org/go-mangos/protocol/rep"
	"nanomsg.org/go-mangos/protocol/req"
	"nanomsg.org/go-mangos/protocol/respondent"
	"nanomsg.org/go-mangos/protocol/sub"
	"nanomsg.org/go-mangos/protocol/surveyor"
	"nanomsg.org/go-mangos/transport/all"
)

// NNG wraps a mangos socket together with the settings needed to create it.
// The socket itself is created lazily by the first call to Send or Recv.
type NNG struct {
	sock      mangos.Socket // nil until the first Send/Recv, and again after Close
	server    bool          // true: Listen on url; false: Dial url
	mode      Mode          // messaging pattern, mapped to a protocol by proto
	url       string        // address handed to Listen or Dial
	arguments []interface{} // extra arguments forwarded to setSocketOptions
}

// Send implements the Deliver interface. It creates the producer-side socket
// on first use and then sends data as a single message.
//
// If the package-level surveyorTime is positive, every call first sleeps for
// 2*surveyorTime seconds.
// NOTE(review): presumably this gives respondents time to connect; it applies
// to every Send on every NNG once a surveyor has been configured - confirm
// that this is intended.
func (n *NNG) Send(data []byte) error {
	if n.sock == nil {
		sock, err := n.makeNNG(true)
		if err != nil {
			fmt.Println("create nng producer error")
			return err
		}
		n.sock = sock
	}

	if surveyorTime > 0 {
		time.Sleep(time.Duration(surveyorTime*2) * time.Second)
	}

	// Copy data into the buffer NewMessage allocated instead of discarding
	// that buffer and aliasing the caller's slice.
	msg := mangos.NewMessage(len(data))
	msg.Body = append(msg.Body, data...)
	return n.sock.SendMsg(msg)
}

// Recv implements the Deliver interface. It creates the consumer-side socket
// on first use and then returns the body of the next received message.
func (n *NNG) Recv() ([]byte, error) {
	if n.sock == nil {
		sock, err := n.makeNNG(false)
		if err != nil {
			fmt.Println("create nng consumer error")
			return nil, err
		}
		n.sock = sock
	}

	msg, err := n.sock.RecvMsg()
	if err != nil {
		return nil, err
	}
	return msg.Body, nil
}

// Close implements the Deliver interface. It closes the underlying socket, if
// one has been created, and resets it so that a later Send or Recv creates a
// fresh socket.
func (n *NNG) Close() {
	if n.sock != nil {
		n.sock.Close()
		n.sock = nil
	}
}

// nngServer returns an NNG that will Listen on url. For ipc:// URLs any
// existing file at the socket path is removed first.
func nngServer(m Mode, url string, args ...interface{}) *NNG {
	rmExistedIpcName(url)

	return &NNG{
		server:    true,
		mode:      m,
		url:       url,
		arguments: args,
	}
}

// nngClient returns an NNG that will Dial url.
func nngClient(m Mode, url string, args ...interface{}) *NNG {
	return &NNG{
		server:    false,
		mode:      m,
		url:       url,
		arguments: args,
	}
}

// proto maps a mode to the protocol used by its producer or consumer side.
func proto(producer bool, m Mode) protocol {
	if producer {
		return protoProducer(m)
	}
	return protoConsumer(m)
}

// makeNNG creates a socket for the producer or consumer side of n.mode,
// applies the socket options and then listens on (server) or dials (client)
// n.url. On any failure the socket is closed and (nil, err) is returned.
func (n *NNG) makeNNG(producer bool) (mangos.Socket, error) {
	sock, err := newSocket(proto(producer, n.mode))
	if err != nil {
		return nil, err
	}

	// Return immediately on failure: continuing would call Listen/Dial on a
	// nil socket and panic.
	if err = setSocketOptions(sock, n.arguments...); err != nil {
		sock.Close()
		return nil, err
	}

	if n.server {
		err = sock.Listen(n.url)
	} else {
		err = sock.Dial(n.url)
	}
	if err != nil {
		sock.Close()
		return nil, err
	}
	return sock, nil
}

var (
	// maxRecvSize is the value applied to mangos.OptionMaxRecvSize.
	maxRecvSize = 33 * 1024 * 1024
	// surveyorTime is the survey time in seconds. It stays at -1 until
	// setSocketOptions configures a surveyor socket with an int argument;
	// Send sleeps for twice this value whenever it is positive.
	surveyorTime = -1
)

// defualtSocketOptions returns the options applied to every socket before any
// protocol-specific additions.
func defualtSocketOptions() map[string]interface{} {
	options := make(map[string]interface{})
	options[mangos.OptionMaxRecvSize] = maxRecvSize
	options[mangos.OptionWriteQLen] = 0
	options[mangos.OptionReadQLen] = 0
	// options[mangos.OptionRecvDeadline] = time.Second
	// options[mangos.OptionSendDeadline] = time.Second
	options[mangos.OptionRaw] = true

	return options
}

// setSocketOptions applies the default options plus protocol-specific ones
// derived from args:
//
//   - SUB sockets subscribe to the last string argument (empty topic if none).
//   - SURVEYOR sockets take the last int argument, halve it (minimum 1), store
//     it in the package-level surveyorTime and use it as the survey time in
//     seconds.
//
// Arguments of any other type are ignored. The first SetOption error is
// returned.
func setSocketOptions(sock mangos.Socket, args ...interface{}) error {
	options := defualtSocketOptions()

	switch sock.GetProtocol().Number() {
	case mangos.ProtoSub:
		topic := ""
		for _, arg := range args {
			if s, ok := arg.(string); ok {
				topic = s
			}
		}
		options[mangos.OptionSubscribe] = []byte(topic)
	case mangos.ProtoSurveyor:
		for _, arg := range args {
			if v, ok := arg.(int); ok {
				if v < 2 {
					surveyorTime = 1
				} else {
					surveyorTime = v / 2
				}
			}
		}
		// NOTE(review): when no int argument has ever been supplied,
		// surveyorTime is still -1 and a negative duration is passed here -
		// confirm whether the library default should be kept instead.
		options[mangos.OptionSurveyTime] = time.Duration(surveyorTime) * time.Second
	}

	for k, v := range options {
		if err := sock.SetOption(k, v); err != nil {
			return err
		}
	}
	return nil
}

// rmExistedIpcName removes the file behind an ipc:// URL if it exists. URLs
// with another scheme, or without a "://" separator, are ignored.
func rmExistedIpcName(url string) {
	// SplitN keeps a path that itself contains "://" intact, and the length
	// check avoids indexing past the end for a URL without a separator.
	parts := strings.SplitN(url, "://", 2)
	if len(parts) != 2 || parts[0] != "ipc" {
		return
	}

	path := parts[1]
	if _, err := os.Stat(path); err == nil {
		// Best effort: the error is deliberately ignored here.
		_ = os.Remove(path)
	}
}

// newSocket allocates a new Socket. The Socket is the handle used to
// access the underlying library.
func newSocket(p protocol) (mangos.Socket, error) { var s mangos.Socket var err error switch p { case PUB: s, err = pub.NewSocket() case SUB: s, err = sub.NewSocket() case PUSH: s, err = push.NewSocket() case PULL: s, err = pull.NewSocket() case REQ: s, err = req.NewSocket() case REP: s, err = rep.NewSocket() case SURVEYOR: s, err = surveyor.NewSocket() case RESPONDENT: s, err = respondent.NewSocket() case PAIR: s, err = pair.NewSocket() case BUS: s, err = bus.NewSocket() default: err = mangos.ErrBadProto } if err != nil { return nil, err } all.AddTransports(s) return s, nil } func die(format string, v ...interface{}) { fmt.Fprintln(os.Stderr, fmt.Sprintf(format, v...)) os.Exit(1) } // Protocol is the numeric abstraction to the various protocols or patterns // that Mangos supports. type protocol int // Constants for protocols. const ( PUSH = protocol(mangos.ProtoPush) PULL = protocol(mangos.ProtoPull) PUB = protocol(mangos.ProtoPub) SUB = protocol(mangos.ProtoSub) REQ = protocol(mangos.ProtoReq) REP = protocol(mangos.ProtoRep) SURVEYOR = protocol(mangos.ProtoSurveyor) RESPONDENT = protocol(mangos.ProtoRespondent) BUS = protocol(mangos.ProtoBus) PAIR = protocol(mangos.ProtoPair) ) func protoProducer(m Mode) protocol { switch m { case PushPull: return PUSH case PubSub: return PUB case ReqRep: return REP case SurvResp: return SURVEYOR case Bus: return BUS case Pair: return PAIR } return PUSH } func protoConsumer(m Mode) protocol { switch m { case PushPull: return PULL case PubSub: return SUB case ReqRep: return REQ case SurvResp: return RESPONDENT case Bus: return BUS case Pair: return PAIR } return PULL }