From 36766ab5b68ce7dfb39dff5d6d283ce5c7f4b346 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期五, 17 五月 2019 09:14:51 +0800 Subject: [PATCH] fix crash --- nng.go | 152 +++++++++++++++++++++++++++++--------------------- 1 files changed, 89 insertions(+), 63 deletions(-) diff --git a/nng.go b/nng.go index 242409a..07f2fb5 100644 --- a/nng.go +++ b/nng.go @@ -1,7 +1,6 @@ package deliver import ( - "errors" "fmt" "os" "strings" @@ -23,44 +22,53 @@ // NNG mangos wrap type NNG struct { - sock mangos.Socket - raw bool + sock mangos.Socket + server bool + mode Mode + url string + + arguments []interface{} } // Send impl interface Diliver func (n *NNG) Send(data []byte) error { + var err error if n.sock == nil { - return errors.New("please init NNG first") + n.sock, err = n.makeNNG(true) + if err != nil { + fmt.Println("create nng producer error") + return err + } } - // switch n.sock.GetProtocol().Number() { - // case mangos.ProtoSurveyor: - // time.Sleep(surveyorTime * 2) - // default: - // } - if n.raw { - msg := mangos.NewMessage(len(data)) - msg.Body = data - return n.sock.SendMsg(msg) + if surveyorTime > 0 { + time.Sleep(time.Duration(surveyorTime*2) * time.Second) } - return n.sock.Send(data) + msg := mangos.NewMessage(len(data)) + msg.Body = data + return n.sock.SendMsg(msg) + } // Recv impl interface Diliver func (n *NNG) Recv() ([]byte, error) { + var err error + if n.sock == nil { - return nil, errors.New("please init NNG first") - } - if n.raw { - var msg *mangos.Message - var err error - if msg, err = n.sock.RecvMsg(); err != nil { + n.sock, err = n.makeNNG(false) + if err != nil { + fmt.Println("create nng consumer error") return nil, err } - return msg.Body, nil } - return n.sock.Recv() + + var msg *mangos.Message + if msg, err = n.sock.RecvMsg(); err != nil { + return nil, err + } + return msg.Body, nil + } // Close impl interface Deliver @@ -71,53 +79,66 @@ } } -func nngListener(m Mode, url string, args ...interface{}) *NNG { +func nngServer(m Mode, url string, args ...interface{}) *NNG { rmExistedIpcName(url) - if sock, err := newSocket(protoProducer(m)); err == nil { - if err = setSocketOptions(sock, args); err != nil { - return nil - } - if err = sock.Listen(url); err != nil { - sock.Close() - sock = nil - return nil - } - return &NNG{ - sock, - true, - } - } - return nil + return &NNG{ + server: true, + mode: m, + url: url, + arguments: args, + } } -func nngDialer(m Mode, url string, args ...interface{}) *NNG { +func nngClient(m Mode, url string, args ...interface{}) *NNG { - if sock, err := newSocket(protoConsumer(m)); err == nil { - if err = setSocketOptions(sock, args); err != nil { - return nil - } - - if err = sock.Dial(url); err != nil { - sock.Close() - sock = nil - return nil - } - - return &NNG{ - sock, - true, - } + return &NNG{ + server: false, + mode: m, + url: url, + arguments: args, } - return nil +} + +func proto(producer bool, m Mode) protocol { + if producer { + return protoProducer(m) + } + return protoConsumer(m) +} + +func (n *NNG) makeNNG(producer bool) (mangos.Socket, error) { + + var sock mangos.Socket + var err error + if sock, err = newSocket(proto(producer, n.mode)); err != nil { + return nil, err + } + + if err = setSocketOptions(sock, n.arguments...); err != nil { + sock.Close() + sock = nil + } + if n.server { + if err = sock.Listen(n.url); err != nil { + sock.Close() + sock = nil + } + } else { + if err = sock.Dial(n.url); err != nil { + sock.Close() + sock = nil + } + } + return sock, err } // maxRecvSize max recv size var ( maxRecvSize = 33 * 1024 * 1024 - surveyorTime = time.Second / 2 + surveyorTime = -1 ) func defualtSocketOptions() map[string]interface{} { @@ -127,8 +148,8 @@ options[mangos.OptionMaxRecvSize] = maxRecvSize options[mangos.OptionWriteQLen] = 0 options[mangos.OptionReadQLen] = 0 - options[mangos.OptionRecvDeadline] = time.Second - options[mangos.OptionSendDeadline] = time.Second + // options[mangos.OptionRecvDeadline] = time.Second + // options[mangos.OptionSendDeadline] = time.Second options[mangos.OptionRaw] = true return options @@ -140,25 +161,30 @@ switch sock.GetProtocol().Number() { case mangos.ProtoSub: + topic := "" for _, arg := range args { switch arg.(type) { case string: - options[mangos.OptionSubscribe] = []byte(arg.(string)) + topic = arg.(string) default: - options[mangos.OptionSubscribe] = []byte("") } } + options[mangos.OptionSubscribe] = []byte(topic) case mangos.ProtoSurveyor: for _, arg := range args { switch arg.(type) { case int: - surveyorTime = time.Duration(arg.(int)/2) * time.Second + if arg.(int) < 2 { + surveyorTime = 1 + } else { + surveyorTime = arg.(int) / 2 + } default: } - options[mangos.OptionSurveyTime] = surveyorTime } + options[mangos.OptionSurveyTime] = time.Duration(surveyorTime) * time.Second + default: - fmt.Println("no additional args") } for k, v := range options { -- Gitblit v1.8.0