From 36766ab5b68ce7dfb39dff5d6d283ce5c7f4b346 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期五, 17 五月 2019 09:14:51 +0800
Subject: [PATCH] fix crash
---
nng.go | 137 ++++++++++++++++++++++++++-------------------
1 files changed, 78 insertions(+), 59 deletions(-)
diff --git a/nng.go b/nng.go
index 009ef8d..07f2fb5 100644
--- a/nng.go
+++ b/nng.go
@@ -1,7 +1,6 @@
package deliver
import (
- "errors"
"fmt"
"os"
"strings"
@@ -23,43 +22,53 @@
// NNG mangos wrap
type NNG struct {
- sock mangos.Socket
- raw bool
+ sock mangos.Socket
+ server bool
+ mode Mode
+ url string
+
+ arguments []interface{}
}
// Send impl interface Diliver
func (n *NNG) Send(data []byte) error {
+ var err error
if n.sock == nil {
- return errors.New("please init NNG first")
+ n.sock, err = n.makeNNG(true)
+ if err != nil {
+ fmt.Println("create nng producer error")
+ return err
+ }
}
if surveyorTime > 0 {
time.Sleep(time.Duration(surveyorTime*2) * time.Second)
}
- if n.raw {
- msg := mangos.NewMessage(len(data))
- msg.Body = data
- return n.sock.SendMsg(msg)
- }
+ msg := mangos.NewMessage(len(data))
+ msg.Body = data
+ return n.sock.SendMsg(msg)
- return n.sock.Send(data)
}
// Recv impl interface Diliver
func (n *NNG) Recv() ([]byte, error) {
+ var err error
+
if n.sock == nil {
- return nil, errors.New("please init NNG first")
- }
- if n.raw {
- var msg *mangos.Message
- var err error
- if msg, err = n.sock.RecvMsg(); err != nil {
+ n.sock, err = n.makeNNG(false)
+ if err != nil {
+ fmt.Println("create nng consumer error")
return nil, err
}
- return msg.Body, nil
}
- return n.sock.Recv()
+
+ var msg *mangos.Message
+ if msg, err = n.sock.RecvMsg(); err != nil {
+ return nil, err
+ }
+ return msg.Body, nil
+
}
// Close impl interface Deliver
@@ -73,48 +82,57 @@
func nngServer(m Mode, url string, args ...interface{}) *NNG {
rmExistedIpcName(url)
- if sock, err := newSocket(protoProducer(m)); err == nil {
- if err = setSocketOptions(sock, args); err != nil {
- sock.Close()
- sock = nil
- return nil
- }
- if err = sock.Listen(url); err != nil {
- sock.Close()
- sock = nil
- return nil
- }
- return &NNG{
- sock,
- true,
- }
- }
- return nil
+ return &NNG{
+ server: true,
+ mode: m,
+ url: url,
+ arguments: args,
+ }
}
func nngClient(m Mode, url string, args ...interface{}) *NNG {
- if sock, err := newSocket(protoConsumer(m)); err == nil {
- if err = setSocketOptions(sock, args); err != nil {
- sock.Close()
- sock = nil
- return nil
- }
-
- if err = sock.Dial(url); err != nil {
- sock.Close()
- sock = nil
- return nil
- }
-
- return &NNG{
- sock,
- true,
- }
+ return &NNG{
+ server: false,
+ mode: m,
+ url: url,
+ arguments: args,
}
- return nil
+}
+
+func proto(producer bool, m Mode) protocol {
+ if producer {
+ return protoProducer(m)
+ }
+ return protoConsumer(m)
+}
+
+func (n *NNG) makeNNG(producer bool) (mangos.Socket, error) {
+
+ var sock mangos.Socket
+ var err error
+ if sock, err = newSocket(proto(producer, n.mode)); err != nil {
+ return nil, err
+ }
+
+ if err = setSocketOptions(sock, n.arguments...); err != nil {
+ sock.Close()
+ sock = nil
+ }
+ if n.server {
+ if err = sock.Listen(n.url); err != nil {
+ sock.Close()
+ sock = nil
+ }
+ } else {
+ if err = sock.Dial(n.url); err != nil {
+ sock.Close()
+ sock = nil
+ }
+ }
+ return sock, err
}
// maxRecvSize max recv size
@@ -130,8 +148,8 @@
options[mangos.OptionMaxRecvSize] = maxRecvSize
options[mangos.OptionWriteQLen] = 0
options[mangos.OptionReadQLen] = 0
- options[mangos.OptionRecvDeadline] = time.Second
- options[mangos.OptionSendDeadline] = time.Second
+ // options[mangos.OptionRecvDeadline] = time.Second
+ // options[mangos.OptionSendDeadline] = time.Second
options[mangos.OptionRaw] = true
return options
@@ -143,14 +161,15 @@
switch sock.GetProtocol().Number() {
case mangos.ProtoSub:
+ topic := ""
for _, arg := range args {
switch arg.(type) {
case string:
- options[mangos.OptionSubscribe] = []byte(arg.(string))
+ topic = arg.(string)
default:
- options[mangos.OptionSubscribe] = []byte("")
}
}
+ options[mangos.OptionSubscribe] = []byte(topic)
case mangos.ProtoSurveyor:
for _, arg := range args {
switch arg.(type) {
@@ -162,10 +181,10 @@
}
default:
}
- options[mangos.OptionSurveyTime] = time.Duration(surveyorTime) * time.Second
}
+ options[mangos.OptionSurveyTime] = time.Duration(surveyorTime) * time.Second
+
default:
- fmt.Println("no additional args")
}
for k, v := range options {
--
Gitblit v1.8.0