From 5ff1f32410e0697e581f6c389c8c22c5abd474c0 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期四, 16 五月 2019 15:17:32 +0800 Subject: [PATCH] fix --- nng.go | 64 ++++++++++++++++++++++++++------ 1 files changed, 52 insertions(+), 12 deletions(-) diff --git a/nng.go b/nng.go index 84d3699..6cf8b52 100644 --- a/nng.go +++ b/nng.go @@ -5,6 +5,7 @@ "fmt" "os" "strings" + "time" "nanomsg.org/go-mangos" "nanomsg.org/go-mangos/protocol/bus" @@ -23,6 +24,7 @@ // NNG mangos wrap type NNG struct { sock mangos.Socket + raw bool } // Send impl interface Diliver @@ -31,11 +33,17 @@ return errors.New("please init NNG first") } - if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil { + // switch n.sock.GetProtocol().Number() { + // case mangos.ProtoSurveyor: + // time.Sleep(surveyorTime * 2) + // default: + // } + if n.raw { msg := mangos.NewMessage(len(data)) msg.Body = data return n.sock.SendMsg(msg) } + return n.sock.Send(data) } @@ -44,11 +52,22 @@ if n.sock == nil { return nil, errors.New("please init NNG first") } - if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil { - msg, err := n.sock.RecvMsg() - return msg.Body, err + if n.raw { + var msg *mangos.Message + var err error + if msg, err = n.sock.RecvMsg(); err != nil { + return nil, err + } + return msg.Body, nil } return n.sock.Recv() +} + +// Close impl interface Deliver +func (n *NNG) Close() { + if n.sock != nil { + n.sock.Close() + } } // nngProducer create from deliver Mode @@ -65,6 +84,7 @@ } return &NNG{ sock, + true, } } @@ -86,18 +106,22 @@ return &NNG{ sock, + true, } } return nil } -// MaxRecvSize max recv size -var MaxRecvSize = 33 * 1024 * 1024 +// maxRecvSize max recv size +var ( + maxRecvSize = 33 * 1024 * 1024 + surveyorTime = time.Second / 2 +) func defualtSocketOptions(sock mangos.Socket) error { var err error - if err = sock.SetOption(mangos.OptionMaxRecvSize, MaxRecvSize); err != nil { + if err = sock.SetOption(mangos.OptionMaxRecvSize, maxRecvSize); err != nil { sock.Close() return err } @@ -109,10 +133,14 @@ sock.Close() return err } - // if err = sock.SetOption(mangos.OptionNoDelay, true); err != nil { - // sock.Close() - // return err - // } + if err = sock.SetOption(mangos.OptionRecvDeadline, time.Second); err != nil { + sock.Close() + return err + } + if err = sock.SetOption(mangos.OptionSendDeadline, time.Second); err != nil { + sock.Close() + return err + } if err = sock.SetOption(mangos.OptionRaw, true); err != nil { sock.Close() return err @@ -127,7 +155,8 @@ if err != nil { return err } - if sock.GetProtocol().Number() == mangos.ProtoSub { + switch sock.GetProtocol().Number() { + case mangos.ProtoSub: for _, arg := range args { switch arg.(type) { case string: @@ -136,6 +165,17 @@ err = sock.SetOption(mangos.OptionSubscribe, []byte("")) } } + case mangos.ProtoSurveyor: + for _, arg := range args { + switch arg.(type) { + case int: + surveyorTime = time.Duration(arg.(int)/2) * time.Second + default: + } + err = sock.SetOption(mangos.OptionSurveyTime, surveyorTime) + } + default: + fmt.Println("no additional args") } return nil -- Gitblit v1.8.0