From 36766ab5b68ce7dfb39dff5d6d283ce5c7f4b346 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期五, 17 五月 2019 09:14:51 +0800 Subject: [PATCH] fix crash --- nng.go | 204 ++++++++++++++++++++++++++++++++------------------- 1 files changed, 128 insertions(+), 76 deletions(-) diff --git a/nng.go b/nng.go index e9568cd..07f2fb5 100644 --- a/nng.go +++ b/nng.go @@ -1,10 +1,10 @@ package deliver import ( - "errors" "fmt" "os" "strings" + "time" "nanomsg.org/go-mangos" "nanomsg.org/go-mangos/protocol/bus" @@ -22,122 +22,176 @@ // NNG mangos wrap type NNG struct { - sock mangos.Socket + sock mangos.Socket + server bool + mode Mode + url string + + arguments []interface{} } // Send impl interface Diliver func (n *NNG) Send(data []byte) error { + var err error if n.sock == nil { - return errors.New("please init NNG first") + n.sock, err = n.makeNNG(true) + if err != nil { + fmt.Println("create nng producer error") + return err + } } - if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil { - msg := mangos.NewMessage(len(data)) - msg.Body = data - return n.sock.SendMsg(msg) + if surveyorTime > 0 { + time.Sleep(time.Duration(surveyorTime*2) * time.Second) } - return n.sock.Send(data) + + msg := mangos.NewMessage(len(data)) + msg.Body = data + return n.sock.SendMsg(msg) + } // Recv impl interface Diliver func (n *NNG) Recv() ([]byte, error) { + var err error + if n.sock == nil { - return nil, errors.New("please init NNG first") + n.sock, err = n.makeNNG(false) + if err != nil { + fmt.Println("create nng consumer error") + return nil, err + } } - if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil { - msg, err := n.sock.RecvMsg() - return msg.Body, err + + var msg *mangos.Message + if msg, err = n.sock.RecvMsg(); err != nil { + return nil, err } - return n.sock.Recv() + return msg.Body, nil + } -// NewNNGProducer create from deliver Mode -func NewNNGProducer(m Mode, url string, args ...interface{}) *NNG { +// Close impl interface Deliver +func (n *NNG) Close() { + if n.sock != nil { + n.sock.Close() + n.sock = nil + } +} + +func nngServer(m Mode, url string, args ...interface{}) *NNG { rmExistedIpcName(url) - if sock, err := newSocket(protoProducer(m)); err == nil { - if err = setSocketOptions(sock, args); err != nil { - return nil - } - if err = sock.Listen(url); err != nil { - sock.Close() - return nil - } - return &NNG{ - sock, - } - } - return nil + return &NNG{ + server: true, + mode: m, + url: url, + arguments: args, + } } -// NewNNGConsumer create from deliver Mode -func NewNNGConsumer(m Mode, url string, args ...interface{}) *NNG { +func nngClient(m Mode, url string, args ...interface{}) *NNG { - if sock, err := newSocket(protoConsumer(m)); err == nil { - if err = setSocketOptions(sock, args); err != nil { - return nil - } - - if err = sock.Dial(url); err != nil { - sock.Close() - return nil - } - - return &NNG{ - sock, - } + return &NNG{ + server: false, + mode: m, + url: url, + arguments: args, } - return nil } -// MaxRecvSize max recv size -var MaxRecvSize = 33 * 1024 * 1024 +func proto(producer bool, m Mode) protocol { + if producer { + return protoProducer(m) + } + return protoConsumer(m) +} -func defualtSocketOptions(sock mangos.Socket) error { +func (n *NNG) makeNNG(producer bool) (mangos.Socket, error) { + + var sock mangos.Socket var err error - if err = sock.SetOption(mangos.OptionMaxRecvSize, MaxRecvSize); err != nil { - sock.Close() - return err - } - if err = sock.SetOption(mangos.OptionWriteQLen, 0); err != nil { - sock.Close() - return err - } - if err = sock.SetOption(mangos.OptionReadQLen, 0); err != nil { - sock.Close() - return err - } - // if err = sock.SetOption(mangos.OptionNoDelay, true); err != nil { - // sock.Close() - // return err - // } - if err = sock.SetOption(mangos.OptionRaw, true); err != nil { - sock.Close() - return err + if sock, err = newSocket(proto(producer, n.mode)); err != nil { + return nil, err } - return nil + if err = setSocketOptions(sock, n.arguments...); err != nil { + sock.Close() + sock = nil + } + if n.server { + if err = sock.Listen(n.url); err != nil { + sock.Close() + sock = nil + } + } else { + if err = sock.Dial(n.url); err != nil { + sock.Close() + sock = nil + } + } + return sock, err +} + +// maxRecvSize max recv size +var ( + maxRecvSize = 33 * 1024 * 1024 + surveyorTime = -1 +) + +func defualtSocketOptions() map[string]interface{} { + + options := make(map[string]interface{}) + + options[mangos.OptionMaxRecvSize] = maxRecvSize + options[mangos.OptionWriteQLen] = 0 + options[mangos.OptionReadQLen] = 0 + // options[mangos.OptionRecvDeadline] = time.Second + // options[mangos.OptionSendDeadline] = time.Second + options[mangos.OptionRaw] = true + + return options } func setSocketOptions(sock mangos.Socket, args ...interface{}) error { - err := defualtSocketOptions(sock) - if err != nil { - return err - } - if sock.GetProtocol().Number() == mangos.ProtoSub { + options := defualtSocketOptions() + + switch sock.GetProtocol().Number() { + case mangos.ProtoSub: + topic := "" for _, arg := range args { switch arg.(type) { case string: - err = sock.SetOption(mangos.OptionSubscribe, []byte(arg.(string))) + topic = arg.(string) default: - err = sock.SetOption(mangos.OptionSubscribe, []byte("")) } } + options[mangos.OptionSubscribe] = []byte(topic) + case mangos.ProtoSurveyor: + for _, arg := range args { + switch arg.(type) { + case int: + if arg.(int) < 2 { + surveyorTime = 1 + } else { + surveyorTime = arg.(int) / 2 + } + default: + } + } + options[mangos.OptionSurveyTime] = time.Duration(surveyorTime) * time.Second + + default: } + for k, v := range options { + if err := sock.SetOption(k, v); err != nil { + return err + } + } return nil } @@ -188,8 +242,6 @@ } all.AddTransports(s) - // s.AddTransport(ipc.NewTransport()) - // s.AddTransport(tcp.NewTransport()) return s, nil } -- Gitblit v1.8.0