From 6887a385c690306a57e15b0ff3bad6138bd051c8 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期四, 16 五月 2019 17:18:45 +0800 Subject: [PATCH] change name --- nng.go | 100 ++++++++++++++++++++++++------------------------- 1 files changed, 49 insertions(+), 51 deletions(-) diff --git a/nng.go b/nng.go index 77619e3..009ef8d 100644 --- a/nng.go +++ b/nng.go @@ -24,6 +24,7 @@ // NNG mangos wrap type NNG struct { sock mangos.Socket + raw bool } // Send impl interface Diliver @@ -32,12 +33,11 @@ return errors.New("please init NNG first") } - switch n.sock.GetProtocol().Number() { - case mangos.ProtoSurveyor: - time.Sleep(surveyorTime * 2) - default: + if surveyorTime > 0 { + time.Sleep(time.Duration(surveyorTime*2) * time.Second) } - if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil { + + if n.raw { msg := mangos.NewMessage(len(data)) msg.Body = data return n.sock.SendMsg(msg) @@ -51,11 +51,13 @@ if n.sock == nil { return nil, errors.New("please init NNG first") } - if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil { - if msg, err := n.sock.RecvMsg(); err == nil { - return msg.Body, err + if n.raw { + var msg *mangos.Message + var err error + if msg, err = n.sock.RecvMsg(); err != nil { + return nil, err } - return nil, err + return msg.Body, nil } return n.sock.Recv() } @@ -64,44 +66,51 @@ func (n *NNG) Close() { if n.sock != nil { n.sock.Close() + n.sock = nil } } -// nngProducer create from deliver Mode -func nngProducer(m Mode, url string, args ...interface{}) *NNG { +func nngServer(m Mode, url string, args ...interface{}) *NNG { rmExistedIpcName(url) if sock, err := newSocket(protoProducer(m)); err == nil { if err = setSocketOptions(sock, args); err != nil { + sock.Close() + sock = nil return nil } if err = sock.Listen(url); err != nil { sock.Close() + sock = nil return nil } return &NNG{ sock, + true, } } return nil } -// nngConsumer create from deliver Mode -func nngConsumer(m Mode, url string, args ...interface{}) *NNG { +func nngClient(m Mode, url string, args ...interface{}) *NNG { if sock, err := newSocket(protoConsumer(m)); err == nil { if err = setSocketOptions(sock, args); err != nil { + sock.Close() + sock = nil return nil } if err = sock.Dial(url); err != nil { sock.Close() + sock = nil return nil } return &NNG{ sock, + true, } } @@ -111,68 +120,59 @@ // maxRecvSize max recv size var ( maxRecvSize = 33 * 1024 * 1024 - surveyorTime = time.Second / 2 + surveyorTime = -1 ) -func defualtSocketOptions(sock mangos.Socket) error { - var err error - if err = sock.SetOption(mangos.OptionMaxRecvSize, maxRecvSize); err != nil { - sock.Close() - return err - } - if err = sock.SetOption(mangos.OptionWriteQLen, 0); err != nil { - sock.Close() - return err - } - if err = sock.SetOption(mangos.OptionReadQLen, 0); err != nil { - sock.Close() - return err - } - if err = sock.SetOption(mangos.OptionRecvDeadline, time.Second); err != nil { - sock.Close() - return err - } - if err = sock.SetOption(mangos.OptionSendDeadline, time.Second); err != nil { - sock.Close() - return err - } - if err = sock.SetOption(mangos.OptionRaw, true); err != nil { - sock.Close() - return err - } +func defualtSocketOptions() map[string]interface{} { - return nil + options := make(map[string]interface{}) + + options[mangos.OptionMaxRecvSize] = maxRecvSize + options[mangos.OptionWriteQLen] = 0 + options[mangos.OptionReadQLen] = 0 + options[mangos.OptionRecvDeadline] = time.Second + options[mangos.OptionSendDeadline] = time.Second + options[mangos.OptionRaw] = true + + return options } func setSocketOptions(sock mangos.Socket, args ...interface{}) error { - err := defualtSocketOptions(sock) - if err != nil { - return err - } + options := defualtSocketOptions() + switch sock.GetProtocol().Number() { case mangos.ProtoSub: for _, arg := range args { switch arg.(type) { case string: - err = sock.SetOption(mangos.OptionSubscribe, []byte(arg.(string))) + options[mangos.OptionSubscribe] = []byte(arg.(string)) default: - err = sock.SetOption(mangos.OptionSubscribe, []byte("")) + options[mangos.OptionSubscribe] = []byte("") } } case mangos.ProtoSurveyor: for _, arg := range args { switch arg.(type) { case int: - surveyorTime = time.Duration(arg.(int)/2) * time.Second + if arg.(int) < 2 { + surveyorTime = 1 + } else { + surveyorTime = arg.(int) / 2 + } default: } - err = sock.SetOption(mangos.OptionSurveyTime, surveyorTime) + options[mangos.OptionSurveyTime] = time.Duration(surveyorTime) * time.Second } default: fmt.Println("no additional args") } + for k, v := range options { + if err := sock.SetOption(k, v); err != nil { + return err + } + } return nil } @@ -223,8 +223,6 @@ } all.AddTransports(s) - // s.AddTransport(ipc.NewTransport()) - // s.AddTransport(tcp.NewTransport()) return s, nil } -- Gitblit v1.8.0