From b3a335f79fd9a6ad91705e2ea293115681484d69 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期五, 31 五月 2019 17:36:27 +0800
Subject: [PATCH] add mode
---
nng.go | 161 +++++++++++++++++------------------------------------
1 files changed, 53 insertions(+), 108 deletions(-)
diff --git a/nng.go b/nng.go
index 07f2fb5..fa3e409 100644
--- a/nng.go
+++ b/nng.go
@@ -1,6 +1,7 @@
package deliver
import (
+ "errors"
"fmt"
"os"
"strings"
@@ -22,21 +23,24 @@
// NNG mangos wrap
type NNG struct {
- sock mangos.Socket
- server bool
- mode Mode
- url string
+ sock mangos.Socket
+ typ td
+ mode Mode
+ url string
arguments []interface{}
}
// Send impl interface Diliver
func (n *NNG) Send(data []byte) error {
+ if n == nil {
+ return errors.New("please init NNG first")
+ }
var err error
if n.sock == nil {
- n.sock, err = n.makeNNG(true)
+ n.sock, err = n.makeNNG(agent)
if err != nil {
- fmt.Println("create nng producer error")
+ fmt.Println("create nng sender error")
return err
}
}
@@ -53,12 +57,16 @@
// Recv impl interface Diliver
func (n *NNG) Recv() ([]byte, error) {
+ if n == nil {
+ return nil, errors.New("please init NNG first")
+ }
+
var err error
if n.sock == nil {
- n.sock, err = n.makeNNG(false)
+ n.sock, err = n.makeNNG(coactee)
if err != nil {
- fmt.Println("create nng consumer error")
+ fmt.Println("create nng reciever error")
return nil, err
}
}
@@ -73,7 +81,7 @@
// Close impl interface Deliver
func (n *NNG) Close() {
- if n.sock != nil {
+ if n != nil && n.sock != nil {
n.sock.Close()
n.sock = nil
}
@@ -84,7 +92,7 @@
rmExistedIpcName(url)
return &NNG{
- server: true,
+ typ: agent,
mode: m,
url: url,
arguments: args,
@@ -94,7 +102,7 @@
func nngClient(m Mode, url string, args ...interface{}) *NNG {
return &NNG{
- server: false,
+ typ: coactee,
mode: m,
url: url,
arguments: args,
@@ -102,97 +110,30 @@
}
-func proto(producer bool, m Mode) protocol {
- if producer {
- return protoProducer(m)
+func proto(typ td, m Mode) protocol {
+ if typ == agent {
+ return protoAgent(m)
+ } else if typ == coactee {
+ return protoCoactee(m)
}
- return protoConsumer(m)
+ return NONE
}
-func (n *NNG) makeNNG(producer bool) (mangos.Socket, error) {
+func (n *NNG) makeNNG(typ td) (mangos.Socket, error) {
var sock mangos.Socket
var err error
- if sock, err = newSocket(proto(producer, n.mode)); err != nil {
- return nil, err
- }
- if err = setSocketOptions(sock, n.arguments...); err != nil {
- sock.Close()
- sock = nil
- }
- if n.server {
- if err = sock.Listen(n.url); err != nil {
- sock.Close()
- sock = nil
- }
- } else {
- if err = sock.Dial(n.url); err != nil {
- sock.Close()
- sock = nil
- }
- }
- return sock, err
-}
-
-// maxRecvSize max recv size
-var (
- maxRecvSize = 33 * 1024 * 1024
- surveyorTime = -1
-)
-
-func defualtSocketOptions() map[string]interface{} {
-
- options := make(map[string]interface{})
-
- options[mangos.OptionMaxRecvSize] = maxRecvSize
- options[mangos.OptionWriteQLen] = 0
- options[mangos.OptionReadQLen] = 0
- // options[mangos.OptionRecvDeadline] = time.Second
- // options[mangos.OptionSendDeadline] = time.Second
- options[mangos.OptionRaw] = true
-
- return options
-}
-
-func setSocketOptions(sock mangos.Socket, args ...interface{}) error {
-
- options := defualtSocketOptions()
-
- switch sock.GetProtocol().Number() {
- case mangos.ProtoSub:
- topic := ""
- for _, arg := range args {
- switch arg.(type) {
- case string:
- topic = arg.(string)
- default:
- }
- }
- options[mangos.OptionSubscribe] = []byte(topic)
- case mangos.ProtoSurveyor:
- for _, arg := range args {
- switch arg.(type) {
- case int:
- if arg.(int) < 2 {
- surveyorTime = 1
- } else {
- surveyorTime = arg.(int) / 2
- }
- default:
- }
- }
- options[mangos.OptionSurveyTime] = time.Duration(surveyorTime) * time.Second
-
+ switch n.mode {
+ case Bus:
+ sock, err = n.busMakeNNG(typ)
+ case ReqRep:
+ sock, err = n.rrMakeNNG(typ)
default:
+ sock, err = n.ppMakeNNG(typ)
}
- for k, v := range options {
- if err := sock.SetOption(k, v); err != nil {
- return err
- }
- }
- return nil
+ return sock, err
}
func rmExistedIpcName(url string) {
@@ -209,6 +150,9 @@
// access the underlying library.
func newSocket(p protocol) (mangos.Socket, error) {
+ if p == NONE {
+ return nil, errors.New("new socket protocol none")
+ }
var s mangos.Socket
var err error
@@ -257,6 +201,7 @@
// Constants for protocols.
const (
+ NONE = -1
PUSH = protocol(mangos.ProtoPush)
PULL = protocol(mangos.ProtoPull)
PUB = protocol(mangos.ProtoPub)
@@ -269,38 +214,38 @@
PAIR = protocol(mangos.ProtoPair)
)
-func protoProducer(m Mode) protocol {
+func protoAgent(m Mode) protocol {
switch m {
case PushPull:
return PUSH
case PubSub:
return PUB
- case ReqRep:
- return REP
- case SurvResp:
- return SURVEYOR
- case Bus:
- return BUS
case Pair:
return PAIR
+ case SurvResp:
+ return SURVEYOR
+ case ReqRep:
+ return REQ
+ case Bus:
+ return BUS
}
- return PUSH
+ return NONE
}
-func protoConsumer(m Mode) protocol {
+func protoCoactee(m Mode) protocol {
switch m {
case PushPull:
return PULL
case PubSub:
return SUB
- case ReqRep:
- return REQ
- case SurvResp:
- return RESPONDENT
- case Bus:
- return BUS
case Pair:
return PAIR
+ case SurvResp:
+ return RESPONDENT
+ case ReqRep:
+ return REP
+ case Bus:
+ return BUS
}
- return PULL
+ return NONE
}
--
Gitblit v1.8.0