zhangmeng
2019-05-17 d23f54e337d12fb4e6d5a0a5e1f041a51005e10c
nng.go
@@ -1,6 +1,7 @@
package deliver
import (
   "errors"
   "fmt"
   "os"
   "strings"
@@ -24,9 +25,8 @@
type td int
const (
   producer = td(iota)
   consumer
   star //mangos bus protocol
   agent = td(iota)
   coactee
)
// NNG mangos wrap
@@ -43,9 +43,9 @@
func (n *NNG) Send(data []byte) error {
   var err error
   if n.sock == nil {
      n.sock, err = n.makeNNG(producer)
      n.sock, err = n.makeNNG(agent)
      if err != nil {
         fmt.Println("create nng producer error")
         fmt.Println("create nng sender error")
         return err
      }
   }
@@ -65,9 +65,9 @@
   var err error
   if n.sock == nil {
      n.sock, err = n.makeNNG(consumer)
      n.sock, err = n.makeNNG(coactee)
      if err != nil {
         fmt.Println("create nng consumer error")
         fmt.Println("create nng reciever error")
         return nil, err
      }
   }
@@ -112,98 +112,29 @@
}
func proto(typ td, m Mode) protocol {
   if typ == producer {
      return protoProducer(m)
   } else if typ == consumer {
      return protoConsumer(m)
   if typ == agent {
      return protoAgent(m)
   } else if typ == coactee {
      return protoCoactee(m)
   }
   return protoConsumer(m)
   return NONE
}
func (n *NNG) makeNNG(typ td) (mangos.Socket, error) {
   var sock mangos.Socket
   var err error
   if sock, err = newSocket(proto(typ, n.mode)); err != nil {
      return nil, err
   }
   if err = setSocketOptions(sock, n.arguments...); err != nil {
      sock.Close()
      sock = nil
   }
   if n.server {
      if err = sock.Listen(n.url); err != nil {
         sock.Close()
         sock = nil
      }
   } else {
      if err = sock.Dial(n.url); err != nil {
         sock.Close()
         sock = nil
      }
   }
   return sock, err
}
// maxRecvSize max recv size
var (
   maxRecvSize  = 33 * 1024 * 1024
   surveyorTime = -1
)
func defualtSocketOptions() map[string]interface{} {
   options := make(map[string]interface{})
   options[mangos.OptionMaxRecvSize] = maxRecvSize
   options[mangos.OptionWriteQLen] = 0
   options[mangos.OptionReadQLen] = 0
   options[mangos.OptionRecvDeadline] = time.Second
   options[mangos.OptionSendDeadline] = time.Second
   options[mangos.OptionRaw] = true
   return options
}
func setSocketOptions(sock mangos.Socket, args ...interface{}) error {
   options := defualtSocketOptions()
   switch sock.GetProtocol().Number() {
   case mangos.ProtoSub:
      topic := ""
      for _, arg := range args {
         switch arg.(type) {
         case string:
            topic = arg.(string)
         default:
         }
      }
      options[mangos.OptionSubscribe] = []byte(topic)
   case mangos.ProtoSurveyor:
      for _, arg := range args {
         switch arg.(type) {
         case int:
            if arg.(int) < 2 {
               surveyorTime = 1
            } else {
               surveyorTime = arg.(int) / 2
            }
         default:
         }
      }
      options[mangos.OptionSurveyTime] = time.Duration(surveyorTime) * time.Second
   switch n.mode {
   case Bus:
      sock, err = n.busMakeNNG(typ)
   case ReqRep:
      sock, err = n.rrMakeNNG(typ)
   default:
      sock, err = n.ppMakeNNG(typ)
   }
   for k, v := range options {
      if err := sock.SetOption(k, v); err != nil {
         return err
      }
   }
   return nil
   return sock, err
}
func rmExistedIpcName(url string) {
@@ -220,6 +151,9 @@
// access the underlying library.
func newSocket(p protocol) (mangos.Socket, error) {
   if p == NONE {
      return nil, errors.New("new socket protocol none")
   }
   var s mangos.Socket
   var err error
@@ -268,6 +202,7 @@
// Constants for protocols.
const (
   NONE       = -1
   PUSH       = protocol(mangos.ProtoPush)
   PULL       = protocol(mangos.ProtoPull)
   PUB        = protocol(mangos.ProtoPub)
@@ -280,38 +215,38 @@
   PAIR       = protocol(mangos.ProtoPair)
)
func protoProducer(m Mode) protocol {
func protoAgent(m Mode) protocol {
   switch m {
   case PushPull:
      return PUSH
   case PubSub:
      return PUB
   case ReqRep:
      return REP
   case SurvResp:
      return SURVEYOR
   case Bus:
      return BUS
   case Pair:
      return PAIR
   case SurvResp:
      return SURVEYOR
   case ReqRep:
      return REQ
   case Bus:
      return BUS
   }
   return PUSH
   return NONE
}
func protoConsumer(m Mode) protocol {
func protoCoactee(m Mode) protocol {
   switch m {
   case PushPull:
      return PULL
   case PubSub:
      return SUB
   case ReqRep:
      return REQ
   case SurvResp:
      return RESPONDENT
   case Bus:
      return BUS
   case Pair:
      return PAIR
   case SurvResp:
      return RESPONDENT
   case ReqRep:
      return REP
   case Bus:
      return BUS
   }
   return PULL
   return NONE
}