package deliver
|
|
import (
|
"errors"
|
"fmt"
|
"os"
|
"strings"
|
"time"
|
|
"nanomsg.org/go-mangos"
|
"nanomsg.org/go-mangos/protocol/bus"
|
"nanomsg.org/go-mangos/protocol/pair"
|
"nanomsg.org/go-mangos/protocol/pub"
|
"nanomsg.org/go-mangos/protocol/pull"
|
"nanomsg.org/go-mangos/protocol/push"
|
"nanomsg.org/go-mangos/protocol/rep"
|
"nanomsg.org/go-mangos/protocol/req"
|
"nanomsg.org/go-mangos/protocol/respondent"
|
"nanomsg.org/go-mangos/protocol/sub"
|
"nanomsg.org/go-mangos/protocol/surveyor"
|
"nanomsg.org/go-mangos/transport/all"
|
)
|
|
// NNG mangos wrap
|
type NNG struct {
|
sock mangos.Socket
|
}
|
|
// Send impl interface Diliver
|
func (n *NNG) Send(data []byte) error {
|
if n.sock == nil {
|
return errors.New("please init NNG first")
|
}
|
|
switch n.sock.GetProtocol().Number() {
|
case mangos.ProtoSurveyor:
|
time.Sleep(surveyorTime * 2)
|
default:
|
}
|
if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil {
|
msg := mangos.NewMessage(len(data))
|
msg.Body = data
|
return n.sock.SendMsg(msg)
|
}
|
|
return n.sock.Send(data)
|
}
|
|
// Recv impl interface Diliver
|
func (n *NNG) Recv() ([]byte, error) {
|
if n.sock == nil {
|
return nil, errors.New("please init NNG first")
|
}
|
if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil {
|
msg, err := n.sock.RecvMsg()
|
return msg.Body, err
|
}
|
return n.sock.Recv()
|
}
|
|
// Close impl interface Deliver
|
func (n *NNG) Close() {
|
if n.sock != nil {
|
n.sock.Close()
|
}
|
}
|
|
// nngProducer create from deliver Mode
|
func nngProducer(m Mode, url string, args ...interface{}) *NNG {
|
|
rmExistedIpcName(url)
|
if sock, err := newSocket(protoProducer(m)); err == nil {
|
if err = setSocketOptions(sock, args); err != nil {
|
return nil
|
}
|
if err = sock.Listen(url); err != nil {
|
sock.Close()
|
return nil
|
}
|
return &NNG{
|
sock,
|
}
|
}
|
|
return nil
|
}
|
|
// nngConsumer create from deliver Mode
|
func nngConsumer(m Mode, url string, args ...interface{}) *NNG {
|
|
if sock, err := newSocket(protoConsumer(m)); err == nil {
|
if err = setSocketOptions(sock, args); err != nil {
|
return nil
|
}
|
|
if err = sock.Dial(url); err != nil {
|
sock.Close()
|
return nil
|
}
|
|
return &NNG{
|
sock,
|
}
|
}
|
|
return nil
|
}
|
|
// Package-level settings applied to the sockets created in this file.
var (
	// maxRecvSize is the value used for mangos.OptionMaxRecvSize: 33 MiB.
	maxRecvSize = 33 << 20

	// surveyorTime is the value used for mangos.OptionSurveyTime; Send
	// sleeps for twice this long on surveyor sockets. setSocketOptions may
	// overwrite it.
	surveyorTime = 500 * time.Millisecond
)
|
|
func defualtSocketOptions(sock mangos.Socket) error {
|
var err error
|
if err = sock.SetOption(mangos.OptionMaxRecvSize, maxRecvSize); err != nil {
|
sock.Close()
|
return err
|
}
|
if err = sock.SetOption(mangos.OptionWriteQLen, 0); err != nil {
|
sock.Close()
|
return err
|
}
|
if err = sock.SetOption(mangos.OptionReadQLen, 0); err != nil {
|
sock.Close()
|
return err
|
}
|
// if err = sock.SetOption(mangos.OptionRecvDeadline, time.Second); err != nil {
|
// sock.Close()
|
// return err
|
// }
|
// if err = sock.SetOption(mangos.OptionSendDeadline, time.Second); err != nil {
|
// sock.Close()
|
// return err
|
// }
|
if err = sock.SetOption(mangos.OptionRaw, true); err != nil {
|
sock.Close()
|
return err
|
}
|
|
return nil
|
}
|
|
func setSocketOptions(sock mangos.Socket, args ...interface{}) error {
|
|
err := defualtSocketOptions(sock)
|
if err != nil {
|
return err
|
}
|
switch sock.GetProtocol().Number() {
|
case mangos.ProtoSub:
|
for _, arg := range args {
|
switch arg.(type) {
|
case string:
|
err = sock.SetOption(mangos.OptionSubscribe, []byte(arg.(string)))
|
default:
|
err = sock.SetOption(mangos.OptionSubscribe, []byte(""))
|
}
|
}
|
case mangos.ProtoSurveyor:
|
for _, arg := range args {
|
switch arg.(type) {
|
case int:
|
surveyorTime = time.Duration(arg.(int)/2) * time.Second
|
default:
|
}
|
err = sock.SetOption(mangos.OptionSurveyTime, surveyorTime)
|
}
|
default:
|
fmt.Println("no additional args")
|
}
|
|
return nil
|
}
|
|
// rmExistedIpcName removes the file that an "ipc://" URL points at, if that
// file exists. URLs with any other scheme, or without a path, are ignored.
//
// Removal is best effort: the function has no return value, so a failure
// from os.Remove is not reported.
func rmExistedIpcName(url string) {
	const scheme = "ipc://"

	// Match the full scheme prefix rather than splitting on "://": a bare
	// "ipc" has no path component, and a path may itself contain "://".
	if !strings.HasPrefix(url, scheme) {
		return
	}
	path := strings.TrimPrefix(url, scheme)
	if path == "" {
		return
	}

	if _, err := os.Stat(path); err == nil {
		_ = os.Remove(path)
	}
}
|
|
// newSocket allocates a new Socket. The Socket is the handle used to
|
// access the underlying library.
|
func newSocket(p protocol) (mangos.Socket, error) {
|
|
var s mangos.Socket
|
var err error
|
|
switch p {
|
case PUB:
|
s, err = pub.NewSocket()
|
case SUB:
|
s, err = sub.NewSocket()
|
case PUSH:
|
s, err = push.NewSocket()
|
case PULL:
|
s, err = pull.NewSocket()
|
case REQ:
|
s, err = req.NewSocket()
|
case REP:
|
s, err = rep.NewSocket()
|
case SURVEYOR:
|
s, err = surveyor.NewSocket()
|
case RESPONDENT:
|
s, err = respondent.NewSocket()
|
case PAIR:
|
s, err = pair.NewSocket()
|
case BUS:
|
s, err = bus.NewSocket()
|
default:
|
err = mangos.ErrBadProto
|
}
|
|
if err != nil {
|
return nil, err
|
}
|
|
all.AddTransports(s)
|
// s.AddTransport(ipc.NewTransport())
|
// s.AddTransport(tcp.NewTransport())
|
|
return s, nil
|
}
|
|
// die formats a message with fmt.Sprintf, writes it to standard error
// followed by a newline, and terminates the process with exit status 1.
func die(format string, v ...interface{}) {
	msg := fmt.Sprintf(format, v...)
	os.Stderr.WriteString(msg + "\n")
	os.Exit(1)
}
|
|
// protocol is the numeric identifier for one of the messaging protocols
// (patterns) that mangos supports.
type protocol int
|
|
// Constants for protocols.
|
const (
|
PUSH = protocol(mangos.ProtoPush)
|
PULL = protocol(mangos.ProtoPull)
|
PUB = protocol(mangos.ProtoPub)
|
SUB = protocol(mangos.ProtoSub)
|
REQ = protocol(mangos.ProtoReq)
|
REP = protocol(mangos.ProtoRep)
|
SURVEYOR = protocol(mangos.ProtoSurveyor)
|
RESPONDENT = protocol(mangos.ProtoRespondent)
|
BUS = protocol(mangos.ProtoBus)
|
PAIR = protocol(mangos.ProtoPair)
|
)
|
|
func protoProducer(m Mode) protocol {
|
switch m {
|
case PushPull:
|
return PUSH
|
case PubSub:
|
return PUB
|
case ReqRep:
|
return REP
|
case SurvResp:
|
return SURVEYOR
|
case Bus:
|
return BUS
|
case Pair:
|
return PAIR
|
}
|
return PUSH
|
}
|
|
func protoConsumer(m Mode) protocol {
|
switch m {
|
case PushPull:
|
return PULL
|
case PubSub:
|
return SUB
|
case ReqRep:
|
return REQ
|
case SurvResp:
|
return RESPONDENT
|
case Bus:
|
return BUS
|
case Pair:
|
return PAIR
|
}
|
return PULL
|
}
|