From d23f54e337d12fb4e6d5a0a5e1f041a51005e10c Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期五, 17 五月 2019 18:08:06 +0800
Subject: [PATCH] 整理代码
---
nng.go | 143 +++++++--------------------
nngmake.go | 74 ++++++++++++++
nngopt.go | 71 ++++++++++++++
3 files changed, 184 insertions(+), 104 deletions(-)
diff --git a/nng.go b/nng.go
index a61d3d2..1e9c1c0 100644
--- a/nng.go
+++ b/nng.go
@@ -1,6 +1,7 @@
package deliver
import (
+ "errors"
"fmt"
"os"
"strings"
@@ -24,9 +25,8 @@
type td int
const (
- producer = td(iota)
- consumer
- star //mangos bus protocol
+ agent = td(iota)
+ coactee
)
// NNG mangos wrap
@@ -43,9 +43,9 @@
func (n *NNG) Send(data []byte) error {
var err error
if n.sock == nil {
- n.sock, err = n.makeNNG(producer)
+ n.sock, err = n.makeNNG(agent)
if err != nil {
- fmt.Println("create nng producer error")
+ fmt.Println("create nng sender error")
return err
}
}
@@ -65,9 +65,9 @@
var err error
if n.sock == nil {
- n.sock, err = n.makeNNG(consumer)
+ n.sock, err = n.makeNNG(coactee)
if err != nil {
- fmt.Println("create nng consumer error")
+ fmt.Println("create nng reciever error")
return nil, err
}
}
@@ -112,98 +112,29 @@
}
func proto(typ td, m Mode) protocol {
- if typ == producer {
- return protoProducer(m)
- } else if typ == consumer {
- return protoConsumer(m)
+ if typ == agent {
+ return protoAgent(m)
+ } else if typ == coactee {
+ return protoCoactee(m)
}
- return protoConsumer(m)
+ return NONE
}
func (n *NNG) makeNNG(typ td) (mangos.Socket, error) {
var sock mangos.Socket
var err error
- if sock, err = newSocket(proto(typ, n.mode)); err != nil {
- return nil, err
- }
- if err = setSocketOptions(sock, n.arguments...); err != nil {
- sock.Close()
- sock = nil
- }
- if n.server {
- if err = sock.Listen(n.url); err != nil {
- sock.Close()
- sock = nil
- }
- } else {
- if err = sock.Dial(n.url); err != nil {
- sock.Close()
- sock = nil
- }
- }
- return sock, err
-}
-
-// maxRecvSize max recv size
-var (
- maxRecvSize = 33 * 1024 * 1024
- surveyorTime = -1
-)
-
-func defualtSocketOptions() map[string]interface{} {
-
- options := make(map[string]interface{})
-
- options[mangos.OptionMaxRecvSize] = maxRecvSize
- options[mangos.OptionWriteQLen] = 0
- options[mangos.OptionReadQLen] = 0
- options[mangos.OptionRecvDeadline] = time.Second
- options[mangos.OptionSendDeadline] = time.Second
- options[mangos.OptionRaw] = true
-
- return options
-}
-
-func setSocketOptions(sock mangos.Socket, args ...interface{}) error {
-
- options := defualtSocketOptions()
-
- switch sock.GetProtocol().Number() {
- case mangos.ProtoSub:
- topic := ""
- for _, arg := range args {
- switch arg.(type) {
- case string:
- topic = arg.(string)
- default:
- }
- }
- options[mangos.OptionSubscribe] = []byte(topic)
- case mangos.ProtoSurveyor:
- for _, arg := range args {
- switch arg.(type) {
- case int:
- if arg.(int) < 2 {
- surveyorTime = 1
- } else {
- surveyorTime = arg.(int) / 2
- }
- default:
- }
- }
- options[mangos.OptionSurveyTime] = time.Duration(surveyorTime) * time.Second
-
+ switch n.mode {
+ case Bus:
+ sock, err = n.busMakeNNG(typ)
+ case ReqRep:
+ sock, err = n.rrMakeNNG(typ)
default:
+ sock, err = n.ppMakeNNG(typ)
}
- for k, v := range options {
- if err := sock.SetOption(k, v); err != nil {
- return err
- }
- }
- return nil
+ return sock, err
}
func rmExistedIpcName(url string) {
@@ -220,6 +151,9 @@
// access the underlying library.
func newSocket(p protocol) (mangos.Socket, error) {
+ if p == NONE {
+ return nil, errors.New("new socket protocol none")
+ }
var s mangos.Socket
var err error
@@ -268,6 +202,7 @@
// Constants for protocols.
const (
+ NONE = -1
PUSH = protocol(mangos.ProtoPush)
PULL = protocol(mangos.ProtoPull)
PUB = protocol(mangos.ProtoPub)
@@ -280,38 +215,38 @@
PAIR = protocol(mangos.ProtoPair)
)
-func protoProducer(m Mode) protocol {
+func protoAgent(m Mode) protocol {
switch m {
case PushPull:
return PUSH
case PubSub:
return PUB
- case ReqRep:
- return REP
- case SurvResp:
- return SURVEYOR
- case Bus:
- return BUS
case Pair:
return PAIR
+ case SurvResp:
+ return SURVEYOR
+ case ReqRep:
+ return REQ
+ case Bus:
+ return BUS
}
- return PUSH
+ return NONE
}
-func protoConsumer(m Mode) protocol {
+func protoCoactee(m Mode) protocol {
switch m {
case PushPull:
return PULL
case PubSub:
return SUB
- case ReqRep:
- return REQ
- case SurvResp:
- return RESPONDENT
- case Bus:
- return BUS
case Pair:
return PAIR
+ case SurvResp:
+ return RESPONDENT
+ case ReqRep:
+ return REP
+ case Bus:
+ return BUS
}
- return PULL
+ return NONE
}
diff --git a/nngmake.go b/nngmake.go
new file mode 100644
index 0000000..cdce421
--- /dev/null
+++ b/nngmake.go
@@ -0,0 +1,74 @@
+package deliver
+
+import "nanomsg.org/go-mangos"
+
+func commonMake(n *NNG, p protocol, opts map[string]interface{}) (mangos.Socket, error) {
+ var sock mangos.Socket
+ var err error
+
+ if sock, err = newSocket(p); err != nil {
+ return nil, err
+ }
+
+ if err = setSocketOptions(sock, opts); err != nil {
+ sock.Close()
+ sock = nil
+ }
+ if n.server {
+ if err = sock.Listen(n.url); err != nil {
+ sock.Close()
+ sock = nil
+ }
+ } else {
+ if err = sock.Dial(n.url); err != nil {
+ sock.Close()
+ sock = nil
+ }
+ }
+ return sock, err
+}
+
+func (n *NNG) ppMakeNNG(typ td) (mangos.Socket, error) {
+
+ p := proto(typ, n.mode)
+ opts := optOther(p, optRaw(optDefault()), n.arguments...)
+
+ return commonMake(n, p, opts)
+}
+
+func (n *NNG) rrMakeNNG(typ td) (mangos.Socket, error) {
+
+ p := proto(typ, n.mode)
+ opts := optDefault()
+
+ return commonMake(n, p, opts)
+}
+
+func (n *NNG) busMakeNNG(typ td) (mangos.Socket, error) {
+ var sock mangos.Socket
+ var err error
+ if sock, err = newSocket(BUS); err != nil {
+ return nil, err
+ }
+ if err = sock.Listen(n.url); err != nil {
+ sock.Close()
+ sock = nil
+ }
+ // arguments are bus clients
+ for _, arg := range n.arguments {
+ switch arg.(type) {
+ case string:
+ url := arg.(string)
+ if err = sock.Dial(url); err != nil {
+ sock.Close()
+ sock = nil
+ break
+ }
+ default:
+ }
+ }
+ if err = setSocketOptions(sock, nil); err != nil {
+ return nil, err
+ }
+ return sock, err
+}
diff --git a/nngopt.go b/nngopt.go
new file mode 100644
index 0000000..0fda3c7
--- /dev/null
+++ b/nngopt.go
@@ -0,0 +1,71 @@
+package deliver
+
+import (
+ "time"
+
+ "nanomsg.org/go-mangos"
+)
+
+// maxRecvSize max recv size
+var (
+ maxRecvSize = 33 * 1024 * 1024
+ surveyorTime = -1
+)
+
+func optDefault() map[string]interface{} {
+ options := make(map[string]interface{})
+
+ options[mangos.OptionMaxRecvSize] = maxRecvSize
+ options[mangos.OptionWriteQLen] = 0
+ options[mangos.OptionReadQLen] = 0
+ options[mangos.OptionRecvDeadline] = time.Second
+ options[mangos.OptionSendDeadline] = time.Second
+ // options[mangos.OptionRaw] = true
+
+ return options
+}
+
+func optRaw(options map[string]interface{}) map[string]interface{} {
+
+ options[mangos.OptionRaw] = true
+ return options
+}
+
+func optOther(p protocol, options map[string]interface{}, o ...interface{}) map[string]interface{} {
+ // len 1 or 0, sub topic
+ if p == SUB {
+ if len(o) == 1 {
+ v := o[0]
+ if t, ok := v.(string); ok {
+ options[mangos.OptionSubscribe] = []byte(t)
+ }
+ } else {
+ options[mangos.OptionSubscribe] = []byte("")
+ }
+ }
+
+ // len 1 or 0, surveyor time
+ if p == SURVEYOR {
+ if len(o) == 1 {
+ v := o[0]
+ if t, ok := v.(int); ok {
+ surveyorTime = t / 2
+ if surveyorTime == 0 {
+ surveyorTime = 1
+ }
+ }
+ }
+ options[mangos.OptionSurveyTime] = time.Duration(surveyorTime) * time.Second
+ }
+ return options
+}
+
+func setSocketOptions(sock mangos.Socket, options map[string]interface{}) error {
+
+ for k, v := range options {
+ if err := sock.SetOption(k, v); err != nil {
+ return err
+ }
+ }
+ return nil
+}
--
Gitblit v1.8.0