zhangmeng
2019-05-16 162fe98d7728445b72528283e1bfdddc432d2676
nng.go
@@ -24,6 +24,7 @@
// NNG mangos wrap
type NNG struct {
   sock mangos.Socket
   raw  bool
}
// Send impl interface Diliver
@@ -32,12 +33,12 @@
      return errors.New("please init NNG first")
   }
   switch n.sock.GetProtocol().Number() {
   case mangos.ProtoSurveyor:
      time.Sleep(surveyorTime * 2)
   default:
   }
   if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil {
   // switch n.sock.GetProtocol().Number() {
   // case mangos.ProtoSurveyor:
   //    time.Sleep(surveyorTime * 2)
   // default:
   // }
   if n.raw {
      msg := mangos.NewMessage(len(data))
      msg.Body = data
      return n.sock.SendMsg(msg)
@@ -51,9 +52,13 @@
   if n.sock == nil {
      return nil, errors.New("please init NNG first")
   }
   if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil {
      msg, err := n.sock.RecvMsg()
      return msg.Body, err
   if n.raw {
      var msg *mangos.Message
      var err error
      if msg, err = n.sock.RecvMsg(); err != nil {
         return nil, err
      }
      return msg.Body, nil
   }
   return n.sock.Recv()
}
@@ -62,44 +67,51 @@
func (n *NNG) Close() {
   if n.sock != nil {
      n.sock.Close()
      n.sock = nil
   }
}
// nngProducer create from deliver Mode
func nngProducer(m Mode, url string, args ...interface{}) *NNG {
func nngListener(m Mode, url string, args ...interface{}) *NNG {
   rmExistedIpcName(url)
   if sock, err := newSocket(protoProducer(m)); err == nil {
      if err = setSocketOptions(sock, args); err != nil {
         sock.Close()
         sock = nil
         return nil
      }
      if err = sock.Listen(url); err != nil {
         sock.Close()
         sock = nil
         return nil
      }
      return &NNG{
         sock,
         true,
      }
   }
   return nil
}
// nngConsumer create from deliver Mode
func nngConsumer(m Mode, url string, args ...interface{}) *NNG {
func nngDialer(m Mode, url string, args ...interface{}) *NNG {
   if sock, err := newSocket(protoConsumer(m)); err == nil {
      if err = setSocketOptions(sock, args); err != nil {
         sock.Close()
         sock = nil
         return nil
      }
      if err = sock.Dial(url); err != nil {
         sock.Close()
         sock = nil
         return nil
      }
      return &NNG{
         sock,
         true,
      }
   }
@@ -112,50 +124,32 @@
   surveyorTime = time.Second / 2
)
func defualtSocketOptions(sock mangos.Socket) error {
   var err error
   if err = sock.SetOption(mangos.OptionMaxRecvSize, maxRecvSize); err != nil {
      sock.Close()
      return err
   }
   if err = sock.SetOption(mangos.OptionWriteQLen, 0); err != nil {
      sock.Close()
      return err
   }
   if err = sock.SetOption(mangos.OptionReadQLen, 0); err != nil {
      sock.Close()
      return err
   }
   // if err = sock.SetOption(mangos.OptionRecvDeadline, time.Second); err != nil {
   //    sock.Close()
   //    return err
   // }
   // if err = sock.SetOption(mangos.OptionSendDeadline, time.Second); err != nil {
   //    sock.Close()
   //    return err
   // }
   if err = sock.SetOption(mangos.OptionRaw, true); err != nil {
      sock.Close()
      return err
   }
func defualtSocketOptions() map[string]interface{} {
   return nil
   options := make(map[string]interface{})
   options[mangos.OptionMaxRecvSize] = maxRecvSize
   options[mangos.OptionWriteQLen] = 0
   options[mangos.OptionReadQLen] = 0
   options[mangos.OptionRecvDeadline] = time.Second
   options[mangos.OptionSendDeadline] = time.Second
   options[mangos.OptionRaw] = true
   return options
}
func setSocketOptions(sock mangos.Socket, args ...interface{}) error {
   err := defualtSocketOptions(sock)
   if err != nil {
      return err
   }
   options := defualtSocketOptions()
   switch sock.GetProtocol().Number() {
   case mangos.ProtoSub:
      for _, arg := range args {
         switch arg.(type) {
         case string:
            err = sock.SetOption(mangos.OptionSubscribe, []byte(arg.(string)))
            options[mangos.OptionSubscribe] = []byte(arg.(string))
         default:
            err = sock.SetOption(mangos.OptionSubscribe, []byte(""))
            options[mangos.OptionSubscribe] = []byte("")
         }
      }
   case mangos.ProtoSurveyor:
@@ -165,12 +159,17 @@
            surveyorTime = time.Duration(arg.(int)/2) * time.Second
         default:
         }
         err = sock.SetOption(mangos.OptionSurveyTime, surveyorTime)
         options[mangos.OptionSurveyTime] = surveyorTime
      }
   default:
      fmt.Println("no additional args")
   }
   for k, v := range options {
      if err := sock.SetOption(k, v); err != nil {
         return err
      }
   }
   return nil
}
@@ -221,8 +220,6 @@
   }
   all.AddTransports(s)
   // s.AddTransport(ipc.NewTransport())
   // s.AddTransport(tcp.NewTransport())
   return s, nil
}