zhangmeng
2019-05-16 8e158e611ca6a3663e383d5a9b14d14eaf897736
change deliver interface name
2个文件已修改
78 ■■■■■ 已修改文件
deliver.go 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nng.go 66 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
deliver.go
@@ -13,20 +13,20 @@
    Close()
}
// NewProducer create producer args presentive for parameter with protocal, e.g. sub topic
func NewProducer(m Mode, url string, args ...interface{}) Deliver {
// NewListener create listener args presentive for parameter with protocal, e.g. sub topic
func NewListener(m Mode, url string, args ...interface{}) Deliver {
    if m > ModeStart && m < ModeNNG {
        return nngProducer(m, url, args...)
        return nngListener(m, url, args...)
    }
    return nil
}
// NewConsumer create consumer args presentive for parameter with protocal, e.g. sub topic
func NewConsumer(m Mode, url string, args ...interface{}) Deliver {
// NewDialer create dialer args presentive for parameter with protocal, e.g. sub topic
func NewDialer(m Mode, url string, args ...interface{}) Deliver {
    if m > ModeStart && m < ModeNNG {
        return nngConsumer(m, url, args...)
        return nngDialer(m, url, args...)
    }
    return nil
nng.go
@@ -67,11 +67,11 @@
func (n *NNG) Close() {
    if n.sock != nil {
        n.sock.Close()
        n.sock = nil
    }
}
// nngProducer create from deliver Mode
func nngProducer(m Mode, url string, args ...interface{}) *NNG {
func nngListener(m Mode, url string, args ...interface{}) *NNG {
    rmExistedIpcName(url)
    if sock, err := newSocket(protoProducer(m)); err == nil {
@@ -80,6 +80,7 @@
        }
        if err = sock.Listen(url); err != nil {
            sock.Close()
            sock = nil
            return nil
        }
        return &NNG{
@@ -91,8 +92,7 @@
    return nil
}
// nngConsumer create from deliver Mode
func nngConsumer(m Mode, url string, args ...interface{}) *NNG {
func nngDialer(m Mode, url string, args ...interface{}) *NNG {
    if sock, err := newSocket(protoConsumer(m)); err == nil {
        if err = setSocketOptions(sock, args); err != nil {
@@ -101,6 +101,7 @@
        if err = sock.Dial(url); err != nil {
            sock.Close()
            sock = nil
            return nil
        }
@@ -119,50 +120,32 @@
    surveyorTime = time.Second / 2
)
func defualtSocketOptions(sock mangos.Socket) error {
    var err error
    if err = sock.SetOption(mangos.OptionMaxRecvSize, maxRecvSize); err != nil {
        sock.Close()
        return err
    }
    if err = sock.SetOption(mangos.OptionWriteQLen, 0); err != nil {
        sock.Close()
        return err
    }
    if err = sock.SetOption(mangos.OptionReadQLen, 0); err != nil {
        sock.Close()
        return err
    }
    if err = sock.SetOption(mangos.OptionRecvDeadline, time.Second); err != nil {
        sock.Close()
        return err
    }
    if err = sock.SetOption(mangos.OptionSendDeadline, time.Second); err != nil {
        sock.Close()
        return err
    }
    if err = sock.SetOption(mangos.OptionRaw, true); err != nil {
        sock.Close()
        return err
    }
func defualtSocketOptions() map[string]interface{} {
    return nil
    options := make(map[string]interface{})
    options[mangos.OptionMaxRecvSize] = maxRecvSize
    options[mangos.OptionWriteQLen] = 0
    options[mangos.OptionReadQLen] = 0
    options[mangos.OptionRecvDeadline] = time.Second
    options[mangos.OptionSendDeadline] = time.Second
    options[mangos.OptionRaw] = true
    return options
}
func setSocketOptions(sock mangos.Socket, args ...interface{}) error {
    err := defualtSocketOptions(sock)
    if err != nil {
        return err
    }
    options := defualtSocketOptions()
    switch sock.GetProtocol().Number() {
    case mangos.ProtoSub:
        for _, arg := range args {
            switch arg.(type) {
            case string:
                err = sock.SetOption(mangos.OptionSubscribe, []byte(arg.(string)))
                options[mangos.OptionSubscribe] = []byte(arg.(string))
            default:
                err = sock.SetOption(mangos.OptionSubscribe, []byte(""))
                options[mangos.OptionSubscribe] = []byte("")
            }
        }
    case mangos.ProtoSurveyor:
@@ -172,12 +155,17 @@
                surveyorTime = time.Duration(arg.(int)/2) * time.Second
            default:
            }
            err = sock.SetOption(mangos.OptionSurveyTime, surveyorTime)
            options[mangos.OptionSurveyTime] = surveyorTime
        }
    default:
        fmt.Println("no additional args")
    }
    for k, v := range options {
        if err := sock.SetOption(k, v); err != nil {
            return err
        }
    }
    return nil
}
@@ -228,8 +216,6 @@
    }
    all.AddTransports(s)
    // s.AddTransport(ipc.NewTransport())
    // s.AddTransport(tcp.NewTransport())
    return s, nil
}