zhangmeng
2019-05-17 36766ab5b68ce7dfb39dff5d6d283ce5c7f4b346
fix crash
1个文件已修改
137 ■■■■■ 已修改文件
nng.go 137 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nng.go
@@ -1,7 +1,6 @@
package deliver
import (
    "errors"
    "fmt"
    "os"
    "strings"
@@ -23,43 +22,53 @@
// NNG mangos wrap
type NNG struct {
    sock mangos.Socket
    raw  bool
    sock   mangos.Socket
    server bool
    mode   Mode
    url    string
    arguments []interface{}
}
// Send impl interface Diliver
func (n *NNG) Send(data []byte) error {
    var err error
    if n.sock == nil {
        return errors.New("please init NNG first")
        n.sock, err = n.makeNNG(true)
        if err != nil {
            fmt.Println("create nng producer error")
            return err
        }
    }
    if surveyorTime > 0 {
        time.Sleep(time.Duration(surveyorTime*2) * time.Second)
    }
    if n.raw {
        msg := mangos.NewMessage(len(data))
        msg.Body = data
        return n.sock.SendMsg(msg)
    }
    msg := mangos.NewMessage(len(data))
    msg.Body = data
    return n.sock.SendMsg(msg)
    return n.sock.Send(data)
}
// Recv impl interface Diliver
func (n *NNG) Recv() ([]byte, error) {
    var err error
    if n.sock == nil {
        return nil, errors.New("please init NNG first")
    }
    if n.raw {
        var msg *mangos.Message
        var err error
        if msg, err = n.sock.RecvMsg(); err != nil {
        n.sock, err = n.makeNNG(false)
        if err != nil {
            fmt.Println("create nng consumer error")
            return nil, err
        }
        return msg.Body, nil
    }
    return n.sock.Recv()
    var msg *mangos.Message
    if msg, err = n.sock.RecvMsg(); err != nil {
        return nil, err
    }
    return msg.Body, nil
}
// Close impl interface Deliver
@@ -73,48 +82,57 @@
func nngServer(m Mode, url string, args ...interface{}) *NNG {
    rmExistedIpcName(url)
    if sock, err := newSocket(protoProducer(m)); err == nil {
        if err = setSocketOptions(sock, args); err != nil {
            sock.Close()
            sock = nil
            return nil
        }
        if err = sock.Listen(url); err != nil {
            sock.Close()
            sock = nil
            return nil
        }
        return &NNG{
            sock,
            true,
        }
    }
    return nil
    return &NNG{
        server:    true,
        mode:      m,
        url:       url,
        arguments: args,
    }
}
func nngClient(m Mode, url string, args ...interface{}) *NNG {
    if sock, err := newSocket(protoConsumer(m)); err == nil {
        if err = setSocketOptions(sock, args); err != nil {
            sock.Close()
            sock = nil
            return nil
        }
        if err = sock.Dial(url); err != nil {
            sock.Close()
            sock = nil
            return nil
        }
        return &NNG{
            sock,
            true,
        }
    return &NNG{
        server:    false,
        mode:      m,
        url:       url,
        arguments: args,
    }
    return nil
}
func proto(producer bool, m Mode) protocol {
    if producer {
        return protoProducer(m)
    }
    return protoConsumer(m)
}
func (n *NNG) makeNNG(producer bool) (mangos.Socket, error) {
    var sock mangos.Socket
    var err error
    if sock, err = newSocket(proto(producer, n.mode)); err != nil {
        return nil, err
    }
    if err = setSocketOptions(sock, n.arguments...); err != nil {
        sock.Close()
        sock = nil
    }
    if n.server {
        if err = sock.Listen(n.url); err != nil {
            sock.Close()
            sock = nil
        }
    } else {
        if err = sock.Dial(n.url); err != nil {
            sock.Close()
            sock = nil
        }
    }
    return sock, err
}
// maxRecvSize max recv size
@@ -130,8 +148,8 @@
    options[mangos.OptionMaxRecvSize] = maxRecvSize
    options[mangos.OptionWriteQLen] = 0
    options[mangos.OptionReadQLen] = 0
    options[mangos.OptionRecvDeadline] = time.Second
    options[mangos.OptionSendDeadline] = time.Second
    // options[mangos.OptionRecvDeadline] = time.Second
    // options[mangos.OptionSendDeadline] = time.Second
    options[mangos.OptionRaw] = true
    return options
@@ -143,14 +161,15 @@
    switch sock.GetProtocol().Number() {
    case mangos.ProtoSub:
        topic := ""
        for _, arg := range args {
            switch arg.(type) {
            case string:
                options[mangos.OptionSubscribe] = []byte(arg.(string))
                topic = arg.(string)
            default:
                options[mangos.OptionSubscribe] = []byte("")
            }
        }
        options[mangos.OptionSubscribe] = []byte(topic)
    case mangos.ProtoSurveyor:
        for _, arg := range args {
            switch arg.(type) {
@@ -162,10 +181,10 @@
                }
            default:
            }
            options[mangos.OptionSurveyTime] = time.Duration(surveyorTime) * time.Second
        }
        options[mangos.OptionSurveyTime] = time.Duration(surveyorTime) * time.Second
    default:
        fmt.Println("no additional args")
    }
    for k, v := range options {