From 8e158e611ca6a3663e383d5a9b14d14eaf897736 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期四, 16 五月 2019 16:02:02 +0800
Subject: [PATCH] change deliver interface name
---
deliver.go | 12 +++---
nng.go | 66 +++++++++++++--------------------
2 files changed, 32 insertions(+), 46 deletions(-)
diff --git a/deliver.go b/deliver.go
index 21d6ea9..9b71902 100644
--- a/deliver.go
+++ b/deliver.go
@@ -13,20 +13,20 @@
Close()
}
-// NewProducer create producer args presentive for parameter with protocal, e.g. sub topic
-func NewProducer(m Mode, url string, args ...interface{}) Deliver {
+// NewListener create listener args presentive for parameter with protocal, e.g. sub topic
+func NewListener(m Mode, url string, args ...interface{}) Deliver {
if m > ModeStart && m < ModeNNG {
- return nngProducer(m, url, args...)
+ return nngListener(m, url, args...)
}
return nil
}
-// NewConsumer create consumer args presentive for parameter with protocal, e.g. sub topic
-func NewConsumer(m Mode, url string, args ...interface{}) Deliver {
+// NewDialer create dialer args presentive for parameter with protocal, e.g. sub topic
+func NewDialer(m Mode, url string, args ...interface{}) Deliver {
if m > ModeStart && m < ModeNNG {
- return nngConsumer(m, url, args...)
+ return nngDialer(m, url, args...)
}
return nil
diff --git a/nng.go b/nng.go
index 6cf8b52..242409a 100644
--- a/nng.go
+++ b/nng.go
@@ -67,11 +67,11 @@
func (n *NNG) Close() {
if n.sock != nil {
n.sock.Close()
+ n.sock = nil
}
}
-// nngProducer create from deliver Mode
-func nngProducer(m Mode, url string, args ...interface{}) *NNG {
+func nngListener(m Mode, url string, args ...interface{}) *NNG {
rmExistedIpcName(url)
if sock, err := newSocket(protoProducer(m)); err == nil {
@@ -80,6 +80,7 @@
}
if err = sock.Listen(url); err != nil {
sock.Close()
+ sock = nil
return nil
}
return &NNG{
@@ -91,8 +92,7 @@
return nil
}
-// nngConsumer create from deliver Mode
-func nngConsumer(m Mode, url string, args ...interface{}) *NNG {
+func nngDialer(m Mode, url string, args ...interface{}) *NNG {
if sock, err := newSocket(protoConsumer(m)); err == nil {
if err = setSocketOptions(sock, args); err != nil {
@@ -101,6 +101,7 @@
if err = sock.Dial(url); err != nil {
sock.Close()
+ sock = nil
return nil
}
@@ -119,50 +120,32 @@
surveyorTime = time.Second / 2
)
-func defualtSocketOptions(sock mangos.Socket) error {
- var err error
- if err = sock.SetOption(mangos.OptionMaxRecvSize, maxRecvSize); err != nil {
- sock.Close()
- return err
- }
- if err = sock.SetOption(mangos.OptionWriteQLen, 0); err != nil {
- sock.Close()
- return err
- }
- if err = sock.SetOption(mangos.OptionReadQLen, 0); err != nil {
- sock.Close()
- return err
- }
- if err = sock.SetOption(mangos.OptionRecvDeadline, time.Second); err != nil {
- sock.Close()
- return err
- }
- if err = sock.SetOption(mangos.OptionSendDeadline, time.Second); err != nil {
- sock.Close()
- return err
- }
- if err = sock.SetOption(mangos.OptionRaw, true); err != nil {
- sock.Close()
- return err
- }
+func defualtSocketOptions() map[string]interface{} {
- return nil
+ options := make(map[string]interface{})
+
+ options[mangos.OptionMaxRecvSize] = maxRecvSize
+ options[mangos.OptionWriteQLen] = 0
+ options[mangos.OptionReadQLen] = 0
+ options[mangos.OptionRecvDeadline] = time.Second
+ options[mangos.OptionSendDeadline] = time.Second
+ options[mangos.OptionRaw] = true
+
+ return options
}
func setSocketOptions(sock mangos.Socket, args ...interface{}) error {
- err := defualtSocketOptions(sock)
- if err != nil {
- return err
- }
+ options := defualtSocketOptions()
+
switch sock.GetProtocol().Number() {
case mangos.ProtoSub:
for _, arg := range args {
switch arg.(type) {
case string:
- err = sock.SetOption(mangos.OptionSubscribe, []byte(arg.(string)))
+ options[mangos.OptionSubscribe] = []byte(arg.(string))
default:
- err = sock.SetOption(mangos.OptionSubscribe, []byte(""))
+ options[mangos.OptionSubscribe] = []byte("")
}
}
case mangos.ProtoSurveyor:
@@ -172,12 +155,17 @@
surveyorTime = time.Duration(arg.(int)/2) * time.Second
default:
}
- err = sock.SetOption(mangos.OptionSurveyTime, surveyorTime)
+ options[mangos.OptionSurveyTime] = surveyorTime
}
default:
fmt.Println("no additional args")
}
+ for k, v := range options {
+ if err := sock.SetOption(k, v); err != nil {
+ return err
+ }
+ }
return nil
}
@@ -228,8 +216,6 @@
}
all.AddTransports(s)
- // s.AddTransport(ipc.NewTransport())
- // s.AddTransport(tcp.NewTransport())
return s, nil
}
--
Gitblit v1.8.0