From 36766ab5b68ce7dfb39dff5d6d283ce5c7f4b346 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期五, 17 五月 2019 09:14:51 +0800 Subject: [PATCH] fix crash --- nng.go | 202 ++++++++++++++++++++++++++++---------------------- 1 files changed, 114 insertions(+), 88 deletions(-) diff --git a/nng.go b/nng.go index 00a8dc2..07f2fb5 100644 --- a/nng.go +++ b/nng.go @@ -1,7 +1,6 @@ package deliver import ( - "errors" "fmt" "os" "strings" @@ -23,147 +22,176 @@ // NNG mangos wrap type NNG struct { - sock mangos.Socket + sock mangos.Socket + server bool + mode Mode + url string + + arguments []interface{} } // Send impl interface Diliver func (n *NNG) Send(data []byte) error { + var err error if n.sock == nil { - return errors.New("please init NNG first") + n.sock, err = n.makeNNG(true) + if err != nil { + fmt.Println("create nng producer error") + return err + } } - switch n.sock.GetProtocol().Number() { - case mangos.ProtoSurveyor: - time.Sleep(surveyorTime * 2) - default: - } - if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil { - msg := mangos.NewMessage(len(data)) - msg.Body = data - return n.sock.SendMsg(msg) + if surveyorTime > 0 { + time.Sleep(time.Duration(surveyorTime*2) * time.Second) } - return n.sock.Send(data) + msg := mangos.NewMessage(len(data)) + msg.Body = data + return n.sock.SendMsg(msg) + } // Recv impl interface Diliver func (n *NNG) Recv() ([]byte, error) { + var err error + if n.sock == nil { - return nil, errors.New("please init NNG first") + n.sock, err = n.makeNNG(false) + if err != nil { + fmt.Println("create nng consumer error") + return nil, err + } } - if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil { - msg, err := n.sock.RecvMsg() - return msg.Body, err + + var msg *mangos.Message + if msg, err = n.sock.RecvMsg(); err != nil { + return nil, err } - return n.sock.Recv() + return msg.Body, nil + } -// nngProducer create from deliver Mode -func nngProducer(m Mode, url string, args ...interface{}) *NNG { +// Close impl interface Deliver +func (n *NNG) Close() { + if n.sock != nil { + n.sock.Close() + n.sock = nil + } +} + +func nngServer(m Mode, url string, args ...interface{}) *NNG { rmExistedIpcName(url) - if sock, err := newSocket(protoProducer(m)); err == nil { - if err = setSocketOptions(sock, args); err != nil { - return nil - } - if err = sock.Listen(url); err != nil { - sock.Close() - return nil - } - return &NNG{ - sock, - } - } - return nil + return &NNG{ + server: true, + mode: m, + url: url, + arguments: args, + } } -// nngConsumer create from deliver Mode -func nngConsumer(m Mode, url string, args ...interface{}) *NNG { +func nngClient(m Mode, url string, args ...interface{}) *NNG { - if sock, err := newSocket(protoConsumer(m)); err == nil { - if err = setSocketOptions(sock, args); err != nil { - return nil - } - - if err = sock.Dial(url); err != nil { - sock.Close() - return nil - } - - return &NNG{ - sock, - } + return &NNG{ + server: false, + mode: m, + url: url, + arguments: args, } - return nil +} + +func proto(producer bool, m Mode) protocol { + if producer { + return protoProducer(m) + } + return protoConsumer(m) +} + +func (n *NNG) makeNNG(producer bool) (mangos.Socket, error) { + + var sock mangos.Socket + var err error + if sock, err = newSocket(proto(producer, n.mode)); err != nil { + return nil, err + } + + if err = setSocketOptions(sock, n.arguments...); err != nil { + sock.Close() + sock = nil + } + if n.server { + if err = sock.Listen(n.url); err != nil { + sock.Close() + sock = nil + } + } else { + if err = sock.Dial(n.url); err != nil { + sock.Close() + sock = nil + } + } + return sock, err } // maxRecvSize max recv size var ( maxRecvSize = 33 * 1024 * 1024 - surveyorTime = time.Second / 2 + surveyorTime = -1 ) -func defualtSocketOptions(sock mangos.Socket) error { - var err error - if err = sock.SetOption(mangos.OptionMaxRecvSize, maxRecvSize); err != nil { - sock.Close() - return err - } - if err = sock.SetOption(mangos.OptionWriteQLen, 0); err != nil { - sock.Close() - return err - } - if err = sock.SetOption(mangos.OptionReadQLen, 0); err != nil { - sock.Close() - return err - } - // if err = sock.SetOption(mangos.OptionRecvDeadline, time.Second); err != nil { - // sock.Close() - // return err - // } - // if err = sock.SetOption(mangos.OptionSendDeadline, time.Second); err != nil { - // sock.Close() - // return err - // } - if err = sock.SetOption(mangos.OptionRaw, true); err != nil { - sock.Close() - return err - } +func defualtSocketOptions() map[string]interface{} { - return nil + options := make(map[string]interface{}) + + options[mangos.OptionMaxRecvSize] = maxRecvSize + options[mangos.OptionWriteQLen] = 0 + options[mangos.OptionReadQLen] = 0 + // options[mangos.OptionRecvDeadline] = time.Second + // options[mangos.OptionSendDeadline] = time.Second + options[mangos.OptionRaw] = true + + return options } func setSocketOptions(sock mangos.Socket, args ...interface{}) error { - err := defualtSocketOptions(sock) - if err != nil { - return err - } + options := defualtSocketOptions() + switch sock.GetProtocol().Number() { case mangos.ProtoSub: + topic := "" for _, arg := range args { switch arg.(type) { case string: - err = sock.SetOption(mangos.OptionSubscribe, []byte(arg.(string))) + topic = arg.(string) default: - err = sock.SetOption(mangos.OptionSubscribe, []byte("")) } } + options[mangos.OptionSubscribe] = []byte(topic) case mangos.ProtoSurveyor: for _, arg := range args { switch arg.(type) { case int: - surveyorTime = time.Duration(arg.(int)/2) * time.Second + if arg.(int) < 2 { + surveyorTime = 1 + } else { + surveyorTime = arg.(int) / 2 + } default: } - err = sock.SetOption(mangos.OptionSurveyTime, surveyorTime) } + options[mangos.OptionSurveyTime] = time.Duration(surveyorTime) * time.Second + default: - fmt.Println("no additional args") } + for k, v := range options { + if err := sock.SetOption(k, v); err != nil { + return err + } + } return nil } @@ -214,8 +242,6 @@ } all.AddTransports(s) - // s.AddTransport(ipc.NewTransport()) - // s.AddTransport(tcp.NewTransport()) return s, nil } -- Gitblit v1.8.0