From b3a335f79fd9a6ad91705e2ea293115681484d69 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期五, 31 五月 2019 17:36:27 +0800
Subject: [PATCH] add mode
---
nng.go | 223 ++++++++++++++++++++++++-------------------------------
1 files changed, 97 insertions(+), 126 deletions(-)
diff --git a/nng.go b/nng.go
index 00a8dc2..fa3e409 100644
--- a/nng.go
+++ b/nng.go
@@ -24,147 +24,116 @@
// NNG mangos wrap
type NNG struct {
sock mangos.Socket
+ typ td
+ mode Mode
+ url string
+
+ arguments []interface{}
}
// Send impl interface Diliver
func (n *NNG) Send(data []byte) error {
- if n.sock == nil {
+ if n == nil {
return errors.New("please init NNG first")
}
-
- switch n.sock.GetProtocol().Number() {
- case mangos.ProtoSurveyor:
- time.Sleep(surveyorTime * 2)
- default:
- }
- if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil {
- msg := mangos.NewMessage(len(data))
- msg.Body = data
- return n.sock.SendMsg(msg)
+ var err error
+ if n.sock == nil {
+ n.sock, err = n.makeNNG(agent)
+ if err != nil {
+ fmt.Println("create nng sender error")
+ return err
+ }
}
- return n.sock.Send(data)
+ if surveyorTime > 0 {
+ time.Sleep(time.Duration(surveyorTime*2) * time.Second)
+ }
+
+ msg := mangos.NewMessage(len(data))
+ msg.Body = data
+ return n.sock.SendMsg(msg)
+
}
// Recv impl interface Diliver
func (n *NNG) Recv() ([]byte, error) {
- if n.sock == nil {
+ if n == nil {
return nil, errors.New("please init NNG first")
}
- if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil {
- msg, err := n.sock.RecvMsg()
- return msg.Body, err
+
+ var err error
+
+ if n.sock == nil {
+ n.sock, err = n.makeNNG(coactee)
+ if err != nil {
+ fmt.Println("create nng reciever error")
+ return nil, err
+ }
}
- return n.sock.Recv()
+
+ var msg *mangos.Message
+ if msg, err = n.sock.RecvMsg(); err != nil {
+ return nil, err
+ }
+ return msg.Body, nil
+
}
-// nngProducer create from deliver Mode
-func nngProducer(m Mode, url string, args ...interface{}) *NNG {
+// Close impl interface Deliver
+func (n *NNG) Close() {
+ if n != nil && n.sock != nil {
+ n.sock.Close()
+ n.sock = nil
+ }
+}
+
+func nngServer(m Mode, url string, args ...interface{}) *NNG {
rmExistedIpcName(url)
- if sock, err := newSocket(protoProducer(m)); err == nil {
- if err = setSocketOptions(sock, args); err != nil {
- return nil
- }
- if err = sock.Listen(url); err != nil {
- sock.Close()
- return nil
- }
- return &NNG{
- sock,
- }
- }
- return nil
+ return &NNG{
+ typ: agent,
+ mode: m,
+ url: url,
+ arguments: args,
+ }
}
-// nngConsumer create from deliver Mode
-func nngConsumer(m Mode, url string, args ...interface{}) *NNG {
+func nngClient(m Mode, url string, args ...interface{}) *NNG {
- if sock, err := newSocket(protoConsumer(m)); err == nil {
- if err = setSocketOptions(sock, args); err != nil {
- return nil
- }
-
- if err = sock.Dial(url); err != nil {
- sock.Close()
- return nil
- }
-
- return &NNG{
- sock,
- }
+ return &NNG{
+ typ: coactee,
+ mode: m,
+ url: url,
+ arguments: args,
}
- return nil
}
-// maxRecvSize max recv size
-var (
- maxRecvSize = 33 * 1024 * 1024
- surveyorTime = time.Second / 2
-)
+func proto(typ td, m Mode) protocol {
+ if typ == agent {
+ return protoAgent(m)
+ } else if typ == coactee {
+ return protoCoactee(m)
+ }
+ return NONE
+}
-func defualtSocketOptions(sock mangos.Socket) error {
+func (n *NNG) makeNNG(typ td) (mangos.Socket, error) {
+
+ var sock mangos.Socket
var err error
- if err = sock.SetOption(mangos.OptionMaxRecvSize, maxRecvSize); err != nil {
- sock.Close()
- return err
- }
- if err = sock.SetOption(mangos.OptionWriteQLen, 0); err != nil {
- sock.Close()
- return err
- }
- if err = sock.SetOption(mangos.OptionReadQLen, 0); err != nil {
- sock.Close()
- return err
- }
- // if err = sock.SetOption(mangos.OptionRecvDeadline, time.Second); err != nil {
- // sock.Close()
- // return err
- // }
- // if err = sock.SetOption(mangos.OptionSendDeadline, time.Second); err != nil {
- // sock.Close()
- // return err
- // }
- if err = sock.SetOption(mangos.OptionRaw, true); err != nil {
- sock.Close()
- return err
- }
- return nil
-}
-
-func setSocketOptions(sock mangos.Socket, args ...interface{}) error {
-
- err := defualtSocketOptions(sock)
- if err != nil {
- return err
- }
- switch sock.GetProtocol().Number() {
- case mangos.ProtoSub:
- for _, arg := range args {
- switch arg.(type) {
- case string:
- err = sock.SetOption(mangos.OptionSubscribe, []byte(arg.(string)))
- default:
- err = sock.SetOption(mangos.OptionSubscribe, []byte(""))
- }
- }
- case mangos.ProtoSurveyor:
- for _, arg := range args {
- switch arg.(type) {
- case int:
- surveyorTime = time.Duration(arg.(int)/2) * time.Second
- default:
- }
- err = sock.SetOption(mangos.OptionSurveyTime, surveyorTime)
- }
+ switch n.mode {
+ case Bus:
+ sock, err = n.busMakeNNG(typ)
+ case ReqRep:
+ sock, err = n.rrMakeNNG(typ)
default:
- fmt.Println("no additional args")
+ sock, err = n.ppMakeNNG(typ)
}
- return nil
+ return sock, err
}
func rmExistedIpcName(url string) {
@@ -181,6 +150,9 @@
// access the underlying library.
func newSocket(p protocol) (mangos.Socket, error) {
+ if p == NONE {
+ return nil, errors.New("new socket protocol none")
+ }
var s mangos.Socket
var err error
@@ -214,8 +186,6 @@
}
all.AddTransports(s)
- // s.AddTransport(ipc.NewTransport())
- // s.AddTransport(tcp.NewTransport())
return s, nil
}
@@ -231,6 +201,7 @@
// Constants for protocols.
const (
+ NONE = -1
PUSH = protocol(mangos.ProtoPush)
PULL = protocol(mangos.ProtoPull)
PUB = protocol(mangos.ProtoPub)
@@ -243,38 +214,38 @@
PAIR = protocol(mangos.ProtoPair)
)
-func protoProducer(m Mode) protocol {
+func protoAgent(m Mode) protocol {
switch m {
case PushPull:
return PUSH
case PubSub:
return PUB
- case ReqRep:
- return REP
- case SurvResp:
- return SURVEYOR
- case Bus:
- return BUS
case Pair:
return PAIR
+ case SurvResp:
+ return SURVEYOR
+ case ReqRep:
+ return REQ
+ case Bus:
+ return BUS
}
- return PUSH
+ return NONE
}
-func protoConsumer(m Mode) protocol {
+func protoCoactee(m Mode) protocol {
switch m {
case PushPull:
return PULL
case PubSub:
return SUB
- case ReqRep:
- return REQ
- case SurvResp:
- return RESPONDENT
- case Bus:
- return BUS
case Pair:
return PAIR
+ case SurvResp:
+ return RESPONDENT
+ case ReqRep:
+ return REP
+ case Bus:
+ return BUS
}
- return PULL
+ return NONE
}
--
Gitblit v1.8.0