| | |
| | | // NNG mangos wrap |
| | | type NNG struct { |
| | | sock mangos.Socket |
| | | raw bool |
| | | typ td |
| | | mode Mode |
| | | url string |
| | | |
| | | arguments []interface{} |
| | | } |
| | | |
| | | // Send impl interface Diliver |
| | | func (n *NNG) Send(data []byte) error { |
| | | if n.sock == nil { |
| | | if n == nil { |
| | | return errors.New("please init NNG first") |
| | | } |
| | | |
| | | // switch n.sock.GetProtocol().Number() { |
| | | // case mangos.ProtoSurveyor: |
| | | // time.Sleep(surveyorTime * 2) |
| | | // default: |
| | | // } |
| | | if n.raw { |
| | | msg := mangos.NewMessage(len(data)) |
| | | msg.Body = data |
| | | return n.sock.SendMsg(msg) |
| | | var err error |
| | | if n.sock == nil { |
| | | n.sock, err = n.makeNNG(agent) |
| | | if err != nil { |
| | | fmt.Println("create nng sender error") |
| | | return err |
| | | } |
| | | } |
| | | |
| | | return n.sock.Send(data) |
| | | if surveyorTime > 0 { |
| | | time.Sleep(time.Duration(surveyorTime*2) * time.Second) |
| | | } |
| | | |
| | | msg := mangos.NewMessage(1) |
| | | msg.Body = data |
| | | return n.sock.SendMsg(msg) |
| | | } |
| | | |
| | | // Recv impl interface Diliver |
| | | func (n *NNG) Recv() ([]byte, error) { |
| | | if n.sock == nil { |
| | | if n == nil { |
| | | return nil, errors.New("please init NNG first") |
| | | } |
| | | if n.raw { |
| | | var msg *mangos.Message |
| | | var err error |
| | | if msg, err = n.sock.RecvMsg(); err != nil { |
| | | |
| | | var err error |
| | | |
| | | if n.sock == nil { |
| | | n.sock, err = n.makeNNG(coactee) |
| | | if err != nil { |
| | | fmt.Println("create nng reciever error") |
| | | return nil, err |
| | | } |
| | | return msg.Body, nil |
| | | } |
| | | return n.sock.Recv() |
| | | |
| | | var msg *mangos.Message |
| | | if msg, err = n.sock.RecvMsg(); err != nil { |
| | | return nil, err |
| | | } |
| | | return msg.Body, nil |
| | | |
| | | } |
| | | |
| | | // Recv2 impl interface |
| | | func (n *NNG) Recv2(data []byte) (l int, err error) { |
| | | data, err = n.Recv() |
| | | l = len(data) |
| | | return l, err |
| | | } |
| | | |
| | | // Close impl interface Deliver |
| | | func (n *NNG) Close() { |
| | | if n.sock != nil { |
| | | if n != nil && n.sock != nil { |
| | | n.sock.Close() |
| | | n.sock = nil |
| | | } |
| | | } |
| | | |
| | | func nngListener(m Mode, url string, args ...interface{}) *NNG { |
| | | func nngServer(m Mode, url string, args ...interface{}) *NNG { |
| | | |
| | | rmExistedIpcName(url) |
| | | if sock, err := newSocket(protoProducer(m)); err == nil { |
| | | if err = setSocketOptions(sock, args); err != nil { |
| | | sock.Close() |
| | | sock = nil |
| | | return nil |
| | | } |
| | | if err = sock.Listen(url); err != nil { |
| | | sock.Close() |
| | | sock = nil |
| | | return nil |
| | | } |
| | | return &NNG{ |
| | | sock, |
| | | true, |
| | | } |
| | | |
| | | return &NNG{ |
| | | typ: agent, |
| | | mode: m, |
| | | url: url, |
| | | arguments: args, |
| | | } |
| | | } |
| | | |
| | | func nngClient(m Mode, url string, args ...interface{}) *NNG { |
| | | |
| | | return &NNG{ |
| | | typ: coactee, |
| | | mode: m, |
| | | url: url, |
| | | arguments: args, |
| | | } |
| | | |
| | | return nil |
| | | } |
| | | |
| | | func nngDialer(m Mode, url string, args ...interface{}) *NNG { |
| | | |
| | | if sock, err := newSocket(protoConsumer(m)); err == nil { |
| | | if err = setSocketOptions(sock, args); err != nil { |
| | | sock.Close() |
| | | sock = nil |
| | | return nil |
| | | } |
| | | |
| | | if err = sock.Dial(url); err != nil { |
| | | sock.Close() |
| | | sock = nil |
| | | return nil |
| | | } |
| | | |
| | | return &NNG{ |
| | | sock, |
| | | true, |
| | | } |
| | | func proto(typ td, m Mode) protocol { |
| | | if typ == agent { |
| | | return protoAgent(m) |
| | | } else if typ == coactee { |
| | | return protoCoactee(m) |
| | | } |
| | | |
| | | return nil |
| | | return NONE |
| | | } |
| | | |
| | | // maxRecvSize max recv size |
| | | var ( |
| | | maxRecvSize = 33 * 1024 * 1024 |
| | | surveyorTime = time.Second / 2 |
| | | ) |
| | | func (n *NNG) makeNNG(typ td) (mangos.Socket, error) { |
| | | |
| | | func defualtSocketOptions() map[string]interface{} { |
| | | var sock mangos.Socket |
| | | var err error |
| | | |
| | | options := make(map[string]interface{}) |
| | | |
| | | options[mangos.OptionMaxRecvSize] = maxRecvSize |
| | | options[mangos.OptionWriteQLen] = 0 |
| | | options[mangos.OptionReadQLen] = 0 |
| | | options[mangos.OptionRecvDeadline] = time.Second |
| | | options[mangos.OptionSendDeadline] = time.Second |
| | | options[mangos.OptionRaw] = true |
| | | |
| | | return options |
| | | } |
| | | |
| | | func setSocketOptions(sock mangos.Socket, args ...interface{}) error { |
| | | |
| | | options := defualtSocketOptions() |
| | | |
| | | switch sock.GetProtocol().Number() { |
| | | case mangos.ProtoSub: |
| | | for _, arg := range args { |
| | | switch arg.(type) { |
| | | case string: |
| | | options[mangos.OptionSubscribe] = []byte(arg.(string)) |
| | | default: |
| | | options[mangos.OptionSubscribe] = []byte("") |
| | | } |
| | | } |
| | | case mangos.ProtoSurveyor: |
| | | for _, arg := range args { |
| | | switch arg.(type) { |
| | | case int: |
| | | surveyorTime = time.Duration(arg.(int)/2) * time.Second |
| | | default: |
| | | } |
| | | options[mangos.OptionSurveyTime] = surveyorTime |
| | | } |
| | | switch n.mode { |
| | | case Bus: |
| | | sock, err = n.busMakeNNG(typ) |
| | | case ReqRep, SurvResp: |
| | | sock, err = n.rrMakeNNG(typ) |
| | | default: |
| | | fmt.Println("no additional args") |
| | | sock, err = n.ppMakeNNG(typ) |
| | | } |
| | | |
| | | for k, v := range options { |
| | | if err := sock.SetOption(k, v); err != nil { |
| | | return err |
| | | } |
| | | } |
| | | return nil |
| | | return sock, err |
| | | } |
| | | |
| | | func rmExistedIpcName(url string) { |
| | |
| | | |
| | | if s[0] == "ipc" { |
| | | if _, err := os.Stat(s[1]); err == nil { |
| | | os.Remove(s[1]) |
| | | } else if !os.IsNotExist(err) { |
| | | os.Remove(s[1]) |
| | | } |
| | | } |
| | |
| | | // access the underlying library. |
| | | func newSocket(p protocol) (mangos.Socket, error) { |
| | | |
| | | if p == NONE { |
| | | return nil, errors.New("new socket protocol none") |
| | | } |
| | | var s mangos.Socket |
| | | var err error |
| | | |
| | |
| | | |
| | | // Constants for protocols. |
| | | const ( |
| | | NONE = -1 |
| | | PUSH = protocol(mangos.ProtoPush) |
| | | PULL = protocol(mangos.ProtoPull) |
| | | PUB = protocol(mangos.ProtoPub) |
| | |
| | | PAIR = protocol(mangos.ProtoPair) |
| | | ) |
| | | |
| | | func protoProducer(m Mode) protocol { |
| | | func protoAgent(m Mode) protocol { |
| | | switch m { |
| | | case PushPull: |
| | | return PUSH |
| | | case PubSub: |
| | | return PUB |
| | | case ReqRep: |
| | | return REP |
| | | case SurvResp: |
| | | return SURVEYOR |
| | | case Bus: |
| | | return BUS |
| | | case Pair: |
| | | return PAIR |
| | | case SurvResp: |
| | | return SURVEYOR |
| | | case ReqRep: |
| | | return REQ |
| | | case Bus: |
| | | return BUS |
| | | } |
| | | return PUSH |
| | | return NONE |
| | | } |
| | | |
| | | func protoConsumer(m Mode) protocol { |
| | | func protoCoactee(m Mode) protocol { |
| | | switch m { |
| | | case PushPull: |
| | | return PULL |
| | | case PubSub: |
| | | return SUB |
| | | case ReqRep: |
| | | return REQ |
| | | case SurvResp: |
| | | return RESPONDENT |
| | | case Bus: |
| | | return BUS |
| | | case Pair: |
| | | return PAIR |
| | | case SurvResp: |
| | | return RESPONDENT |
| | | case ReqRep: |
| | | return REP |
| | | case Bus: |
| | | return BUS |
| | | } |
| | | return PULL |
| | | return NONE |
| | | } |