zhangmeng
2019-09-27 2d390df9ede39c9d7c34bd8190b9329cfc371325
nng.go
@@ -5,6 +5,7 @@
   "fmt"
   "os"
   "strings"
   "time"
   "nanomsg.org/go-mangos"
   "nanomsg.org/go-mangos/protocol/bus"
@@ -23,122 +24,122 @@
// NNG mangos wrap
type NNG struct {
   sock mangos.Socket
   typ  td
   mode Mode
   url  string
   arguments []interface{}
}
// Send impl interface Diliver
func (n *NNG) Send(data []byte) error {
   if n.sock == nil {
   if n == nil {
      return errors.New("please init NNG first")
   }
   if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil {
      msg := mangos.NewMessage(len(data))
      msg.Body = data
      return n.sock.SendMsg(msg)
   var err error
   if n.sock == nil {
      n.sock, err = n.makeNNG(agent)
      if err != nil {
         fmt.Println("create nng sender error")
         return err
      }
   }
   return n.sock.Send(data)
   if surveyorTime > 0 {
      time.Sleep(time.Duration(surveyorTime*2) * time.Second)
   }
   msg := mangos.NewMessage(1)
   msg.Body = data
   return n.sock.SendMsg(msg)
}
// Recv impl interface Diliver
func (n *NNG) Recv() ([]byte, error) {
   if n.sock == nil {
   if n == nil {
      return nil, errors.New("please init NNG first")
   }
   if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil {
      msg, err := n.sock.RecvMsg()
      return msg.Body, err
   var err error
   if n.sock == nil {
      n.sock, err = n.makeNNG(coactee)
      if err != nil {
         fmt.Println("create nng reciever error")
         return nil, err
      }
   }
   return n.sock.Recv()
   var msg *mangos.Message
   if msg, err = n.sock.RecvMsg(); err != nil {
      return nil, err
   }
   return msg.Body, nil
}
// nngProducer create from deliver Mode
func nngProducer(m Mode, url string, args ...interface{}) *NNG {
// Recv2 impl interface
func (n *NNG) Recv2(data []byte) (l int, err error) {
   data, err = n.Recv()
   l = len(data)
   return l, err
}
// Close impl interface Deliver
func (n *NNG) Close() {
   if n != nil && n.sock != nil {
      n.sock.Close()
      n.sock = nil
   }
}
func nngServer(m Mode, url string, args ...interface{}) *NNG {
   rmExistedIpcName(url)
   if sock, err := newSocket(protoProducer(m)); err == nil {
      if err = setSocketOptions(sock, args); err != nil {
         return nil
      }
      if err = sock.Listen(url); err != nil {
         sock.Close()
         return nil
      }
      return &NNG{
         sock,
      }
   }
   return nil
   return &NNG{
      typ:       agent,
      mode:      m,
      url:       url,
      arguments: args,
   }
}
// nngConsumer create from deliver Mode
func nngConsumer(m Mode, url string, args ...interface{}) *NNG {
func nngClient(m Mode, url string, args ...interface{}) *NNG {
   if sock, err := newSocket(protoConsumer(m)); err == nil {
      if err = setSocketOptions(sock, args); err != nil {
         return nil
      }
      if err = sock.Dial(url); err != nil {
         sock.Close()
         return nil
      }
      return &NNG{
         sock,
      }
   return &NNG{
      typ:       coactee,
      mode:      m,
      url:       url,
      arguments: args,
   }
   return nil
}
// MaxRecvSize max recv size
var MaxRecvSize = 33 * 1024 * 1024
func proto(typ td, m Mode) protocol {
   if typ == agent {
      return protoAgent(m)
   } else if typ == coactee {
      return protoCoactee(m)
   }
   return NONE
}
func defualtSocketOptions(sock mangos.Socket) error {
func (n *NNG) makeNNG(typ td) (mangos.Socket, error) {
   var sock mangos.Socket
   var err error
   if err = sock.SetOption(mangos.OptionMaxRecvSize, MaxRecvSize); err != nil {
      sock.Close()
      return err
   }
   if err = sock.SetOption(mangos.OptionWriteQLen, 0); err != nil {
      sock.Close()
      return err
   }
   if err = sock.SetOption(mangos.OptionReadQLen, 0); err != nil {
      sock.Close()
      return err
   }
   // if err = sock.SetOption(mangos.OptionNoDelay, true); err != nil {
   //    sock.Close()
   //    return err
   // }
   if err = sock.SetOption(mangos.OptionRaw, true); err != nil {
      sock.Close()
      return err
   switch n.mode {
   case Bus:
      sock, err = n.busMakeNNG(typ)
   case ReqRep, SurvResp:
      sock, err = n.rrMakeNNG(typ)
   default:
      sock, err = n.ppMakeNNG(typ)
   }
   return nil
}
func setSocketOptions(sock mangos.Socket, args ...interface{}) error {
   err := defualtSocketOptions(sock)
   if err != nil {
      return err
   }
   if sock.GetProtocol().Number() == mangos.ProtoSub {
      for _, arg := range args {
         switch arg.(type) {
         case string:
            err = sock.SetOption(mangos.OptionSubscribe, []byte(arg.(string)))
         default:
            err = sock.SetOption(mangos.OptionSubscribe, []byte(""))
         }
      }
   }
   return nil
   return sock, err
}
func rmExistedIpcName(url string) {
@@ -146,6 +147,8 @@
   if s[0] == "ipc" {
      if _, err := os.Stat(s[1]); err == nil {
         os.Remove(s[1])
      } else if !os.IsNotExist(err) {
         os.Remove(s[1])
      }
   }
@@ -155,6 +158,9 @@
// access the underlying library.
func newSocket(p protocol) (mangos.Socket, error) {
   if p == NONE {
      return nil, errors.New("new socket protocol none")
   }
   var s mangos.Socket
   var err error
@@ -188,8 +194,6 @@
   }
   all.AddTransports(s)
   // s.AddTransport(ipc.NewTransport())
   // s.AddTransport(tcp.NewTransport())
   return s, nil
}
@@ -205,6 +209,7 @@
// Constants for protocols.
const (
   NONE       = -1
   PUSH       = protocol(mangos.ProtoPush)
   PULL       = protocol(mangos.ProtoPull)
   PUB        = protocol(mangos.ProtoPub)
@@ -217,38 +222,38 @@
   PAIR       = protocol(mangos.ProtoPair)
)
func protoProducer(m Mode) protocol {
func protoAgent(m Mode) protocol {
   switch m {
   case PushPull:
      return PUSH
   case PubSub:
      return PUB
   case ReqRep:
      return REP
   case SurvResp:
      return SURVEYOR
   case Bus:
      return BUS
   case Pair:
      return PAIR
   case SurvResp:
      return SURVEYOR
   case ReqRep:
      return REQ
   case Bus:
      return BUS
   }
   return PUSH
   return NONE
}
func protoConsumer(m Mode) protocol {
func protoCoactee(m Mode) protocol {
   switch m {
   case PushPull:
      return PULL
   case PubSub:
      return SUB
   case ReqRep:
      return REQ
   case SurvResp:
      return RESPONDENT
   case Bus:
      return BUS
   case Pair:
      return PAIR
   case SurvResp:
      return RESPONDENT
   case ReqRep:
      return REP
   case Bus:
      return BUS
   }
   return PULL
   return NONE
}