zhangmeng
2019-05-17 d23f54e337d12fb4e6d5a0a5e1f041a51005e10c
整理代码
2个文件已添加
1个文件已修改
288 ■■■■■ 已修改文件
nng.go 143 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nngmake.go 74 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nngopt.go 71 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nng.go
@@ -1,6 +1,7 @@
package deliver
import (
    "errors"
    "fmt"
    "os"
    "strings"
@@ -24,9 +25,8 @@
type td int
const (
    producer = td(iota)
    consumer
    star //mangos bus protocol
    agent = td(iota)
    coactee
)
// NNG mangos wrap
@@ -43,9 +43,9 @@
func (n *NNG) Send(data []byte) error {
    var err error
    if n.sock == nil {
        n.sock, err = n.makeNNG(producer)
        n.sock, err = n.makeNNG(agent)
        if err != nil {
            fmt.Println("create nng producer error")
            fmt.Println("create nng sender error")
            return err
        }
    }
@@ -65,9 +65,9 @@
    var err error
    if n.sock == nil {
        n.sock, err = n.makeNNG(consumer)
        n.sock, err = n.makeNNG(coactee)
        if err != nil {
            fmt.Println("create nng consumer error")
            fmt.Println("create nng reciever error")
            return nil, err
        }
    }
@@ -112,98 +112,29 @@
}
func proto(typ td, m Mode) protocol {
    if typ == producer {
        return protoProducer(m)
    } else if typ == consumer {
        return protoConsumer(m)
    if typ == agent {
        return protoAgent(m)
    } else if typ == coactee {
        return protoCoactee(m)
    }
    return protoConsumer(m)
    return NONE
}
func (n *NNG) makeNNG(typ td) (mangos.Socket, error) {
    var sock mangos.Socket
    var err error
    if sock, err = newSocket(proto(typ, n.mode)); err != nil {
        return nil, err
    }
    if err = setSocketOptions(sock, n.arguments...); err != nil {
        sock.Close()
        sock = nil
    }
    if n.server {
        if err = sock.Listen(n.url); err != nil {
            sock.Close()
            sock = nil
        }
    } else {
        if err = sock.Dial(n.url); err != nil {
            sock.Close()
            sock = nil
        }
    }
    return sock, err
}
// maxRecvSize max recv size
var (
    maxRecvSize  = 33 * 1024 * 1024
    surveyorTime = -1
)
func defualtSocketOptions() map[string]interface{} {
    options := make(map[string]interface{})
    options[mangos.OptionMaxRecvSize] = maxRecvSize
    options[mangos.OptionWriteQLen] = 0
    options[mangos.OptionReadQLen] = 0
    options[mangos.OptionRecvDeadline] = time.Second
    options[mangos.OptionSendDeadline] = time.Second
    options[mangos.OptionRaw] = true
    return options
}
func setSocketOptions(sock mangos.Socket, args ...interface{}) error {
    options := defualtSocketOptions()
    switch sock.GetProtocol().Number() {
    case mangos.ProtoSub:
        topic := ""
        for _, arg := range args {
            switch arg.(type) {
            case string:
                topic = arg.(string)
            default:
            }
        }
        options[mangos.OptionSubscribe] = []byte(topic)
    case mangos.ProtoSurveyor:
        for _, arg := range args {
            switch arg.(type) {
            case int:
                if arg.(int) < 2 {
                    surveyorTime = 1
                } else {
                    surveyorTime = arg.(int) / 2
                }
            default:
            }
        }
        options[mangos.OptionSurveyTime] = time.Duration(surveyorTime) * time.Second
    switch n.mode {
    case Bus:
        sock, err = n.busMakeNNG(typ)
    case ReqRep:
        sock, err = n.rrMakeNNG(typ)
    default:
        sock, err = n.ppMakeNNG(typ)
    }
    for k, v := range options {
        if err := sock.SetOption(k, v); err != nil {
            return err
        }
    }
    return nil
    return sock, err
}
func rmExistedIpcName(url string) {
@@ -220,6 +151,9 @@
// access the underlying library.
func newSocket(p protocol) (mangos.Socket, error) {
    if p == NONE {
        return nil, errors.New("new socket protocol none")
    }
    var s mangos.Socket
    var err error
@@ -268,6 +202,7 @@
// Constants for protocols.
const (
    NONE       = -1
    PUSH       = protocol(mangos.ProtoPush)
    PULL       = protocol(mangos.ProtoPull)
    PUB        = protocol(mangos.ProtoPub)
@@ -280,38 +215,38 @@
    PAIR       = protocol(mangos.ProtoPair)
)
func protoProducer(m Mode) protocol {
func protoAgent(m Mode) protocol {
    switch m {
    case PushPull:
        return PUSH
    case PubSub:
        return PUB
    case ReqRep:
        return REP
    case SurvResp:
        return SURVEYOR
    case Bus:
        return BUS
    case Pair:
        return PAIR
    case SurvResp:
        return SURVEYOR
    case ReqRep:
        return REQ
    case Bus:
        return BUS
    }
    return PUSH
    return NONE
}
func protoConsumer(m Mode) protocol {
func protoCoactee(m Mode) protocol {
    switch m {
    case PushPull:
        return PULL
    case PubSub:
        return SUB
    case ReqRep:
        return REQ
    case SurvResp:
        return RESPONDENT
    case Bus:
        return BUS
    case Pair:
        return PAIR
    case SurvResp:
        return RESPONDENT
    case ReqRep:
        return REP
    case Bus:
        return BUS
    }
    return PULL
    return NONE
}
nngmake.go
New file
@@ -0,0 +1,74 @@
package deliver
import "nanomsg.org/go-mangos"
func commonMake(n *NNG, p protocol, opts map[string]interface{}) (mangos.Socket, error) {
    var sock mangos.Socket
    var err error
    if sock, err = newSocket(p); err != nil {
        return nil, err
    }
    if err = setSocketOptions(sock, opts); err != nil {
        sock.Close()
        sock = nil
    }
    if n.server {
        if err = sock.Listen(n.url); err != nil {
            sock.Close()
            sock = nil
        }
    } else {
        if err = sock.Dial(n.url); err != nil {
            sock.Close()
            sock = nil
        }
    }
    return sock, err
}
func (n *NNG) ppMakeNNG(typ td) (mangos.Socket, error) {
    p := proto(typ, n.mode)
    opts := optOther(p, optRaw(optDefault()), n.arguments...)
    return commonMake(n, p, opts)
}
func (n *NNG) rrMakeNNG(typ td) (mangos.Socket, error) {
    p := proto(typ, n.mode)
    opts := optDefault()
    return commonMake(n, p, opts)
}
func (n *NNG) busMakeNNG(typ td) (mangos.Socket, error) {
    var sock mangos.Socket
    var err error
    if sock, err = newSocket(BUS); err != nil {
        return nil, err
    }
    if err = sock.Listen(n.url); err != nil {
        sock.Close()
        sock = nil
    }
    // arguments are bus clients
    for _, arg := range n.arguments {
        switch arg.(type) {
        case string:
            url := arg.(string)
            if err = sock.Dial(url); err != nil {
                sock.Close()
                sock = nil
                break
            }
        default:
        }
    }
    if err = setSocketOptions(sock, nil); err != nil {
        return nil, err
    }
    return sock, err
}
nngopt.go
New file
@@ -0,0 +1,71 @@
package deliver
import (
    "time"
    "nanomsg.org/go-mangos"
)
// maxRecvSize max recv size
var (
    maxRecvSize  = 33 * 1024 * 1024
    surveyorTime = -1
)
func optDefault() map[string]interface{} {
    options := make(map[string]interface{})
    options[mangos.OptionMaxRecvSize] = maxRecvSize
    options[mangos.OptionWriteQLen] = 0
    options[mangos.OptionReadQLen] = 0
    options[mangos.OptionRecvDeadline] = time.Second
    options[mangos.OptionSendDeadline] = time.Second
    // options[mangos.OptionRaw] = true
    return options
}
func optRaw(options map[string]interface{}) map[string]interface{} {
    options[mangos.OptionRaw] = true
    return options
}
func optOther(p protocol, options map[string]interface{}, o ...interface{}) map[string]interface{} {
    // len 1 or 0, sub topic
    if p == SUB {
        if len(o) == 1 {
            v := o[0]
            if t, ok := v.(string); ok {
                options[mangos.OptionSubscribe] = []byte(t)
            }
        } else {
            options[mangos.OptionSubscribe] = []byte("")
        }
    }
    // len 1 or 0, surveyor time
    if p == SURVEYOR {
        if len(o) == 1 {
            v := o[0]
            if t, ok := v.(int); ok {
                surveyorTime = t / 2
                if surveyorTime == 0 {
                    surveyorTime = 1
                }
            }
        }
        options[mangos.OptionSurveyTime] = time.Duration(surveyorTime) * time.Second
    }
    return options
}
func setSocketOptions(sock mangos.Socket, options map[string]interface{}) error {
    for k, v := range options {
        if err := sock.SetOption(k, v); err != nil {
            return err
        }
    }
    return nil
}