package deliver
|
|
import (
|
"errors"
|
"fmt"
|
"os"
|
"strings"
|
"time"
|
|
"nanomsg.org/go-mangos"
|
"nanomsg.org/go-mangos/protocol/bus"
|
"nanomsg.org/go-mangos/protocol/pair"
|
"nanomsg.org/go-mangos/protocol/pub"
|
"nanomsg.org/go-mangos/protocol/pull"
|
"nanomsg.org/go-mangos/protocol/push"
|
"nanomsg.org/go-mangos/protocol/rep"
|
"nanomsg.org/go-mangos/protocol/req"
|
"nanomsg.org/go-mangos/protocol/respondent"
|
"nanomsg.org/go-mangos/protocol/sub"
|
"nanomsg.org/go-mangos/protocol/surveyor"
|
"nanomsg.org/go-mangos/transport/all"
|
)
|
|
// NNG mangos wrap
|
type NNG struct {
|
sock mangos.Socket
|
typ td
|
mode Mode
|
url string
|
|
sendMsg *mangos.Message
|
|
arguments []interface{}
|
}
|
|
// Send impl interface Diliver
|
func (n *NNG) Send(data []byte) error {
|
if n == nil {
|
return errors.New("please init NNG first")
|
}
|
var err error
|
if n.sock == nil {
|
n.sock, err = n.makeNNG(agent)
|
if err != nil {
|
fmt.Println("create nng sender error")
|
return err
|
}
|
}
|
|
if surveyorTime > 0 {
|
time.Sleep(time.Duration(surveyorTime*2) * time.Second)
|
}
|
|
// msg := mangos.NewMessage(len(data))
|
// msg.Body = data
|
// return n.sock.SendMsg(msg)
|
|
if n.sendMsg == nil {
|
n.sendMsg = mangos.NewMessage(1)
|
}
|
n.sendMsg.Body = data
|
return n.sock.SendMsg(n.sendMsg)
|
|
}
|
|
// Recv impl interface Diliver
|
func (n *NNG) Recv() ([]byte, error) {
|
if n == nil {
|
return nil, errors.New("please init NNG first")
|
}
|
|
var err error
|
|
if n.sock == nil {
|
n.sock, err = n.makeNNG(coactee)
|
if err != nil {
|
fmt.Println("create nng reciever error")
|
return nil, err
|
}
|
}
|
|
var msg *mangos.Message
|
if msg, err = n.sock.RecvMsg(); err != nil {
|
return nil, err
|
}
|
return msg.Body, nil
|
|
}
|
|
// Recv2 impl interface
|
func (n *NNG) Recv2(data []byte) (l int, err error) {
|
data, err = n.Recv()
|
l = len(data)
|
return l, err
|
}
|
|
// Close impl interface Deliver
|
func (n *NNG) Close() {
|
if n != nil && n.sock != nil {
|
n.sock.Close()
|
n.sock = nil
|
}
|
}
|
|
func nngServer(m Mode, url string, args ...interface{}) *NNG {
|
|
rmExistedIpcName(url)
|
|
return &NNG{
|
typ: agent,
|
mode: m,
|
url: url,
|
arguments: args,
|
}
|
}
|
|
func nngClient(m Mode, url string, args ...interface{}) *NNG {
|
|
return &NNG{
|
typ: coactee,
|
mode: m,
|
url: url,
|
arguments: args,
|
}
|
|
}
|
|
func proto(typ td, m Mode) protocol {
|
if typ == agent {
|
return protoAgent(m)
|
} else if typ == coactee {
|
return protoCoactee(m)
|
}
|
return NONE
|
}
|
|
func (n *NNG) makeNNG(typ td) (mangos.Socket, error) {
|
|
var sock mangos.Socket
|
var err error
|
|
switch n.mode {
|
case Bus:
|
sock, err = n.busMakeNNG(typ)
|
case ReqRep, SurvResp:
|
sock, err = n.rrMakeNNG(typ)
|
default:
|
sock, err = n.ppMakeNNG(typ)
|
}
|
|
return sock, err
|
}
|
|
func rmExistedIpcName(url string) {
|
s := strings.Split(url, "://")
|
|
if s[0] == "ipc" {
|
if _, err := os.Stat(s[1]); err == nil {
|
os.Remove(s[1])
|
}
|
}
|
}
|
|
// newSocket allocates a new Socket. The Socket is the handle used to
|
// access the underlying library.
|
func newSocket(p protocol) (mangos.Socket, error) {
|
|
if p == NONE {
|
return nil, errors.New("new socket protocol none")
|
}
|
var s mangos.Socket
|
var err error
|
|
switch p {
|
case PUB:
|
s, err = pub.NewSocket()
|
case SUB:
|
s, err = sub.NewSocket()
|
case PUSH:
|
s, err = push.NewSocket()
|
case PULL:
|
s, err = pull.NewSocket()
|
case REQ:
|
s, err = req.NewSocket()
|
case REP:
|
s, err = rep.NewSocket()
|
case SURVEYOR:
|
s, err = surveyor.NewSocket()
|
case RESPONDENT:
|
s, err = respondent.NewSocket()
|
case PAIR:
|
s, err = pair.NewSocket()
|
case BUS:
|
s, err = bus.NewSocket()
|
default:
|
err = mangos.ErrBadProto
|
}
|
|
if err != nil {
|
return nil, err
|
}
|
|
all.AddTransports(s)
|
|
return s, nil
|
}
|
|
func die(format string, v ...interface{}) {
|
fmt.Fprintln(os.Stderr, fmt.Sprintf(format, v...))
|
os.Exit(1)
|
}
|
|
// Protocol is the numeric abstraction to the various protocols or patterns
|
// that Mangos supports.
|
type protocol int
|
|
// Constants for protocols.
|
const (
|
NONE = -1
|
PUSH = protocol(mangos.ProtoPush)
|
PULL = protocol(mangos.ProtoPull)
|
PUB = protocol(mangos.ProtoPub)
|
SUB = protocol(mangos.ProtoSub)
|
REQ = protocol(mangos.ProtoReq)
|
REP = protocol(mangos.ProtoRep)
|
SURVEYOR = protocol(mangos.ProtoSurveyor)
|
RESPONDENT = protocol(mangos.ProtoRespondent)
|
BUS = protocol(mangos.ProtoBus)
|
PAIR = protocol(mangos.ProtoPair)
|
)
|
|
func protoAgent(m Mode) protocol {
|
switch m {
|
case PushPull:
|
return PUSH
|
case PubSub:
|
return PUB
|
case Pair:
|
return PAIR
|
case SurvResp:
|
return SURVEYOR
|
case ReqRep:
|
return REQ
|
case Bus:
|
return BUS
|
}
|
return NONE
|
}
|
|
func protoCoactee(m Mode) protocol {
|
switch m {
|
case PushPull:
|
return PULL
|
case PubSub:
|
return SUB
|
case Pair:
|
return PAIR
|
case SurvResp:
|
return RESPONDENT
|
case ReqRep:
|
return REP
|
case Bus:
|
return BUS
|
}
|
return NONE
|
}
|