zhangmeng
2019-05-15 9e1301abf98fb4e04fabba535c1bb5ad161a66a4
nng.go
@@ -5,6 +5,7 @@
   "fmt"
   "os"
   "strings"
   "time"
   "nanomsg.org/go-mangos"
   "nanomsg.org/go-mangos/protocol/bus"
@@ -31,11 +32,17 @@
      return errors.New("please init NNG first")
   }
   switch n.sock.GetProtocol().Number() {
   case mangos.ProtoSurveyor:
      time.Sleep(surveyorTime * 2)
   default:
   }
   if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil {
      msg := mangos.NewMessage(len(data))
      msg.Body = data
      return n.sock.SendMsg(msg)
   }
   return n.sock.Send(data)
}
@@ -92,12 +99,15 @@
   return nil
}
// MaxRecvSize max recv size
var MaxRecvSize = 33 * 1024 * 1024
// maxRecvSize max recv size
var (
   maxRecvSize  = 33 * 1024 * 1024
   surveyorTime = time.Second / 2
)
func defualtSocketOptions(sock mangos.Socket) error {
   var err error
   if err = sock.SetOption(mangos.OptionMaxRecvSize, MaxRecvSize); err != nil {
   if err = sock.SetOption(mangos.OptionMaxRecvSize, maxRecvSize); err != nil {
      sock.Close()
      return err
   }
@@ -127,7 +137,8 @@
   if err != nil {
      return err
   }
   if sock.GetProtocol().Number() == mangos.ProtoSub {
   switch sock.GetProtocol().Number() {
   case mangos.ProtoSub:
      for _, arg := range args {
         switch arg.(type) {
         case string:
@@ -136,6 +147,17 @@
            err = sock.SetOption(mangos.OptionSubscribe, []byte(""))
         }
      }
   case mangos.ProtoSurveyor:
      for _, arg := range args {
         switch arg.(type) {
         case int:
            surveyorTime = time.Duration(arg.(int)/2) * time.Second
         default:
         }
         err = sock.SetOption(mangos.OptionSurveyTime, surveyorTime)
      }
   default:
      fmt.Println("no additional args")
   }
   return nil