fix
zhangmeng
2019-05-16 5ff1f32410e0697e581f6c389c8c22c5abd474c0
nng.go
@@ -5,6 +5,7 @@
   "fmt"
   "os"
   "strings"
   "time"
   "nanomsg.org/go-mangos"
   "nanomsg.org/go-mangos/protocol/bus"
@@ -23,6 +24,7 @@
// NNG mangos wrap
type NNG struct {
   sock mangos.Socket
   raw  bool
}
// Send impl interface Diliver
@@ -31,11 +33,17 @@
      return errors.New("please init NNG first")
   }
   if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil {
   // switch n.sock.GetProtocol().Number() {
   // case mangos.ProtoSurveyor:
   //    time.Sleep(surveyorTime * 2)
   // default:
   // }
   if n.raw {
      msg := mangos.NewMessage(len(data))
      msg.Body = data
      return n.sock.SendMsg(msg)
   }
   return n.sock.Send(data)
}
@@ -44,11 +52,22 @@
   if n.sock == nil {
      return nil, errors.New("please init NNG first")
   }
   if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil {
      msg, err := n.sock.RecvMsg()
      return msg.Body, err
   if n.raw {
      var msg *mangos.Message
      var err error
      if msg, err = n.sock.RecvMsg(); err != nil {
         return nil, err
      }
      return msg.Body, nil
   }
   return n.sock.Recv()
}
// Close impl interface Deliver
func (n *NNG) Close() {
   if n.sock != nil {
      n.sock.Close()
   }
}
// nngProducer create from deliver Mode
@@ -65,6 +84,7 @@
      }
      return &NNG{
         sock,
         true,
      }
   }
@@ -86,18 +106,22 @@
      return &NNG{
         sock,
         true,
      }
   }
   return nil
}
// MaxRecvSize max recv size
var MaxRecvSize = 33 * 1024 * 1024
// maxRecvSize max recv size
var (
   maxRecvSize  = 33 * 1024 * 1024
   surveyorTime = time.Second / 2
)
func defualtSocketOptions(sock mangos.Socket) error {
   var err error
   if err = sock.SetOption(mangos.OptionMaxRecvSize, MaxRecvSize); err != nil {
   if err = sock.SetOption(mangos.OptionMaxRecvSize, maxRecvSize); err != nil {
      sock.Close()
      return err
   }
@@ -109,10 +133,14 @@
      sock.Close()
      return err
   }
   // if err = sock.SetOption(mangos.OptionNoDelay, true); err != nil {
   //    sock.Close()
   //    return err
   // }
   if err = sock.SetOption(mangos.OptionRecvDeadline, time.Second); err != nil {
      sock.Close()
      return err
   }
   if err = sock.SetOption(mangos.OptionSendDeadline, time.Second); err != nil {
      sock.Close()
      return err
   }
   if err = sock.SetOption(mangos.OptionRaw, true); err != nil {
      sock.Close()
      return err
@@ -127,7 +155,8 @@
   if err != nil {
      return err
   }
   if sock.GetProtocol().Number() == mangos.ProtoSub {
   switch sock.GetProtocol().Number() {
   case mangos.ProtoSub:
      for _, arg := range args {
         switch arg.(type) {
         case string:
@@ -136,6 +165,17 @@
            err = sock.SetOption(mangos.OptionSubscribe, []byte(""))
         }
      }
   case mangos.ProtoSurveyor:
      for _, arg := range args {
         switch arg.(type) {
         case int:
            surveyorTime = time.Duration(arg.(int)/2) * time.Second
         default:
         }
         err = sock.SetOption(mangos.OptionSurveyTime, surveyorTime)
      }
   default:
      fmt.Println("no additional args")
   }
   return nil