From d23f54e337d12fb4e6d5a0a5e1f041a51005e10c Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期五, 17 五月 2019 18:08:06 +0800 Subject: [PATCH] 整理代码 --- nng.go | 143 +++++++++++++---------------------------------- 1 files changed, 39 insertions(+), 104 deletions(-) diff --git a/nng.go b/nng.go index a61d3d2..1e9c1c0 100644 --- a/nng.go +++ b/nng.go @@ -1,6 +1,7 @@ package deliver import ( + "errors" "fmt" "os" "strings" @@ -24,9 +25,8 @@ type td int const ( - producer = td(iota) - consumer - star //mangos bus protocol + agent = td(iota) + coactee ) // NNG mangos wrap @@ -43,9 +43,9 @@ func (n *NNG) Send(data []byte) error { var err error if n.sock == nil { - n.sock, err = n.makeNNG(producer) + n.sock, err = n.makeNNG(agent) if err != nil { - fmt.Println("create nng producer error") + fmt.Println("create nng sender error") return err } } @@ -65,9 +65,9 @@ var err error if n.sock == nil { - n.sock, err = n.makeNNG(consumer) + n.sock, err = n.makeNNG(coactee) if err != nil { - fmt.Println("create nng consumer error") + fmt.Println("create nng reciever error") return nil, err } } @@ -112,98 +112,29 @@ } func proto(typ td, m Mode) protocol { - if typ == producer { - return protoProducer(m) - } else if typ == consumer { - return protoConsumer(m) + if typ == agent { + return protoAgent(m) + } else if typ == coactee { + return protoCoactee(m) } - return protoConsumer(m) + return NONE } func (n *NNG) makeNNG(typ td) (mangos.Socket, error) { var sock mangos.Socket var err error - if sock, err = newSocket(proto(typ, n.mode)); err != nil { - return nil, err - } - if err = setSocketOptions(sock, n.arguments...); err != nil { - sock.Close() - sock = nil - } - if n.server { - if err = sock.Listen(n.url); err != nil { - sock.Close() - sock = nil - } - } else { - if err = sock.Dial(n.url); err != nil { - sock.Close() - sock = nil - } - } - return sock, err -} - -// maxRecvSize max recv size -var ( - maxRecvSize = 33 * 1024 * 1024 - surveyorTime = -1 -) - -func defualtSocketOptions() map[string]interface{} { - - options := make(map[string]interface{}) - - options[mangos.OptionMaxRecvSize] = maxRecvSize - options[mangos.OptionWriteQLen] = 0 - options[mangos.OptionReadQLen] = 0 - options[mangos.OptionRecvDeadline] = time.Second - options[mangos.OptionSendDeadline] = time.Second - options[mangos.OptionRaw] = true - - return options -} - -func setSocketOptions(sock mangos.Socket, args ...interface{}) error { - - options := defualtSocketOptions() - - switch sock.GetProtocol().Number() { - case mangos.ProtoSub: - topic := "" - for _, arg := range args { - switch arg.(type) { - case string: - topic = arg.(string) - default: - } - } - options[mangos.OptionSubscribe] = []byte(topic) - case mangos.ProtoSurveyor: - for _, arg := range args { - switch arg.(type) { - case int: - if arg.(int) < 2 { - surveyorTime = 1 - } else { - surveyorTime = arg.(int) / 2 - } - default: - } - } - options[mangos.OptionSurveyTime] = time.Duration(surveyorTime) * time.Second - + switch n.mode { + case Bus: + sock, err = n.busMakeNNG(typ) + case ReqRep: + sock, err = n.rrMakeNNG(typ) default: + sock, err = n.ppMakeNNG(typ) } - for k, v := range options { - if err := sock.SetOption(k, v); err != nil { - return err - } - } - return nil + return sock, err } func rmExistedIpcName(url string) { @@ -220,6 +151,9 @@ // access the underlying library. func newSocket(p protocol) (mangos.Socket, error) { + if p == NONE { + return nil, errors.New("new socket protocol none") + } var s mangos.Socket var err error @@ -268,6 +202,7 @@ // Constants for protocols. const ( + NONE = -1 PUSH = protocol(mangos.ProtoPush) PULL = protocol(mangos.ProtoPull) PUB = protocol(mangos.ProtoPub) @@ -280,38 +215,38 @@ PAIR = protocol(mangos.ProtoPair) ) -func protoProducer(m Mode) protocol { +func protoAgent(m Mode) protocol { switch m { case PushPull: return PUSH case PubSub: return PUB - case ReqRep: - return REP - case SurvResp: - return SURVEYOR - case Bus: - return BUS case Pair: return PAIR + case SurvResp: + return SURVEYOR + case ReqRep: + return REQ + case Bus: + return BUS } - return PUSH + return NONE } -func protoConsumer(m Mode) protocol { +func protoCoactee(m Mode) protocol { switch m { case PushPull: return PULL case PubSub: return SUB - case ReqRep: - return REQ - case SurvResp: - return RESPONDENT - case Bus: - return BUS case Pair: return PAIR + case SurvResp: + return RESPONDENT + case ReqRep: + return REP + case Bus: + return BUS } - return PULL + return NONE } -- Gitblit v1.8.0