From 162fe98d7728445b72528283e1bfdddc432d2676 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期四, 16 五月 2019 16:03:54 +0800 Subject: [PATCH] if create socket failed sock=nil --- nng.go | 114 ++++++++++++++++++++++++++++++++++++--------------------- 1 files changed, 72 insertions(+), 42 deletions(-) diff --git a/nng.go b/nng.go index 84d3699..e8f5baf 100644 --- a/nng.go +++ b/nng.go @@ -5,6 +5,7 @@ "fmt" "os" "strings" + "time" "nanomsg.org/go-mangos" "nanomsg.org/go-mangos/protocol/bus" @@ -23,6 +24,7 @@ // NNG mangos wrap type NNG struct { sock mangos.Socket + raw bool } // Send impl interface Diliver @@ -31,11 +33,17 @@ return errors.New("please init NNG first") } - if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil { + // switch n.sock.GetProtocol().Number() { + // case mangos.ProtoSurveyor: + // time.Sleep(surveyorTime * 2) + // default: + // } + if n.raw { msg := mangos.NewMessage(len(data)) msg.Body = data return n.sock.SendMsg(msg) } + return n.sock.Send(data) } @@ -44,100 +52,124 @@ if n.sock == nil { return nil, errors.New("please init NNG first") } - if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil { - msg, err := n.sock.RecvMsg() - return msg.Body, err + if n.raw { + var msg *mangos.Message + var err error + if msg, err = n.sock.RecvMsg(); err != nil { + return nil, err + } + return msg.Body, nil } return n.sock.Recv() } -// nngProducer create from deliver Mode -func nngProducer(m Mode, url string, args ...interface{}) *NNG { +// Close impl interface Deliver +func (n *NNG) Close() { + if n.sock != nil { + n.sock.Close() + n.sock = nil + } +} + +func nngListener(m Mode, url string, args ...interface{}) *NNG { rmExistedIpcName(url) if sock, err := newSocket(protoProducer(m)); err == nil { if err = setSocketOptions(sock, args); err != nil { + sock.Close() + sock = nil return nil } if err = sock.Listen(url); err != nil { sock.Close() + sock = nil return nil } return &NNG{ sock, + true, } } return nil } -// nngConsumer create from deliver Mode -func nngConsumer(m Mode, url string, args ...interface{}) *NNG { +func nngDialer(m Mode, url string, args ...interface{}) *NNG { if sock, err := newSocket(protoConsumer(m)); err == nil { if err = setSocketOptions(sock, args); err != nil { + sock.Close() + sock = nil return nil } if err = sock.Dial(url); err != nil { sock.Close() + sock = nil return nil } return &NNG{ sock, + true, } } return nil } -// MaxRecvSize max recv size -var MaxRecvSize = 33 * 1024 * 1024 +// maxRecvSize max recv size +var ( + maxRecvSize = 33 * 1024 * 1024 + surveyorTime = time.Second / 2 +) -func defualtSocketOptions(sock mangos.Socket) error { - var err error - if err = sock.SetOption(mangos.OptionMaxRecvSize, MaxRecvSize); err != nil { - sock.Close() - return err - } - if err = sock.SetOption(mangos.OptionWriteQLen, 0); err != nil { - sock.Close() - return err - } - if err = sock.SetOption(mangos.OptionReadQLen, 0); err != nil { - sock.Close() - return err - } - // if err = sock.SetOption(mangos.OptionNoDelay, true); err != nil { - // sock.Close() - // return err - // } - if err = sock.SetOption(mangos.OptionRaw, true); err != nil { - sock.Close() - return err - } +func defualtSocketOptions() map[string]interface{} { - return nil + options := make(map[string]interface{}) + + options[mangos.OptionMaxRecvSize] = maxRecvSize + options[mangos.OptionWriteQLen] = 0 + options[mangos.OptionReadQLen] = 0 + options[mangos.OptionRecvDeadline] = time.Second + options[mangos.OptionSendDeadline] = time.Second + options[mangos.OptionRaw] = true + + return options } func setSocketOptions(sock mangos.Socket, args ...interface{}) error { - err := defualtSocketOptions(sock) - if err != nil { - return err - } - if sock.GetProtocol().Number() == mangos.ProtoSub { + options := defualtSocketOptions() + + switch sock.GetProtocol().Number() { + case mangos.ProtoSub: for _, arg := range args { switch arg.(type) { case string: - err = sock.SetOption(mangos.OptionSubscribe, []byte(arg.(string))) + options[mangos.OptionSubscribe] = []byte(arg.(string)) default: - err = sock.SetOption(mangos.OptionSubscribe, []byte("")) + options[mangos.OptionSubscribe] = []byte("") } } + case mangos.ProtoSurveyor: + for _, arg := range args { + switch arg.(type) { + case int: + surveyorTime = time.Duration(arg.(int)/2) * time.Second + default: + } + options[mangos.OptionSurveyTime] = surveyorTime + } + default: + fmt.Println("no additional args") } + for k, v := range options { + if err := sock.SetOption(k, v); err != nil { + return err + } + } return nil } @@ -188,8 +220,6 @@ } all.AddTransports(s) - // s.AddTransport(ipc.NewTransport()) - // s.AddTransport(tcp.NewTransport()) return s, nil } -- Gitblit v1.8.0