From 9d4b12ffee1c25de247568c3f9a51be4996da09b Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期三, 15 五月 2019 09:55:12 +0800
Subject: [PATCH] wrap mangos

---
 mode.go    |   18 +++
 deliver.go |   30 +++++
 nng.go     |  254 ++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 302 insertions(+), 0 deletions(-)

diff --git a/deliver.go b/deliver.go
new file mode 100644
index 0000000..4314c94
--- /dev/null
+++ b/deliver.go
@@ -0,0 +1,30 @@
+package deliver
+
+// Deliver define a interface how to use mangos
+type Deliver interface {
+
+	// Send send data
+	Send([]byte) error
+
+	// Recv recv data
+	Recv() ([]byte, error)
+}
+
+// NewProducer create producer args presentive for parameter with protocal, e.g. sub topic
+func NewProducer(m Mode, url string, args ...interface{}) Deliver {
+
+	if m > ModeStart && m < ModeNNG {
+		return NewNNGProducer(m, url, args)
+	}
+	return nil
+}
+
+// NewConsumer create consumer args presentive for parameter with protocal, e.g. sub topic
+func NewConsumer(m Mode, url string, args ...interface{}) Deliver {
+
+	if m > ModeStart && m < ModeNNG {
+		return NewNNGConsumer(m, url, args)
+	}
+
+	return nil
+}
diff --git a/mode.go b/mode.go
new file mode 100644
index 0000000..f64f576
--- /dev/null
+++ b/mode.go
@@ -0,0 +1,18 @@
+package deliver
+
+// Mode is the numeric abstraction to the various protocols or patterns
+// that Mangos supports.
+type Mode int
+
+// Constants for protocols.
+const (
+	ModeStart = iota
+	PushPull
+	PubSub
+	ReqRep
+	SurvResp
+	Bus
+	Pair
+	ModeNNG
+	ModeEnd
+)
diff --git a/nng.go b/nng.go
new file mode 100644
index 0000000..e9568cd
--- /dev/null
+++ b/nng.go
@@ -0,0 +1,254 @@
+package deliver
+
+import (
+	"errors"
+	"fmt"
+	"os"
+	"strings"
+
+	"nanomsg.org/go-mangos"
+	"nanomsg.org/go-mangos/protocol/bus"
+	"nanomsg.org/go-mangos/protocol/pair"
+	"nanomsg.org/go-mangos/protocol/pub"
+	"nanomsg.org/go-mangos/protocol/pull"
+	"nanomsg.org/go-mangos/protocol/push"
+	"nanomsg.org/go-mangos/protocol/rep"
+	"nanomsg.org/go-mangos/protocol/req"
+	"nanomsg.org/go-mangos/protocol/respondent"
+	"nanomsg.org/go-mangos/protocol/sub"
+	"nanomsg.org/go-mangos/protocol/surveyor"
+	"nanomsg.org/go-mangos/transport/all"
+)
+
+// NNG mangos wrap
+type NNG struct {
+	sock mangos.Socket
+}
+
+// Send impl interface Diliver
+func (n *NNG) Send(data []byte) error {
+	if n.sock == nil {
+		return errors.New("please init NNG first")
+	}
+
+	if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil {
+		msg := mangos.NewMessage(len(data))
+		msg.Body = data
+		return n.sock.SendMsg(msg)
+	}
+	return n.sock.Send(data)
+}
+
+// Recv impl interface Diliver
+func (n *NNG) Recv() ([]byte, error) {
+	if n.sock == nil {
+		return nil, errors.New("please init NNG first")
+	}
+	if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil {
+		msg, err := n.sock.RecvMsg()
+		return msg.Body, err
+	}
+	return n.sock.Recv()
+}
+
+// NewNNGProducer create from deliver Mode
+func NewNNGProducer(m Mode, url string, args ...interface{}) *NNG {
+
+	rmExistedIpcName(url)
+	if sock, err := newSocket(protoProducer(m)); err == nil {
+		if err = setSocketOptions(sock, args); err != nil {
+			return nil
+		}
+		if err = sock.Listen(url); err != nil {
+			sock.Close()
+			return nil
+		}
+		return &NNG{
+			sock,
+		}
+	}
+
+	return nil
+}
+
+// NewNNGConsumer create from deliver Mode
+func NewNNGConsumer(m Mode, url string, args ...interface{}) *NNG {
+
+	if sock, err := newSocket(protoConsumer(m)); err == nil {
+		if err = setSocketOptions(sock, args); err != nil {
+			return nil
+		}
+
+		if err = sock.Dial(url); err != nil {
+			sock.Close()
+			return nil
+		}
+
+		return &NNG{
+			sock,
+		}
+	}
+
+	return nil
+}
+
+// MaxRecvSize max recv size
+var MaxRecvSize = 33 * 1024 * 1024
+
+func defualtSocketOptions(sock mangos.Socket) error {
+	var err error
+	if err = sock.SetOption(mangos.OptionMaxRecvSize, MaxRecvSize); err != nil {
+		sock.Close()
+		return err
+	}
+	if err = sock.SetOption(mangos.OptionWriteQLen, 0); err != nil {
+		sock.Close()
+		return err
+	}
+	if err = sock.SetOption(mangos.OptionReadQLen, 0); err != nil {
+		sock.Close()
+		return err
+	}
+	// if err = sock.SetOption(mangos.OptionNoDelay, true); err != nil {
+	// 	sock.Close()
+	// 	return err
+	// }
+	if err = sock.SetOption(mangos.OptionRaw, true); err != nil {
+		sock.Close()
+		return err
+	}
+
+	return nil
+}
+
+func setSocketOptions(sock mangos.Socket, args ...interface{}) error {
+
+	err := defualtSocketOptions(sock)
+	if err != nil {
+		return err
+	}
+	if sock.GetProtocol().Number() == mangos.ProtoSub {
+		for _, arg := range args {
+			switch arg.(type) {
+			case string:
+				err = sock.SetOption(mangos.OptionSubscribe, []byte(arg.(string)))
+			default:
+				err = sock.SetOption(mangos.OptionSubscribe, []byte(""))
+			}
+		}
+	}
+
+	return nil
+}
+
+func rmExistedIpcName(url string) {
+	s := strings.Split(url, "://")
+
+	if s[0] == "ipc" {
+		if _, err := os.Stat(s[1]); err == nil {
+			os.Remove(s[1])
+		}
+	}
+}
+
+// newSocket allocates a new Socket.  The Socket is the handle used to
+// access the underlying library.
+func newSocket(p protocol) (mangos.Socket, error) {
+
+	var s mangos.Socket
+	var err error
+
+	switch p {
+	case PUB:
+		s, err = pub.NewSocket()
+	case SUB:
+		s, err = sub.NewSocket()
+	case PUSH:
+		s, err = push.NewSocket()
+	case PULL:
+		s, err = pull.NewSocket()
+	case REQ:
+		s, err = req.NewSocket()
+	case REP:
+		s, err = rep.NewSocket()
+	case SURVEYOR:
+		s, err = surveyor.NewSocket()
+	case RESPONDENT:
+		s, err = respondent.NewSocket()
+	case PAIR:
+		s, err = pair.NewSocket()
+	case BUS:
+		s, err = bus.NewSocket()
+	default:
+		err = mangos.ErrBadProto
+	}
+
+	if err != nil {
+		return nil, err
+	}
+
+	all.AddTransports(s)
+	// s.AddTransport(ipc.NewTransport())
+	// s.AddTransport(tcp.NewTransport())
+
+	return s, nil
+}
+
+func die(format string, v ...interface{}) {
+	fmt.Fprintln(os.Stderr, fmt.Sprintf(format, v...))
+	os.Exit(1)
+}
+
+// Protocol is the numeric abstraction to the various protocols or patterns
+// that Mangos supports.
+type protocol int
+
+// Constants for protocols.
+const (
+	PUSH       = protocol(mangos.ProtoPush)
+	PULL       = protocol(mangos.ProtoPull)
+	PUB        = protocol(mangos.ProtoPub)
+	SUB        = protocol(mangos.ProtoSub)
+	REQ        = protocol(mangos.ProtoReq)
+	REP        = protocol(mangos.ProtoRep)
+	SURVEYOR   = protocol(mangos.ProtoSurveyor)
+	RESPONDENT = protocol(mangos.ProtoRespondent)
+	BUS        = protocol(mangos.ProtoBus)
+	PAIR       = protocol(mangos.ProtoPair)
+)
+
+func protoProducer(m Mode) protocol {
+	switch m {
+	case PushPull:
+		return PUSH
+	case PubSub:
+		return PUB
+	case ReqRep:
+		return REP
+	case SurvResp:
+		return SURVEYOR
+	case Bus:
+		return BUS
+	case Pair:
+		return PAIR
+	}
+	return PUSH
+}
+
+func protoConsumer(m Mode) protocol {
+	switch m {
+	case PushPull:
+		return PULL
+	case PubSub:
+		return SUB
+	case ReqRep:
+		return REQ
+	case SurvResp:
+		return RESPONDENT
+	case Bus:
+		return BUS
+	case Pair:
+		return PAIR
+	}
+	return PULL
+}

--
Gitblit v1.8.0