From d23f54e337d12fb4e6d5a0a5e1f041a51005e10c Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期五, 17 五月 2019 18:08:06 +0800
Subject: [PATCH] 整理代码

---
 nng.go     |  143 +++++++--------------------
 nngmake.go |   74 ++++++++++++++
 nngopt.go  |   71 ++++++++++++++
 3 files changed, 184 insertions(+), 104 deletions(-)

diff --git a/nng.go b/nng.go
index a61d3d2..1e9c1c0 100644
--- a/nng.go
+++ b/nng.go
@@ -1,6 +1,7 @@
 package deliver
 
 import (
+	"errors"
 	"fmt"
 	"os"
 	"strings"
@@ -24,9 +25,8 @@
 type td int
 
 const (
-	producer = td(iota)
-	consumer
-	star //mangos bus protocol
+	agent = td(iota)
+	coactee
 )
 
 // NNG mangos wrap
@@ -43,9 +43,9 @@
 func (n *NNG) Send(data []byte) error {
 	var err error
 	if n.sock == nil {
-		n.sock, err = n.makeNNG(producer)
+		n.sock, err = n.makeNNG(agent)
 		if err != nil {
-			fmt.Println("create nng producer error")
+			fmt.Println("create nng sender error")
 			return err
 		}
 	}
@@ -65,9 +65,9 @@
 	var err error
 
 	if n.sock == nil {
-		n.sock, err = n.makeNNG(consumer)
+		n.sock, err = n.makeNNG(coactee)
 		if err != nil {
-			fmt.Println("create nng consumer error")
+			fmt.Println("create nng reciever error")
 			return nil, err
 		}
 	}
@@ -112,98 +112,29 @@
 }
 
 func proto(typ td, m Mode) protocol {
-	if typ == producer {
-		return protoProducer(m)
-	} else if typ == consumer {
-		return protoConsumer(m)
+	if typ == agent {
+		return protoAgent(m)
+	} else if typ == coactee {
+		return protoCoactee(m)
 	}
-	return protoConsumer(m)
+	return NONE
 }
 
 func (n *NNG) makeNNG(typ td) (mangos.Socket, error) {
 
 	var sock mangos.Socket
 	var err error
-	if sock, err = newSocket(proto(typ, n.mode)); err != nil {
-		return nil, err
-	}
 
-	if err = setSocketOptions(sock, n.arguments...); err != nil {
-		sock.Close()
-		sock = nil
-	}
-	if n.server {
-		if err = sock.Listen(n.url); err != nil {
-			sock.Close()
-			sock = nil
-		}
-	} else {
-		if err = sock.Dial(n.url); err != nil {
-			sock.Close()
-			sock = nil
-		}
-	}
-	return sock, err
-}
-
-// maxRecvSize max recv size
-var (
-	maxRecvSize  = 33 * 1024 * 1024
-	surveyorTime = -1
-)
-
-func defualtSocketOptions() map[string]interface{} {
-
-	options := make(map[string]interface{})
-
-	options[mangos.OptionMaxRecvSize] = maxRecvSize
-	options[mangos.OptionWriteQLen] = 0
-	options[mangos.OptionReadQLen] = 0
-	options[mangos.OptionRecvDeadline] = time.Second
-	options[mangos.OptionSendDeadline] = time.Second
-	options[mangos.OptionRaw] = true
-
-	return options
-}
-
-func setSocketOptions(sock mangos.Socket, args ...interface{}) error {
-
-	options := defualtSocketOptions()
-
-	switch sock.GetProtocol().Number() {
-	case mangos.ProtoSub:
-		topic := ""
-		for _, arg := range args {
-			switch arg.(type) {
-			case string:
-				topic = arg.(string)
-			default:
-			}
-		}
-		options[mangos.OptionSubscribe] = []byte(topic)
-	case mangos.ProtoSurveyor:
-		for _, arg := range args {
-			switch arg.(type) {
-			case int:
-				if arg.(int) < 2 {
-					surveyorTime = 1
-				} else {
-					surveyorTime = arg.(int) / 2
-				}
-			default:
-			}
-		}
-		options[mangos.OptionSurveyTime] = time.Duration(surveyorTime) * time.Second
-
+	switch n.mode {
+	case Bus:
+		sock, err = n.busMakeNNG(typ)
+	case ReqRep:
+		sock, err = n.rrMakeNNG(typ)
 	default:
+		sock, err = n.ppMakeNNG(typ)
 	}
 
-	for k, v := range options {
-		if err := sock.SetOption(k, v); err != nil {
-			return err
-		}
-	}
-	return nil
+	return sock, err
 }
 
 func rmExistedIpcName(url string) {
@@ -220,6 +151,9 @@
 // access the underlying library.
 func newSocket(p protocol) (mangos.Socket, error) {
 
+	if p == NONE {
+		return nil, errors.New("new socket protocol none")
+	}
 	var s mangos.Socket
 	var err error
 
@@ -268,6 +202,7 @@
 
 // Constants for protocols.
 const (
+	NONE       = -1
 	PUSH       = protocol(mangos.ProtoPush)
 	PULL       = protocol(mangos.ProtoPull)
 	PUB        = protocol(mangos.ProtoPub)
@@ -280,38 +215,38 @@
 	PAIR       = protocol(mangos.ProtoPair)
 )
 
-func protoProducer(m Mode) protocol {
+func protoAgent(m Mode) protocol {
 	switch m {
 	case PushPull:
 		return PUSH
 	case PubSub:
 		return PUB
-	case ReqRep:
-		return REP
-	case SurvResp:
-		return SURVEYOR
-	case Bus:
-		return BUS
 	case Pair:
 		return PAIR
+	case SurvResp:
+		return SURVEYOR
+	case ReqRep:
+		return REQ
+	case Bus:
+		return BUS
 	}
-	return PUSH
+	return NONE
 }
 
-func protoConsumer(m Mode) protocol {
+func protoCoactee(m Mode) protocol {
 	switch m {
 	case PushPull:
 		return PULL
 	case PubSub:
 		return SUB
-	case ReqRep:
-		return REQ
-	case SurvResp:
-		return RESPONDENT
-	case Bus:
-		return BUS
 	case Pair:
 		return PAIR
+	case SurvResp:
+		return RESPONDENT
+	case ReqRep:
+		return REP
+	case Bus:
+		return BUS
 	}
-	return PULL
+	return NONE
 }
diff --git a/nngmake.go b/nngmake.go
new file mode 100644
index 0000000..cdce421
--- /dev/null
+++ b/nngmake.go
@@ -0,0 +1,74 @@
+package deliver
+
+import "nanomsg.org/go-mangos"
+
+func commonMake(n *NNG, p protocol, opts map[string]interface{}) (mangos.Socket, error) {
+	var sock mangos.Socket
+	var err error
+
+	if sock, err = newSocket(p); err != nil {
+		return nil, err
+	}
+
+	if err = setSocketOptions(sock, opts); err != nil {
+		sock.Close()
+		sock = nil
+	}
+	if n.server {
+		if err = sock.Listen(n.url); err != nil {
+			sock.Close()
+			sock = nil
+		}
+	} else {
+		if err = sock.Dial(n.url); err != nil {
+			sock.Close()
+			sock = nil
+		}
+	}
+	return sock, err
+}
+
+func (n *NNG) ppMakeNNG(typ td) (mangos.Socket, error) {
+
+	p := proto(typ, n.mode)
+	opts := optOther(p, optRaw(optDefault()), n.arguments...)
+
+	return commonMake(n, p, opts)
+}
+
+func (n *NNG) rrMakeNNG(typ td) (mangos.Socket, error) {
+
+	p := proto(typ, n.mode)
+	opts := optDefault()
+
+	return commonMake(n, p, opts)
+}
+
+func (n *NNG) busMakeNNG(typ td) (mangos.Socket, error) {
+	var sock mangos.Socket
+	var err error
+	if sock, err = newSocket(BUS); err != nil {
+		return nil, err
+	}
+	if err = sock.Listen(n.url); err != nil {
+		sock.Close()
+		sock = nil
+	}
+	// arguments are bus clients
+	for _, arg := range n.arguments {
+		switch arg.(type) {
+		case string:
+			url := arg.(string)
+			if err = sock.Dial(url); err != nil {
+				sock.Close()
+				sock = nil
+				break
+			}
+		default:
+		}
+	}
+	if err = setSocketOptions(sock, nil); err != nil {
+		return nil, err
+	}
+	return sock, err
+}
diff --git a/nngopt.go b/nngopt.go
new file mode 100644
index 0000000..0fda3c7
--- /dev/null
+++ b/nngopt.go
@@ -0,0 +1,71 @@
+package deliver
+
+import (
+	"time"
+
+	"nanomsg.org/go-mangos"
+)
+
+// maxRecvSize max recv size
+var (
+	maxRecvSize  = 33 * 1024 * 1024
+	surveyorTime = -1
+)
+
+func optDefault() map[string]interface{} {
+	options := make(map[string]interface{})
+
+	options[mangos.OptionMaxRecvSize] = maxRecvSize
+	options[mangos.OptionWriteQLen] = 0
+	options[mangos.OptionReadQLen] = 0
+	options[mangos.OptionRecvDeadline] = time.Second
+	options[mangos.OptionSendDeadline] = time.Second
+	// options[mangos.OptionRaw] = true
+
+	return options
+}
+
+func optRaw(options map[string]interface{}) map[string]interface{} {
+
+	options[mangos.OptionRaw] = true
+	return options
+}
+
+func optOther(p protocol, options map[string]interface{}, o ...interface{}) map[string]interface{} {
+	// len 1 or 0, sub topic
+	if p == SUB {
+		if len(o) == 1 {
+			v := o[0]
+			if t, ok := v.(string); ok {
+				options[mangos.OptionSubscribe] = []byte(t)
+			}
+		} else {
+			options[mangos.OptionSubscribe] = []byte("")
+		}
+	}
+
+	// len 1 or 0, surveyor time
+	if p == SURVEYOR {
+		if len(o) == 1 {
+			v := o[0]
+			if t, ok := v.(int); ok {
+				surveyorTime = t / 2
+				if surveyorTime == 0 {
+					surveyorTime = 1
+				}
+			}
+		}
+		options[mangos.OptionSurveyTime] = time.Duration(surveyorTime) * time.Second
+	}
+	return options
+}
+
+func setSocketOptions(sock mangos.Socket, options map[string]interface{}) error {
+
+	for k, v := range options {
+		if err := sock.SetOption(k, v); err != nil {
+			return err
+		}
+	}
+	return nil
+}

--
Gitblit v1.8.0