package deliver
|
|
import (
|
"fmt"
|
"os"
|
"strings"
|
"time"
|
|
"nanomsg.org/go-mangos"
|
"nanomsg.org/go-mangos/protocol/bus"
|
"nanomsg.org/go-mangos/protocol/pair"
|
"nanomsg.org/go-mangos/protocol/pub"
|
"nanomsg.org/go-mangos/protocol/pull"
|
"nanomsg.org/go-mangos/protocol/push"
|
"nanomsg.org/go-mangos/protocol/rep"
|
"nanomsg.org/go-mangos/protocol/req"
|
"nanomsg.org/go-mangos/protocol/respondent"
|
"nanomsg.org/go-mangos/protocol/sub"
|
"nanomsg.org/go-mangos/protocol/surveyor"
|
"nanomsg.org/go-mangos/transport/all"
|
)
|
|
// type deliver
|
type td int
|
|
const (
|
producer = td(iota)
|
consumer
|
star //mangos bus protocol
|
)
|
|
// NNG mangos wrap
|
type NNG struct {
|
sock mangos.Socket
|
server bool
|
mode Mode
|
url string
|
|
arguments []interface{}
|
}
|
|
// Send impl interface Diliver
|
func (n *NNG) Send(data []byte) error {
|
var err error
|
if n.sock == nil {
|
n.sock, err = n.makeNNG(producer)
|
if err != nil {
|
fmt.Println("create nng producer error")
|
return err
|
}
|
}
|
|
if surveyorTime > 0 {
|
time.Sleep(time.Duration(surveyorTime*2) * time.Second)
|
}
|
|
msg := mangos.NewMessage(len(data))
|
msg.Body = data
|
return n.sock.SendMsg(msg)
|
|
}
|
|
// Recv impl interface Diliver
|
func (n *NNG) Recv() ([]byte, error) {
|
var err error
|
|
if n.sock == nil {
|
n.sock, err = n.makeNNG(consumer)
|
if err != nil {
|
fmt.Println("create nng consumer error")
|
return nil, err
|
}
|
}
|
|
var msg *mangos.Message
|
if msg, err = n.sock.RecvMsg(); err != nil {
|
return nil, err
|
}
|
return msg.Body, nil
|
|
}
|
|
// Close impl interface Deliver
|
func (n *NNG) Close() {
|
if n.sock != nil {
|
n.sock.Close()
|
n.sock = nil
|
}
|
}
|
|
func nngServer(m Mode, url string, args ...interface{}) *NNG {
|
|
rmExistedIpcName(url)
|
|
return &NNG{
|
server: true,
|
mode: m,
|
url: url,
|
arguments: args,
|
}
|
}
|
|
func nngClient(m Mode, url string, args ...interface{}) *NNG {
|
|
return &NNG{
|
server: false,
|
mode: m,
|
url: url,
|
arguments: args,
|
}
|
|
}
|
|
func proto(typ td, m Mode) protocol {
|
if typ == producer {
|
return protoProducer(m)
|
} else if typ == consumer {
|
return protoConsumer(m)
|
}
|
return protoConsumer(m)
|
}
|
|
func (n *NNG) makeNNG(typ td) (mangos.Socket, error) {
|
|
var sock mangos.Socket
|
var err error
|
if sock, err = newSocket(proto(typ, n.mode)); err != nil {
|
return nil, err
|
}
|
|
if err = setSocketOptions(sock, n.arguments...); err != nil {
|
sock.Close()
|
sock = nil
|
}
|
if n.server {
|
if err = sock.Listen(n.url); err != nil {
|
sock.Close()
|
sock = nil
|
}
|
} else {
|
if err = sock.Dial(n.url); err != nil {
|
sock.Close()
|
sock = nil
|
}
|
}
|
return sock, err
|
}
|
|
// maxRecvSize max recv size
|
var (
|
maxRecvSize = 33 * 1024 * 1024
|
surveyorTime = -1
|
)
|
|
func defualtSocketOptions() map[string]interface{} {
|
|
options := make(map[string]interface{})
|
|
options[mangos.OptionMaxRecvSize] = maxRecvSize
|
options[mangos.OptionWriteQLen] = 0
|
options[mangos.OptionReadQLen] = 0
|
// options[mangos.OptionRecvDeadline] = time.Second
|
// options[mangos.OptionSendDeadline] = time.Second
|
options[mangos.OptionRaw] = true
|
|
return options
|
}
|
|
func setSocketOptions(sock mangos.Socket, args ...interface{}) error {
|
|
options := defualtSocketOptions()
|
|
switch sock.GetProtocol().Number() {
|
case mangos.ProtoSub:
|
topic := ""
|
for _, arg := range args {
|
switch arg.(type) {
|
case string:
|
topic = arg.(string)
|
default:
|
}
|
}
|
options[mangos.OptionSubscribe] = []byte(topic)
|
case mangos.ProtoSurveyor:
|
for _, arg := range args {
|
switch arg.(type) {
|
case int:
|
if arg.(int) < 2 {
|
surveyorTime = 1
|
} else {
|
surveyorTime = arg.(int) / 2
|
}
|
default:
|
}
|
}
|
options[mangos.OptionSurveyTime] = time.Duration(surveyorTime) * time.Second
|
|
default:
|
}
|
|
for k, v := range options {
|
if err := sock.SetOption(k, v); err != nil {
|
return err
|
}
|
}
|
return nil
|
}
|
|
func rmExistedIpcName(url string) {
|
s := strings.Split(url, "://")
|
|
if s[0] == "ipc" {
|
if _, err := os.Stat(s[1]); err == nil {
|
os.Remove(s[1])
|
}
|
}
|
}
|
|
// newSocket allocates a new Socket. The Socket is the handle used to
|
// access the underlying library.
|
func newSocket(p protocol) (mangos.Socket, error) {
|
|
var s mangos.Socket
|
var err error
|
|
switch p {
|
case PUB:
|
s, err = pub.NewSocket()
|
case SUB:
|
s, err = sub.NewSocket()
|
case PUSH:
|
s, err = push.NewSocket()
|
case PULL:
|
s, err = pull.NewSocket()
|
case REQ:
|
s, err = req.NewSocket()
|
case REP:
|
s, err = rep.NewSocket()
|
case SURVEYOR:
|
s, err = surveyor.NewSocket()
|
case RESPONDENT:
|
s, err = respondent.NewSocket()
|
case PAIR:
|
s, err = pair.NewSocket()
|
case BUS:
|
s, err = bus.NewSocket()
|
default:
|
err = mangos.ErrBadProto
|
}
|
|
if err != nil {
|
return nil, err
|
}
|
|
all.AddTransports(s)
|
|
return s, nil
|
}
|
|
func die(format string, v ...interface{}) {
|
fmt.Fprintln(os.Stderr, fmt.Sprintf(format, v...))
|
os.Exit(1)
|
}
|
|
// Protocol is the numeric abstraction to the various protocols or patterns
|
// that Mangos supports.
|
type protocol int
|
|
// Constants for protocols.
|
const (
|
PUSH = protocol(mangos.ProtoPush)
|
PULL = protocol(mangos.ProtoPull)
|
PUB = protocol(mangos.ProtoPub)
|
SUB = protocol(mangos.ProtoSub)
|
REQ = protocol(mangos.ProtoReq)
|
REP = protocol(mangos.ProtoRep)
|
SURVEYOR = protocol(mangos.ProtoSurveyor)
|
RESPONDENT = protocol(mangos.ProtoRespondent)
|
BUS = protocol(mangos.ProtoBus)
|
PAIR = protocol(mangos.ProtoPair)
|
)
|
|
func protoProducer(m Mode) protocol {
|
switch m {
|
case PushPull:
|
return PUSH
|
case PubSub:
|
return PUB
|
case ReqRep:
|
return REP
|
case SurvResp:
|
return SURVEYOR
|
case Bus:
|
return BUS
|
case Pair:
|
return PAIR
|
}
|
return PUSH
|
}
|
|
func protoConsumer(m Mode) protocol {
|
switch m {
|
case PushPull:
|
return PULL
|
case PubSub:
|
return SUB
|
case ReqRep:
|
return REQ
|
case SurvResp:
|
return RESPONDENT
|
case Bus:
|
return BUS
|
case Pair:
|
return PAIR
|
}
|
return PULL
|
}
|