From 36766ab5b68ce7dfb39dff5d6d283ce5c7f4b346 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期五, 17 五月 2019 09:14:51 +0800
Subject: [PATCH] fix crash

---
 nng.go |  202 ++++++++++++++++++++++++++++----------------------
 1 files changed, 114 insertions(+), 88 deletions(-)

diff --git a/nng.go b/nng.go
index 00a8dc2..07f2fb5 100644
--- a/nng.go
+++ b/nng.go
@@ -1,7 +1,6 @@
 package deliver
 
 import (
-	"errors"
 	"fmt"
 	"os"
 	"strings"
@@ -23,147 +22,176 @@
 
 // NNG mangos wrap
 type NNG struct {
-	sock mangos.Socket
+	sock   mangos.Socket
+	server bool
+	mode   Mode
+	url    string
+
+	arguments []interface{}
 }
 
 // Send impl interface Diliver
 func (n *NNG) Send(data []byte) error {
+	var err error
 	if n.sock == nil {
-		return errors.New("please init NNG first")
+		n.sock, err = n.makeNNG(true)
+		if err != nil {
+			fmt.Println("create nng producer error")
+			return err
+		}
 	}
 
-	switch n.sock.GetProtocol().Number() {
-	case mangos.ProtoSurveyor:
-		time.Sleep(surveyorTime * 2)
-	default:
-	}
-	if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil {
-		msg := mangos.NewMessage(len(data))
-		msg.Body = data
-		return n.sock.SendMsg(msg)
+	if surveyorTime > 0 {
+		time.Sleep(time.Duration(surveyorTime*2) * time.Second)
 	}
 
-	return n.sock.Send(data)
+	msg := mangos.NewMessage(len(data))
+	msg.Body = data
+	return n.sock.SendMsg(msg)
+
 }
 
 // Recv impl interface Diliver
 func (n *NNG) Recv() ([]byte, error) {
+	var err error
+
 	if n.sock == nil {
-		return nil, errors.New("please init NNG first")
+		n.sock, err = n.makeNNG(false)
+		if err != nil {
+			fmt.Println("create nng consumer error")
+			return nil, err
+		}
 	}
-	if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil {
-		msg, err := n.sock.RecvMsg()
-		return msg.Body, err
+
+	var msg *mangos.Message
+	if msg, err = n.sock.RecvMsg(); err != nil {
+		return nil, err
 	}
-	return n.sock.Recv()
+	return msg.Body, nil
+
 }
 
-// nngProducer create from deliver Mode
-func nngProducer(m Mode, url string, args ...interface{}) *NNG {
+// Close impl interface Deliver
+func (n *NNG) Close() {
+	if n.sock != nil {
+		n.sock.Close()
+		n.sock = nil
+	}
+}
+
+func nngServer(m Mode, url string, args ...interface{}) *NNG {
 
 	rmExistedIpcName(url)
-	if sock, err := newSocket(protoProducer(m)); err == nil {
-		if err = setSocketOptions(sock, args); err != nil {
-			return nil
-		}
-		if err = sock.Listen(url); err != nil {
-			sock.Close()
-			return nil
-		}
-		return &NNG{
-			sock,
-		}
-	}
 
-	return nil
+	return &NNG{
+		server:    true,
+		mode:      m,
+		url:       url,
+		arguments: args,
+	}
 }
 
-// nngConsumer create from deliver Mode
-func nngConsumer(m Mode, url string, args ...interface{}) *NNG {
+func nngClient(m Mode, url string, args ...interface{}) *NNG {
 
-	if sock, err := newSocket(protoConsumer(m)); err == nil {
-		if err = setSocketOptions(sock, args); err != nil {
-			return nil
-		}
-
-		if err = sock.Dial(url); err != nil {
-			sock.Close()
-			return nil
-		}
-
-		return &NNG{
-			sock,
-		}
+	return &NNG{
+		server:    false,
+		mode:      m,
+		url:       url,
+		arguments: args,
 	}
 
-	return nil
+}
+
+func proto(producer bool, m Mode) protocol {
+	if producer {
+		return protoProducer(m)
+	}
+	return protoConsumer(m)
+}
+
+func (n *NNG) makeNNG(producer bool) (mangos.Socket, error) {
+
+	var sock mangos.Socket
+	var err error
+	if sock, err = newSocket(proto(producer, n.mode)); err != nil {
+		return nil, err
+	}
+
+	if err = setSocketOptions(sock, n.arguments...); err != nil {
+		sock.Close()
+		sock = nil
+	}
+	if n.server {
+		if err = sock.Listen(n.url); err != nil {
+			sock.Close()
+			sock = nil
+		}
+	} else {
+		if err = sock.Dial(n.url); err != nil {
+			sock.Close()
+			sock = nil
+		}
+	}
+	return sock, err
 }
 
 // maxRecvSize max recv size
 var (
 	maxRecvSize  = 33 * 1024 * 1024
-	surveyorTime = time.Second / 2
+	surveyorTime = -1
 )
 
-func defualtSocketOptions(sock mangos.Socket) error {
-	var err error
-	if err = sock.SetOption(mangos.OptionMaxRecvSize, maxRecvSize); err != nil {
-		sock.Close()
-		return err
-	}
-	if err = sock.SetOption(mangos.OptionWriteQLen, 0); err != nil {
-		sock.Close()
-		return err
-	}
-	if err = sock.SetOption(mangos.OptionReadQLen, 0); err != nil {
-		sock.Close()
-		return err
-	}
-	// if err = sock.SetOption(mangos.OptionRecvDeadline, time.Second); err != nil {
-	// 	sock.Close()
-	// 	return err
-	// }
-	// if err = sock.SetOption(mangos.OptionSendDeadline, time.Second); err != nil {
-	// 	sock.Close()
-	// 	return err
-	// }
-	if err = sock.SetOption(mangos.OptionRaw, true); err != nil {
-		sock.Close()
-		return err
-	}
+func defualtSocketOptions() map[string]interface{} {
 
-	return nil
+	options := make(map[string]interface{})
+
+	options[mangos.OptionMaxRecvSize] = maxRecvSize
+	options[mangos.OptionWriteQLen] = 0
+	options[mangos.OptionReadQLen] = 0
+	// options[mangos.OptionRecvDeadline] = time.Second
+	// options[mangos.OptionSendDeadline] = time.Second
+	options[mangos.OptionRaw] = true
+
+	return options
 }
 
 func setSocketOptions(sock mangos.Socket, args ...interface{}) error {
 
-	err := defualtSocketOptions(sock)
-	if err != nil {
-		return err
-	}
+	options := defualtSocketOptions()
+
 	switch sock.GetProtocol().Number() {
 	case mangos.ProtoSub:
+		topic := ""
 		for _, arg := range args {
 			switch arg.(type) {
 			case string:
-				err = sock.SetOption(mangos.OptionSubscribe, []byte(arg.(string)))
+				topic = arg.(string)
 			default:
-				err = sock.SetOption(mangos.OptionSubscribe, []byte(""))
 			}
 		}
+		options[mangos.OptionSubscribe] = []byte(topic)
 	case mangos.ProtoSurveyor:
 		for _, arg := range args {
 			switch arg.(type) {
 			case int:
-				surveyorTime = time.Duration(arg.(int)/2) * time.Second
+				if arg.(int) < 2 {
+					surveyorTime = 1
+				} else {
+					surveyorTime = arg.(int) / 2
+				}
 			default:
 			}
-			err = sock.SetOption(mangos.OptionSurveyTime, surveyorTime)
 		}
+		options[mangos.OptionSurveyTime] = time.Duration(surveyorTime) * time.Second
+
 	default:
-		fmt.Println("no additional args")
 	}
 
+	for k, v := range options {
+		if err := sock.SetOption(k, v); err != nil {
+			return err
+		}
+	}
 	return nil
 }
 
@@ -214,8 +242,6 @@
 	}
 
 	all.AddTransports(s)
-	// s.AddTransport(ipc.NewTransport())
-	// s.AddTransport(tcp.NewTransport())
 
 	return s, nil
 }

--
Gitblit v1.8.0