From 162fe98d7728445b72528283e1bfdddc432d2676 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期四, 16 五月 2019 16:03:54 +0800 Subject: [PATCH] if create socket failed sock=nil --- nng.go | 95 +++++++++++++++++++++++------------------------ 1 files changed, 46 insertions(+), 49 deletions(-) diff --git a/nng.go b/nng.go index 61b5e76..e8f5baf 100644 --- a/nng.go +++ b/nng.go @@ -24,6 +24,7 @@ // NNG mangos wrap type NNG struct { sock mangos.Socket + raw bool } // Send impl interface Diliver @@ -32,12 +33,12 @@ return errors.New("please init NNG first") } - switch n.sock.GetProtocol().Number() { - case mangos.ProtoSurveyor: - time.Sleep(surveyorTime * 2) - default: - } - if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil { + // switch n.sock.GetProtocol().Number() { + // case mangos.ProtoSurveyor: + // time.Sleep(surveyorTime * 2) + // default: + // } + if n.raw { msg := mangos.NewMessage(len(data)) msg.Body = data return n.sock.SendMsg(msg) @@ -51,9 +52,13 @@ if n.sock == nil { return nil, errors.New("please init NNG first") } - if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil { - msg, err := n.sock.RecvMsg() - return msg.Body, err + if n.raw { + var msg *mangos.Message + var err error + if msg, err = n.sock.RecvMsg(); err != nil { + return nil, err + } + return msg.Body, nil } return n.sock.Recv() } @@ -62,44 +67,51 @@ func (n *NNG) Close() { if n.sock != nil { n.sock.Close() + n.sock = nil } } -// nngProducer create from deliver Mode -func nngProducer(m Mode, url string, args ...interface{}) *NNG { +func nngListener(m Mode, url string, args ...interface{}) *NNG { rmExistedIpcName(url) if sock, err := newSocket(protoProducer(m)); err == nil { if err = setSocketOptions(sock, args); err != nil { + sock.Close() + sock = nil return nil } if err = sock.Listen(url); err != nil { sock.Close() + sock = nil return nil } return &NNG{ sock, + true, } } return nil } -// nngConsumer create from deliver Mode -func nngConsumer(m Mode, url string, args ...interface{}) *NNG { +func nngDialer(m Mode, url string, args ...interface{}) *NNG { if sock, err := newSocket(protoConsumer(m)); err == nil { if err = setSocketOptions(sock, args); err != nil { + sock.Close() + sock = nil return nil } if err = sock.Dial(url); err != nil { sock.Close() + sock = nil return nil } return &NNG{ sock, + true, } } @@ -112,50 +124,32 @@ surveyorTime = time.Second / 2 ) -func defualtSocketOptions(sock mangos.Socket) error { - var err error - if err = sock.SetOption(mangos.OptionMaxRecvSize, maxRecvSize); err != nil { - sock.Close() - return err - } - if err = sock.SetOption(mangos.OptionWriteQLen, 0); err != nil { - sock.Close() - return err - } - if err = sock.SetOption(mangos.OptionReadQLen, 0); err != nil { - sock.Close() - return err - } - // if err = sock.SetOption(mangos.OptionRecvDeadline, time.Second); err != nil { - // sock.Close() - // return err - // } - // if err = sock.SetOption(mangos.OptionSendDeadline, time.Second); err != nil { - // sock.Close() - // return err - // } - if err = sock.SetOption(mangos.OptionRaw, true); err != nil { - sock.Close() - return err - } +func defualtSocketOptions() map[string]interface{} { - return nil + options := make(map[string]interface{}) + + options[mangos.OptionMaxRecvSize] = maxRecvSize + options[mangos.OptionWriteQLen] = 0 + options[mangos.OptionReadQLen] = 0 + options[mangos.OptionRecvDeadline] = time.Second + options[mangos.OptionSendDeadline] = time.Second + options[mangos.OptionRaw] = true + + return options } func setSocketOptions(sock mangos.Socket, args ...interface{}) error { - err := defualtSocketOptions(sock) - if err != nil { - return err - } + options := defualtSocketOptions() + switch sock.GetProtocol().Number() { case mangos.ProtoSub: for _, arg := range args { switch arg.(type) { case string: - err = sock.SetOption(mangos.OptionSubscribe, []byte(arg.(string))) + options[mangos.OptionSubscribe] = []byte(arg.(string)) default: - err = sock.SetOption(mangos.OptionSubscribe, []byte("")) + options[mangos.OptionSubscribe] = []byte("") } } case mangos.ProtoSurveyor: @@ -165,12 +159,17 @@ surveyorTime = time.Duration(arg.(int)/2) * time.Second default: } - err = sock.SetOption(mangos.OptionSurveyTime, surveyorTime) + options[mangos.OptionSurveyTime] = surveyorTime } default: fmt.Println("no additional args") } + for k, v := range options { + if err := sock.SetOption(k, v); err != nil { + return err + } + } return nil } @@ -221,8 +220,6 @@ } all.AddTransports(s) - // s.AddTransport(ipc.NewTransport()) - // s.AddTransport(tcp.NewTransport()) return s, nil } -- Gitblit v1.8.0