zhangmeng
2019-05-17 36766ab5b68ce7dfb39dff5d6d283ce5c7f4b346
nng.go
@@ -1,7 +1,6 @@
package deliver
import (
   "errors"
   "fmt"
   "os"
   "strings"
@@ -23,43 +22,53 @@
// NNG mangos wrap
type NNG struct {
   sock mangos.Socket
   raw  bool
   sock   mangos.Socket
   server bool
   mode   Mode
   url    string
   arguments []interface{}
}
// Send impl interface Diliver
func (n *NNG) Send(data []byte) error {
   var err error
   if n.sock == nil {
      return errors.New("please init NNG first")
      n.sock, err = n.makeNNG(true)
      if err != nil {
         fmt.Println("create nng producer error")
         return err
      }
   }
   if surveyorTime > 0 {
      time.Sleep(time.Duration(surveyorTime*2) * time.Second)
   }
   if n.raw {
      msg := mangos.NewMessage(len(data))
      msg.Body = data
      return n.sock.SendMsg(msg)
   }
   msg := mangos.NewMessage(len(data))
   msg.Body = data
   return n.sock.SendMsg(msg)
   return n.sock.Send(data)
}
// Recv impl interface Diliver
func (n *NNG) Recv() ([]byte, error) {
   var err error
   if n.sock == nil {
      return nil, errors.New("please init NNG first")
   }
   if n.raw {
      var msg *mangos.Message
      var err error
      if msg, err = n.sock.RecvMsg(); err != nil {
      n.sock, err = n.makeNNG(false)
      if err != nil {
         fmt.Println("create nng consumer error")
         return nil, err
      }
      return msg.Body, nil
   }
   return n.sock.Recv()
   var msg *mangos.Message
   if msg, err = n.sock.RecvMsg(); err != nil {
      return nil, err
   }
   return msg.Body, nil
}
// Close impl interface Deliver
@@ -73,48 +82,57 @@
func nngServer(m Mode, url string, args ...interface{}) *NNG {
   rmExistedIpcName(url)
   if sock, err := newSocket(protoProducer(m)); err == nil {
      if err = setSocketOptions(sock, args); err != nil {
         sock.Close()
         sock = nil
         return nil
      }
      if err = sock.Listen(url); err != nil {
         sock.Close()
         sock = nil
         return nil
      }
      return &NNG{
         sock,
         true,
      }
   }
   return nil
   return &NNG{
      server:    true,
      mode:      m,
      url:       url,
      arguments: args,
   }
}
func nngClient(m Mode, url string, args ...interface{}) *NNG {
   if sock, err := newSocket(protoConsumer(m)); err == nil {
      if err = setSocketOptions(sock, args); err != nil {
         sock.Close()
         sock = nil
         return nil
      }
      if err = sock.Dial(url); err != nil {
         sock.Close()
         sock = nil
         return nil
      }
      return &NNG{
         sock,
         true,
      }
   return &NNG{
      server:    false,
      mode:      m,
      url:       url,
      arguments: args,
   }
   return nil
}
func proto(producer bool, m Mode) protocol {
   if producer {
      return protoProducer(m)
   }
   return protoConsumer(m)
}
func (n *NNG) makeNNG(producer bool) (mangos.Socket, error) {
   var sock mangos.Socket
   var err error
   if sock, err = newSocket(proto(producer, n.mode)); err != nil {
      return nil, err
   }
   if err = setSocketOptions(sock, n.arguments...); err != nil {
      sock.Close()
      sock = nil
   }
   if n.server {
      if err = sock.Listen(n.url); err != nil {
         sock.Close()
         sock = nil
      }
   } else {
      if err = sock.Dial(n.url); err != nil {
         sock.Close()
         sock = nil
      }
   }
   return sock, err
}
// maxRecvSize max recv size
@@ -130,8 +148,8 @@
   options[mangos.OptionMaxRecvSize] = maxRecvSize
   options[mangos.OptionWriteQLen] = 0
   options[mangos.OptionReadQLen] = 0
   options[mangos.OptionRecvDeadline] = time.Second
   options[mangos.OptionSendDeadline] = time.Second
   // options[mangos.OptionRecvDeadline] = time.Second
   // options[mangos.OptionSendDeadline] = time.Second
   options[mangos.OptionRaw] = true
   return options
@@ -143,14 +161,15 @@
   switch sock.GetProtocol().Number() {
   case mangos.ProtoSub:
      topic := ""
      for _, arg := range args {
         switch arg.(type) {
         case string:
            options[mangos.OptionSubscribe] = []byte(arg.(string))
            topic = arg.(string)
         default:
            options[mangos.OptionSubscribe] = []byte("")
         }
      }
      options[mangos.OptionSubscribe] = []byte(topic)
   case mangos.ProtoSurveyor:
      for _, arg := range args {
         switch arg.(type) {
@@ -162,10 +181,10 @@
            }
         default:
         }
         options[mangos.OptionSurveyTime] = time.Duration(surveyorTime) * time.Second
      }
      options[mangos.OptionSurveyTime] = time.Duration(surveyorTime) * time.Second
   default:
      fmt.Println("no additional args")
   }
   for k, v := range options {