zhangmeng
2019-05-15 9d4b12ffee1c25de247568c3f9a51be4996da09b
wrap mangos
3个文件已添加
302 ■■■■■ 已修改文件
deliver.go 30 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
mode.go 18 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nng.go 254 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
deliver.go
New file
@@ -0,0 +1,30 @@
package deliver
// Deliver define a interface how to use mangos
type Deliver interface {
    // Send send data
    Send([]byte) error
    // Recv recv data
    Recv() ([]byte, error)
}
// NewProducer create producer args presentive for parameter with protocal, e.g. sub topic
func NewProducer(m Mode, url string, args ...interface{}) Deliver {
    if m > ModeStart && m < ModeNNG {
        return NewNNGProducer(m, url, args)
    }
    return nil
}
// NewConsumer create consumer args presentive for parameter with protocal, e.g. sub topic
func NewConsumer(m Mode, url string, args ...interface{}) Deliver {
    if m > ModeStart && m < ModeNNG {
        return NewNNGConsumer(m, url, args)
    }
    return nil
}
mode.go
New file
@@ -0,0 +1,18 @@
package deliver
// Mode is the numeric abstraction to the various protocols or patterns
// that Mangos supports.
type Mode int
// Constants for protocols.
const (
    ModeStart = iota
    PushPull
    PubSub
    ReqRep
    SurvResp
    Bus
    Pair
    ModeNNG
    ModeEnd
)
nng.go
New file
@@ -0,0 +1,254 @@
package deliver
import (
    "errors"
    "fmt"
    "os"
    "strings"
    "nanomsg.org/go-mangos"
    "nanomsg.org/go-mangos/protocol/bus"
    "nanomsg.org/go-mangos/protocol/pair"
    "nanomsg.org/go-mangos/protocol/pub"
    "nanomsg.org/go-mangos/protocol/pull"
    "nanomsg.org/go-mangos/protocol/push"
    "nanomsg.org/go-mangos/protocol/rep"
    "nanomsg.org/go-mangos/protocol/req"
    "nanomsg.org/go-mangos/protocol/respondent"
    "nanomsg.org/go-mangos/protocol/sub"
    "nanomsg.org/go-mangos/protocol/surveyor"
    "nanomsg.org/go-mangos/transport/all"
)
// NNG mangos wrap
type NNG struct {
    sock mangos.Socket
}
// Send impl interface Diliver
func (n *NNG) Send(data []byte) error {
    if n.sock == nil {
        return errors.New("please init NNG first")
    }
    if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil {
        msg := mangos.NewMessage(len(data))
        msg.Body = data
        return n.sock.SendMsg(msg)
    }
    return n.sock.Send(data)
}
// Recv impl interface Diliver
func (n *NNG) Recv() ([]byte, error) {
    if n.sock == nil {
        return nil, errors.New("please init NNG first")
    }
    if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil {
        msg, err := n.sock.RecvMsg()
        return msg.Body, err
    }
    return n.sock.Recv()
}
// NewNNGProducer create from deliver Mode
func NewNNGProducer(m Mode, url string, args ...interface{}) *NNG {
    rmExistedIpcName(url)
    if sock, err := newSocket(protoProducer(m)); err == nil {
        if err = setSocketOptions(sock, args); err != nil {
            return nil
        }
        if err = sock.Listen(url); err != nil {
            sock.Close()
            return nil
        }
        return &NNG{
            sock,
        }
    }
    return nil
}
// NewNNGConsumer create from deliver Mode
func NewNNGConsumer(m Mode, url string, args ...interface{}) *NNG {
    if sock, err := newSocket(protoConsumer(m)); err == nil {
        if err = setSocketOptions(sock, args); err != nil {
            return nil
        }
        if err = sock.Dial(url); err != nil {
            sock.Close()
            return nil
        }
        return &NNG{
            sock,
        }
    }
    return nil
}
// MaxRecvSize max recv size
var MaxRecvSize = 33 * 1024 * 1024
func defualtSocketOptions(sock mangos.Socket) error {
    var err error
    if err = sock.SetOption(mangos.OptionMaxRecvSize, MaxRecvSize); err != nil {
        sock.Close()
        return err
    }
    if err = sock.SetOption(mangos.OptionWriteQLen, 0); err != nil {
        sock.Close()
        return err
    }
    if err = sock.SetOption(mangos.OptionReadQLen, 0); err != nil {
        sock.Close()
        return err
    }
    // if err = sock.SetOption(mangos.OptionNoDelay, true); err != nil {
    //     sock.Close()
    //     return err
    // }
    if err = sock.SetOption(mangos.OptionRaw, true); err != nil {
        sock.Close()
        return err
    }
    return nil
}
func setSocketOptions(sock mangos.Socket, args ...interface{}) error {
    err := defualtSocketOptions(sock)
    if err != nil {
        return err
    }
    if sock.GetProtocol().Number() == mangos.ProtoSub {
        for _, arg := range args {
            switch arg.(type) {
            case string:
                err = sock.SetOption(mangos.OptionSubscribe, []byte(arg.(string)))
            default:
                err = sock.SetOption(mangos.OptionSubscribe, []byte(""))
            }
        }
    }
    return nil
}
func rmExistedIpcName(url string) {
    s := strings.Split(url, "://")
    if s[0] == "ipc" {
        if _, err := os.Stat(s[1]); err == nil {
            os.Remove(s[1])
        }
    }
}
// newSocket allocates a new Socket.  The Socket is the handle used to
// access the underlying library.
func newSocket(p protocol) (mangos.Socket, error) {
    var s mangos.Socket
    var err error
    switch p {
    case PUB:
        s, err = pub.NewSocket()
    case SUB:
        s, err = sub.NewSocket()
    case PUSH:
        s, err = push.NewSocket()
    case PULL:
        s, err = pull.NewSocket()
    case REQ:
        s, err = req.NewSocket()
    case REP:
        s, err = rep.NewSocket()
    case SURVEYOR:
        s, err = surveyor.NewSocket()
    case RESPONDENT:
        s, err = respondent.NewSocket()
    case PAIR:
        s, err = pair.NewSocket()
    case BUS:
        s, err = bus.NewSocket()
    default:
        err = mangos.ErrBadProto
    }
    if err != nil {
        return nil, err
    }
    all.AddTransports(s)
    // s.AddTransport(ipc.NewTransport())
    // s.AddTransport(tcp.NewTransport())
    return s, nil
}
func die(format string, v ...interface{}) {
    fmt.Fprintln(os.Stderr, fmt.Sprintf(format, v...))
    os.Exit(1)
}
// Protocol is the numeric abstraction to the various protocols or patterns
// that Mangos supports.
type protocol int
// Constants for protocols.
const (
    PUSH       = protocol(mangos.ProtoPush)
    PULL       = protocol(mangos.ProtoPull)
    PUB        = protocol(mangos.ProtoPub)
    SUB        = protocol(mangos.ProtoSub)
    REQ        = protocol(mangos.ProtoReq)
    REP        = protocol(mangos.ProtoRep)
    SURVEYOR   = protocol(mangos.ProtoSurveyor)
    RESPONDENT = protocol(mangos.ProtoRespondent)
    BUS        = protocol(mangos.ProtoBus)
    PAIR       = protocol(mangos.ProtoPair)
)
func protoProducer(m Mode) protocol {
    switch m {
    case PushPull:
        return PUSH
    case PubSub:
        return PUB
    case ReqRep:
        return REP
    case SurvResp:
        return SURVEYOR
    case Bus:
        return BUS
    case Pair:
        return PAIR
    }
    return PUSH
}
func protoConsumer(m Mode) protocol {
    switch m {
    case PushPull:
        return PULL
    case PubSub:
        return SUB
    case ReqRep:
        return REQ
    case SurvResp:
        return RESPONDENT
    case Bus:
        return BUS
    case Pair:
        return PAIR
    }
    return PULL
}