From b3a335f79fd9a6ad91705e2ea293115681484d69 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期五, 31 五月 2019 17:36:27 +0800
Subject: [PATCH] add mode
---
nng.go | 205 +++++++++++++++++++++++++--------------------------
1 files changed, 101 insertions(+), 104 deletions(-)
diff --git a/nng.go b/nng.go
index 84d3699..fa3e409 100644
--- a/nng.go
+++ b/nng.go
@@ -5,6 +5,7 @@
"fmt"
"os"
"strings"
+ "time"
"nanomsg.org/go-mangos"
"nanomsg.org/go-mangos/protocol/bus"
@@ -23,122 +24,116 @@
// NNG mangos wrap
type NNG struct {
sock mangos.Socket
+ typ td
+ mode Mode
+ url string
+
+ arguments []interface{}
}
// Send impl interface Diliver
func (n *NNG) Send(data []byte) error {
- if n.sock == nil {
+ if n == nil {
return errors.New("please init NNG first")
}
-
- if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil {
- msg := mangos.NewMessage(len(data))
- msg.Body = data
- return n.sock.SendMsg(msg)
+ var err error
+ if n.sock == nil {
+ n.sock, err = n.makeNNG(agent)
+ if err != nil {
+ fmt.Println("create nng sender error")
+ return err
+ }
}
- return n.sock.Send(data)
+
+ if surveyorTime > 0 {
+ time.Sleep(time.Duration(surveyorTime*2) * time.Second)
+ }
+
+ msg := mangos.NewMessage(len(data))
+ msg.Body = data
+ return n.sock.SendMsg(msg)
+
}
// Recv impl interface Diliver
func (n *NNG) Recv() ([]byte, error) {
- if n.sock == nil {
+ if n == nil {
return nil, errors.New("please init NNG first")
}
- if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil {
- msg, err := n.sock.RecvMsg()
- return msg.Body, err
+
+ var err error
+
+ if n.sock == nil {
+ n.sock, err = n.makeNNG(coactee)
+ if err != nil {
+ fmt.Println("create nng reciever error")
+ return nil, err
+ }
}
- return n.sock.Recv()
+
+ var msg *mangos.Message
+ if msg, err = n.sock.RecvMsg(); err != nil {
+ return nil, err
+ }
+ return msg.Body, nil
+
}
-// nngProducer create from deliver Mode
-func nngProducer(m Mode, url string, args ...interface{}) *NNG {
+// Close impl interface Deliver
+func (n *NNG) Close() {
+ if n != nil && n.sock != nil {
+ n.sock.Close()
+ n.sock = nil
+ }
+}
+
+func nngServer(m Mode, url string, args ...interface{}) *NNG {
rmExistedIpcName(url)
- if sock, err := newSocket(protoProducer(m)); err == nil {
- if err = setSocketOptions(sock, args); err != nil {
- return nil
- }
- if err = sock.Listen(url); err != nil {
- sock.Close()
- return nil
- }
- return &NNG{
- sock,
- }
- }
- return nil
+ return &NNG{
+ typ: agent,
+ mode: m,
+ url: url,
+ arguments: args,
+ }
}
-// nngConsumer create from deliver Mode
-func nngConsumer(m Mode, url string, args ...interface{}) *NNG {
+func nngClient(m Mode, url string, args ...interface{}) *NNG {
- if sock, err := newSocket(protoConsumer(m)); err == nil {
- if err = setSocketOptions(sock, args); err != nil {
- return nil
- }
-
- if err = sock.Dial(url); err != nil {
- sock.Close()
- return nil
- }
-
- return &NNG{
- sock,
- }
+ return &NNG{
+ typ: coactee,
+ mode: m,
+ url: url,
+ arguments: args,
}
- return nil
}
-// MaxRecvSize max recv size
-var MaxRecvSize = 33 * 1024 * 1024
+func proto(typ td, m Mode) protocol {
+ if typ == agent {
+ return protoAgent(m)
+ } else if typ == coactee {
+ return protoCoactee(m)
+ }
+ return NONE
+}
-func defualtSocketOptions(sock mangos.Socket) error {
+func (n *NNG) makeNNG(typ td) (mangos.Socket, error) {
+
+ var sock mangos.Socket
var err error
- if err = sock.SetOption(mangos.OptionMaxRecvSize, MaxRecvSize); err != nil {
- sock.Close()
- return err
- }
- if err = sock.SetOption(mangos.OptionWriteQLen, 0); err != nil {
- sock.Close()
- return err
- }
- if err = sock.SetOption(mangos.OptionReadQLen, 0); err != nil {
- sock.Close()
- return err
- }
- // if err = sock.SetOption(mangos.OptionNoDelay, true); err != nil {
- // sock.Close()
- // return err
- // }
- if err = sock.SetOption(mangos.OptionRaw, true); err != nil {
- sock.Close()
- return err
+
+ switch n.mode {
+ case Bus:
+ sock, err = n.busMakeNNG(typ)
+ case ReqRep:
+ sock, err = n.rrMakeNNG(typ)
+ default:
+ sock, err = n.ppMakeNNG(typ)
}
- return nil
-}
-
-func setSocketOptions(sock mangos.Socket, args ...interface{}) error {
-
- err := defualtSocketOptions(sock)
- if err != nil {
- return err
- }
- if sock.GetProtocol().Number() == mangos.ProtoSub {
- for _, arg := range args {
- switch arg.(type) {
- case string:
- err = sock.SetOption(mangos.OptionSubscribe, []byte(arg.(string)))
- default:
- err = sock.SetOption(mangos.OptionSubscribe, []byte(""))
- }
- }
- }
-
- return nil
+ return sock, err
}
func rmExistedIpcName(url string) {
@@ -155,6 +150,9 @@
// access the underlying library.
func newSocket(p protocol) (mangos.Socket, error) {
+ if p == NONE {
+ return nil, errors.New("new socket protocol none")
+ }
var s mangos.Socket
var err error
@@ -188,8 +186,6 @@
}
all.AddTransports(s)
- // s.AddTransport(ipc.NewTransport())
- // s.AddTransport(tcp.NewTransport())
return s, nil
}
@@ -205,6 +201,7 @@
// Constants for protocols.
const (
+ NONE = -1
PUSH = protocol(mangos.ProtoPush)
PULL = protocol(mangos.ProtoPull)
PUB = protocol(mangos.ProtoPub)
@@ -217,38 +214,38 @@
PAIR = protocol(mangos.ProtoPair)
)
-func protoProducer(m Mode) protocol {
+func protoAgent(m Mode) protocol {
switch m {
case PushPull:
return PUSH
case PubSub:
return PUB
- case ReqRep:
- return REP
- case SurvResp:
- return SURVEYOR
- case Bus:
- return BUS
case Pair:
return PAIR
+ case SurvResp:
+ return SURVEYOR
+ case ReqRep:
+ return REQ
+ case Bus:
+ return BUS
}
- return PUSH
+ return NONE
}
-func protoConsumer(m Mode) protocol {
+func protoCoactee(m Mode) protocol {
switch m {
case PushPull:
return PULL
case PubSub:
return SUB
- case ReqRep:
- return REQ
- case SurvResp:
- return RESPONDENT
- case Bus:
- return BUS
case Pair:
return PAIR
+ case SurvResp:
+ return RESPONDENT
+ case ReqRep:
+ return REP
+ case Bus:
+ return BUS
}
- return PULL
+ return NONE
}
--
Gitblit v1.8.0