From 9d4b12ffee1c25de247568c3f9a51be4996da09b Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期三, 15 五月 2019 09:55:12 +0800
Subject: [PATCH] wrap mangos
---
mode.go | 18 +++
deliver.go | 30 +++++
nng.go | 254 ++++++++++++++++++++++++++++++++++++++++++
3 files changed, 302 insertions(+), 0 deletions(-)
diff --git a/deliver.go b/deliver.go
new file mode 100644
index 0000000..4314c94
--- /dev/null
+++ b/deliver.go
@@ -0,0 +1,30 @@
+package deliver
+
+// Deliver define a interface how to use mangos
+type Deliver interface {
+
+ // Send send data
+ Send([]byte) error
+
+ // Recv recv data
+ Recv() ([]byte, error)
+}
+
+// NewProducer create producer args presentive for parameter with protocal, e.g. sub topic
+func NewProducer(m Mode, url string, args ...interface{}) Deliver {
+
+ if m > ModeStart && m < ModeNNG {
+ return NewNNGProducer(m, url, args)
+ }
+ return nil
+}
+
+// NewConsumer create consumer args presentive for parameter with protocal, e.g. sub topic
+func NewConsumer(m Mode, url string, args ...interface{}) Deliver {
+
+ if m > ModeStart && m < ModeNNG {
+ return NewNNGConsumer(m, url, args)
+ }
+
+ return nil
+}
diff --git a/mode.go b/mode.go
new file mode 100644
index 0000000..f64f576
--- /dev/null
+++ b/mode.go
@@ -0,0 +1,18 @@
+package deliver
+
+// Mode is the numeric abstraction to the various protocols or patterns
+// that Mangos supports.
+type Mode int
+
+// Constants for protocols.
+const (
+ ModeStart = iota
+ PushPull
+ PubSub
+ ReqRep
+ SurvResp
+ Bus
+ Pair
+ ModeNNG
+ ModeEnd
+)
diff --git a/nng.go b/nng.go
new file mode 100644
index 0000000..e9568cd
--- /dev/null
+++ b/nng.go
@@ -0,0 +1,254 @@
+package deliver
+
+import (
+ "errors"
+ "fmt"
+ "os"
+ "strings"
+
+ "nanomsg.org/go-mangos"
+ "nanomsg.org/go-mangos/protocol/bus"
+ "nanomsg.org/go-mangos/protocol/pair"
+ "nanomsg.org/go-mangos/protocol/pub"
+ "nanomsg.org/go-mangos/protocol/pull"
+ "nanomsg.org/go-mangos/protocol/push"
+ "nanomsg.org/go-mangos/protocol/rep"
+ "nanomsg.org/go-mangos/protocol/req"
+ "nanomsg.org/go-mangos/protocol/respondent"
+ "nanomsg.org/go-mangos/protocol/sub"
+ "nanomsg.org/go-mangos/protocol/surveyor"
+ "nanomsg.org/go-mangos/transport/all"
+)
+
+// NNG mangos wrap
+type NNG struct {
+ sock mangos.Socket
+}
+
+// Send impl interface Diliver
+func (n *NNG) Send(data []byte) error {
+ if n.sock == nil {
+ return errors.New("please init NNG first")
+ }
+
+ if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil {
+ msg := mangos.NewMessage(len(data))
+ msg.Body = data
+ return n.sock.SendMsg(msg)
+ }
+ return n.sock.Send(data)
+}
+
+// Recv impl interface Diliver
+func (n *NNG) Recv() ([]byte, error) {
+ if n.sock == nil {
+ return nil, errors.New("please init NNG first")
+ }
+ if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil {
+ msg, err := n.sock.RecvMsg()
+ return msg.Body, err
+ }
+ return n.sock.Recv()
+}
+
+// NewNNGProducer create from deliver Mode
+func NewNNGProducer(m Mode, url string, args ...interface{}) *NNG {
+
+ rmExistedIpcName(url)
+ if sock, err := newSocket(protoProducer(m)); err == nil {
+ if err = setSocketOptions(sock, args); err != nil {
+ return nil
+ }
+ if err = sock.Listen(url); err != nil {
+ sock.Close()
+ return nil
+ }
+ return &NNG{
+ sock,
+ }
+ }
+
+ return nil
+}
+
+// NewNNGConsumer create from deliver Mode
+func NewNNGConsumer(m Mode, url string, args ...interface{}) *NNG {
+
+ if sock, err := newSocket(protoConsumer(m)); err == nil {
+ if err = setSocketOptions(sock, args); err != nil {
+ return nil
+ }
+
+ if err = sock.Dial(url); err != nil {
+ sock.Close()
+ return nil
+ }
+
+ return &NNG{
+ sock,
+ }
+ }
+
+ return nil
+}
+
+// MaxRecvSize max recv size
+var MaxRecvSize = 33 * 1024 * 1024
+
+func defualtSocketOptions(sock mangos.Socket) error {
+ var err error
+ if err = sock.SetOption(mangos.OptionMaxRecvSize, MaxRecvSize); err != nil {
+ sock.Close()
+ return err
+ }
+ if err = sock.SetOption(mangos.OptionWriteQLen, 0); err != nil {
+ sock.Close()
+ return err
+ }
+ if err = sock.SetOption(mangos.OptionReadQLen, 0); err != nil {
+ sock.Close()
+ return err
+ }
+ // if err = sock.SetOption(mangos.OptionNoDelay, true); err != nil {
+ // sock.Close()
+ // return err
+ // }
+ if err = sock.SetOption(mangos.OptionRaw, true); err != nil {
+ sock.Close()
+ return err
+ }
+
+ return nil
+}
+
+func setSocketOptions(sock mangos.Socket, args ...interface{}) error {
+
+ err := defualtSocketOptions(sock)
+ if err != nil {
+ return err
+ }
+ if sock.GetProtocol().Number() == mangos.ProtoSub {
+ for _, arg := range args {
+ switch arg.(type) {
+ case string:
+ err = sock.SetOption(mangos.OptionSubscribe, []byte(arg.(string)))
+ default:
+ err = sock.SetOption(mangos.OptionSubscribe, []byte(""))
+ }
+ }
+ }
+
+ return nil
+}
+
+func rmExistedIpcName(url string) {
+ s := strings.Split(url, "://")
+
+ if s[0] == "ipc" {
+ if _, err := os.Stat(s[1]); err == nil {
+ os.Remove(s[1])
+ }
+ }
+}
+
+// newSocket allocates a new Socket. The Socket is the handle used to
+// access the underlying library.
+func newSocket(p protocol) (mangos.Socket, error) {
+
+ var s mangos.Socket
+ var err error
+
+ switch p {
+ case PUB:
+ s, err = pub.NewSocket()
+ case SUB:
+ s, err = sub.NewSocket()
+ case PUSH:
+ s, err = push.NewSocket()
+ case PULL:
+ s, err = pull.NewSocket()
+ case REQ:
+ s, err = req.NewSocket()
+ case REP:
+ s, err = rep.NewSocket()
+ case SURVEYOR:
+ s, err = surveyor.NewSocket()
+ case RESPONDENT:
+ s, err = respondent.NewSocket()
+ case PAIR:
+ s, err = pair.NewSocket()
+ case BUS:
+ s, err = bus.NewSocket()
+ default:
+ err = mangos.ErrBadProto
+ }
+
+ if err != nil {
+ return nil, err
+ }
+
+ all.AddTransports(s)
+ // s.AddTransport(ipc.NewTransport())
+ // s.AddTransport(tcp.NewTransport())
+
+ return s, nil
+}
+
+func die(format string, v ...interface{}) {
+ fmt.Fprintln(os.Stderr, fmt.Sprintf(format, v...))
+ os.Exit(1)
+}
+
+// Protocol is the numeric abstraction to the various protocols or patterns
+// that Mangos supports.
+type protocol int
+
+// Constants for protocols.
+const (
+ PUSH = protocol(mangos.ProtoPush)
+ PULL = protocol(mangos.ProtoPull)
+ PUB = protocol(mangos.ProtoPub)
+ SUB = protocol(mangos.ProtoSub)
+ REQ = protocol(mangos.ProtoReq)
+ REP = protocol(mangos.ProtoRep)
+ SURVEYOR = protocol(mangos.ProtoSurveyor)
+ RESPONDENT = protocol(mangos.ProtoRespondent)
+ BUS = protocol(mangos.ProtoBus)
+ PAIR = protocol(mangos.ProtoPair)
+)
+
+func protoProducer(m Mode) protocol {
+ switch m {
+ case PushPull:
+ return PUSH
+ case PubSub:
+ return PUB
+ case ReqRep:
+ return REP
+ case SurvResp:
+ return SURVEYOR
+ case Bus:
+ return BUS
+ case Pair:
+ return PAIR
+ }
+ return PUSH
+}
+
+func protoConsumer(m Mode) protocol {
+ switch m {
+ case PushPull:
+ return PULL
+ case PubSub:
+ return SUB
+ case ReqRep:
+ return REQ
+ case SurvResp:
+ return RESPONDENT
+ case Bus:
+ return BUS
+ case Pair:
+ return PAIR
+ }
+ return PULL
+}
--
Gitblit v1.8.0