From 36766ab5b68ce7dfb39dff5d6d283ce5c7f4b346 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期五, 17 五月 2019 09:14:51 +0800 Subject: [PATCH] fix crash --- nng.go | 137 ++++++++++++++++++++++++++------------------- 1 files changed, 78 insertions(+), 59 deletions(-) diff --git a/nng.go b/nng.go index 009ef8d..07f2fb5 100644 --- a/nng.go +++ b/nng.go @@ -1,7 +1,6 @@ package deliver import ( - "errors" "fmt" "os" "strings" @@ -23,43 +22,53 @@ // NNG mangos wrap type NNG struct { - sock mangos.Socket - raw bool + sock mangos.Socket + server bool + mode Mode + url string + + arguments []interface{} } // Send impl interface Diliver func (n *NNG) Send(data []byte) error { + var err error if n.sock == nil { - return errors.New("please init NNG first") + n.sock, err = n.makeNNG(true) + if err != nil { + fmt.Println("create nng producer error") + return err + } } if surveyorTime > 0 { time.Sleep(time.Duration(surveyorTime*2) * time.Second) } - if n.raw { - msg := mangos.NewMessage(len(data)) - msg.Body = data - return n.sock.SendMsg(msg) - } + msg := mangos.NewMessage(len(data)) + msg.Body = data + return n.sock.SendMsg(msg) - return n.sock.Send(data) } // Recv impl interface Diliver func (n *NNG) Recv() ([]byte, error) { + var err error + if n.sock == nil { - return nil, errors.New("please init NNG first") - } - if n.raw { - var msg *mangos.Message - var err error - if msg, err = n.sock.RecvMsg(); err != nil { + n.sock, err = n.makeNNG(false) + if err != nil { + fmt.Println("create nng consumer error") return nil, err } - return msg.Body, nil } - return n.sock.Recv() + + var msg *mangos.Message + if msg, err = n.sock.RecvMsg(); err != nil { + return nil, err + } + return msg.Body, nil + } // Close impl interface Deliver @@ -73,48 +82,57 @@ func nngServer(m Mode, url string, args ...interface{}) *NNG { rmExistedIpcName(url) - if sock, err := newSocket(protoProducer(m)); err == nil { - if err = setSocketOptions(sock, args); err != nil { - sock.Close() - sock = nil - return nil - } - if err = sock.Listen(url); err != nil { - sock.Close() - sock = nil - return nil - } - return &NNG{ - sock, - true, - } - } - return nil + return &NNG{ + server: true, + mode: m, + url: url, + arguments: args, + } } func nngClient(m Mode, url string, args ...interface{}) *NNG { - if sock, err := newSocket(protoConsumer(m)); err == nil { - if err = setSocketOptions(sock, args); err != nil { - sock.Close() - sock = nil - return nil - } - - if err = sock.Dial(url); err != nil { - sock.Close() - sock = nil - return nil - } - - return &NNG{ - sock, - true, - } + return &NNG{ + server: false, + mode: m, + url: url, + arguments: args, } - return nil +} + +func proto(producer bool, m Mode) protocol { + if producer { + return protoProducer(m) + } + return protoConsumer(m) +} + +func (n *NNG) makeNNG(producer bool) (mangos.Socket, error) { + + var sock mangos.Socket + var err error + if sock, err = newSocket(proto(producer, n.mode)); err != nil { + return nil, err + } + + if err = setSocketOptions(sock, n.arguments...); err != nil { + sock.Close() + sock = nil + } + if n.server { + if err = sock.Listen(n.url); err != nil { + sock.Close() + sock = nil + } + } else { + if err = sock.Dial(n.url); err != nil { + sock.Close() + sock = nil + } + } + return sock, err } // maxRecvSize max recv size @@ -130,8 +148,8 @@ options[mangos.OptionMaxRecvSize] = maxRecvSize options[mangos.OptionWriteQLen] = 0 options[mangos.OptionReadQLen] = 0 - options[mangos.OptionRecvDeadline] = time.Second - options[mangos.OptionSendDeadline] = time.Second + // options[mangos.OptionRecvDeadline] = time.Second + // options[mangos.OptionSendDeadline] = time.Second options[mangos.OptionRaw] = true return options @@ -143,14 +161,15 @@ switch sock.GetProtocol().Number() { case mangos.ProtoSub: + topic := "" for _, arg := range args { switch arg.(type) { case string: - options[mangos.OptionSubscribe] = []byte(arg.(string)) + topic = arg.(string) default: - options[mangos.OptionSubscribe] = []byte("") } } + options[mangos.OptionSubscribe] = []byte(topic) case mangos.ProtoSurveyor: for _, arg := range args { switch arg.(type) { @@ -162,10 +181,10 @@ } default: } - options[mangos.OptionSurveyTime] = time.Duration(surveyorTime) * time.Second } + options[mangos.OptionSurveyTime] = time.Duration(surveyorTime) * time.Second + default: - fmt.Println("no additional args") } for k, v := range options { -- Gitblit v1.8.0