From 2d390df9ede39c9d7c34bd8190b9329cfc371325 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期五, 27 九月 2019 16:19:05 +0800 Subject: [PATCH] remove exist ipc --- nng.go | 224 ++++++++++++++++++++++++------------------------------- 1 files changed, 97 insertions(+), 127 deletions(-) diff --git a/nng.go b/nng.go index 77619e3..812cc37 100644 --- a/nng.go +++ b/nng.go @@ -24,156 +24,122 @@ // NNG mangos wrap type NNG struct { sock mangos.Socket + typ td + mode Mode + url string + + arguments []interface{} } // Send impl interface Diliver func (n *NNG) Send(data []byte) error { - if n.sock == nil { + if n == nil { return errors.New("please init NNG first") } - - switch n.sock.GetProtocol().Number() { - case mangos.ProtoSurveyor: - time.Sleep(surveyorTime * 2) - default: - } - if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil { - msg := mangos.NewMessage(len(data)) - msg.Body = data - return n.sock.SendMsg(msg) + var err error + if n.sock == nil { + n.sock, err = n.makeNNG(agent) + if err != nil { + fmt.Println("create nng sender error") + return err + } } - return n.sock.Send(data) + if surveyorTime > 0 { + time.Sleep(time.Duration(surveyorTime*2) * time.Second) + } + + msg := mangos.NewMessage(1) + msg.Body = data + return n.sock.SendMsg(msg) } // Recv impl interface Diliver func (n *NNG) Recv() ([]byte, error) { - if n.sock == nil { + if n == nil { return nil, errors.New("please init NNG first") } - if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil { - if msg, err := n.sock.RecvMsg(); err == nil { - return msg.Body, err + + var err error + + if n.sock == nil { + n.sock, err = n.makeNNG(coactee) + if err != nil { + fmt.Println("create nng reciever error") + return nil, err } + } + + var msg *mangos.Message + if msg, err = n.sock.RecvMsg(); err != nil { return nil, err } - return n.sock.Recv() + return msg.Body, nil + +} + +// Recv2 impl interface +func (n *NNG) Recv2(data []byte) (l int, err error) { + data, err = n.Recv() + l = len(data) + return l, err } // Close impl interface Deliver func (n *NNG) Close() { - if n.sock != nil { + if n != nil && n.sock != nil { n.sock.Close() + n.sock = nil } } -// nngProducer create from deliver Mode -func nngProducer(m Mode, url string, args ...interface{}) *NNG { +func nngServer(m Mode, url string, args ...interface{}) *NNG { rmExistedIpcName(url) - if sock, err := newSocket(protoProducer(m)); err == nil { - if err = setSocketOptions(sock, args); err != nil { - return nil - } - if err = sock.Listen(url); err != nil { - sock.Close() - return nil - } - return &NNG{ - sock, - } - } - return nil + return &NNG{ + typ: agent, + mode: m, + url: url, + arguments: args, + } } -// nngConsumer create from deliver Mode -func nngConsumer(m Mode, url string, args ...interface{}) *NNG { +func nngClient(m Mode, url string, args ...interface{}) *NNG { - if sock, err := newSocket(protoConsumer(m)); err == nil { - if err = setSocketOptions(sock, args); err != nil { - return nil - } - - if err = sock.Dial(url); err != nil { - sock.Close() - return nil - } - - return &NNG{ - sock, - } + return &NNG{ + typ: coactee, + mode: m, + url: url, + arguments: args, } - return nil } -// maxRecvSize max recv size -var ( - maxRecvSize = 33 * 1024 * 1024 - surveyorTime = time.Second / 2 -) +func proto(typ td, m Mode) protocol { + if typ == agent { + return protoAgent(m) + } else if typ == coactee { + return protoCoactee(m) + } + return NONE +} -func defualtSocketOptions(sock mangos.Socket) error { +func (n *NNG) makeNNG(typ td) (mangos.Socket, error) { + + var sock mangos.Socket var err error - if err = sock.SetOption(mangos.OptionMaxRecvSize, maxRecvSize); err != nil { - sock.Close() - return err - } - if err = sock.SetOption(mangos.OptionWriteQLen, 0); err != nil { - sock.Close() - return err - } - if err = sock.SetOption(mangos.OptionReadQLen, 0); err != nil { - sock.Close() - return err - } - if err = sock.SetOption(mangos.OptionRecvDeadline, time.Second); err != nil { - sock.Close() - return err - } - if err = sock.SetOption(mangos.OptionSendDeadline, time.Second); err != nil { - sock.Close() - return err - } - if err = sock.SetOption(mangos.OptionRaw, true); err != nil { - sock.Close() - return err - } - return nil -} - -func setSocketOptions(sock mangos.Socket, args ...interface{}) error { - - err := defualtSocketOptions(sock) - if err != nil { - return err - } - switch sock.GetProtocol().Number() { - case mangos.ProtoSub: - for _, arg := range args { - switch arg.(type) { - case string: - err = sock.SetOption(mangos.OptionSubscribe, []byte(arg.(string))) - default: - err = sock.SetOption(mangos.OptionSubscribe, []byte("")) - } - } - case mangos.ProtoSurveyor: - for _, arg := range args { - switch arg.(type) { - case int: - surveyorTime = time.Duration(arg.(int)/2) * time.Second - default: - } - err = sock.SetOption(mangos.OptionSurveyTime, surveyorTime) - } + switch n.mode { + case Bus: + sock, err = n.busMakeNNG(typ) + case ReqRep, SurvResp: + sock, err = n.rrMakeNNG(typ) default: - fmt.Println("no additional args") + sock, err = n.ppMakeNNG(typ) } - return nil + return sock, err } func rmExistedIpcName(url string) { @@ -181,6 +147,8 @@ if s[0] == "ipc" { if _, err := os.Stat(s[1]); err == nil { + os.Remove(s[1]) + } else if !os.IsNotExist(err) { os.Remove(s[1]) } } @@ -190,6 +158,9 @@ // access the underlying library. func newSocket(p protocol) (mangos.Socket, error) { + if p == NONE { + return nil, errors.New("new socket protocol none") + } var s mangos.Socket var err error @@ -223,8 +194,6 @@ } all.AddTransports(s) - // s.AddTransport(ipc.NewTransport()) - // s.AddTransport(tcp.NewTransport()) return s, nil } @@ -240,6 +209,7 @@ // Constants for protocols. const ( + NONE = -1 PUSH = protocol(mangos.ProtoPush) PULL = protocol(mangos.ProtoPull) PUB = protocol(mangos.ProtoPub) @@ -252,38 +222,38 @@ PAIR = protocol(mangos.ProtoPair) ) -func protoProducer(m Mode) protocol { +func protoAgent(m Mode) protocol { switch m { case PushPull: return PUSH case PubSub: return PUB - case ReqRep: - return REP - case SurvResp: - return SURVEYOR - case Bus: - return BUS case Pair: return PAIR + case SurvResp: + return SURVEYOR + case ReqRep: + return REQ + case Bus: + return BUS } - return PUSH + return NONE } -func protoConsumer(m Mode) protocol { +func protoCoactee(m Mode) protocol { switch m { case PushPull: return PULL case PubSub: return SUB - case ReqRep: - return REQ - case SurvResp: - return RESPONDENT - case Bus: - return BUS case Pair: return PAIR + case SurvResp: + return RESPONDENT + case ReqRep: + return REP + case Bus: + return BUS } - return PULL + return NONE } -- Gitblit v1.8.0