| | |
| | | "fmt" |
| | | "os" |
| | | "strings" |
| | | "time" |
| | | |
| | | "nanomsg.org/go-mangos" |
| | | "nanomsg.org/go-mangos/protocol/bus" |
| | |
| | | return errors.New("please init NNG first") |
| | | } |
| | | |
| | | switch n.sock.GetProtocol().Number() { |
| | | case mangos.ProtoSurveyor: |
| | | time.Sleep(surveyorTime * 2) |
| | | default: |
| | | } |
| | | if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil { |
| | | msg := mangos.NewMessage(len(data)) |
| | | msg.Body = data |
| | | return n.sock.SendMsg(msg) |
| | | } |
| | | |
| | | return n.sock.Send(data) |
| | | } |
| | | |
| | |
| | | return n.sock.Recv() |
| | | } |
| | | |
| | | // NewNNGProducer create from deliver Mode |
| | | func NewNNGProducer(m Mode, url string, args ...interface{}) *NNG { |
| | | // Close impl interface Deliver |
| | | func (n *NNG) Close() { |
| | | if n.sock != nil { |
| | | n.sock.Close() |
| | | } |
| | | } |
| | | |
| | | // nngProducer create from deliver Mode |
| | | func nngProducer(m Mode, url string, args ...interface{}) *NNG { |
| | | |
| | | rmExistedIpcName(url) |
| | | if sock, err := newSocket(protoProducer(m)); err == nil { |
| | |
| | | return nil |
| | | } |
| | | |
| | | // NewNNGConsumer create from deliver Mode |
| | | func NewNNGConsumer(m Mode, url string, args ...interface{}) *NNG { |
| | | // nngConsumer create from deliver Mode |
| | | func nngConsumer(m Mode, url string, args ...interface{}) *NNG { |
| | | |
| | | if sock, err := newSocket(protoConsumer(m)); err == nil { |
| | | if err = setSocketOptions(sock, args); err != nil { |
| | |
| | | return nil |
| | | } |
| | | |
| | | // MaxRecvSize max recv size |
| | | var MaxRecvSize = 33 * 1024 * 1024 |
| | | // maxRecvSize max recv size |
| | | var ( |
| | | maxRecvSize = 33 * 1024 * 1024 |
| | | surveyorTime = time.Second / 2 |
| | | ) |
| | | |
| | | func defualtSocketOptions(sock mangos.Socket) error { |
| | | var err error |
| | | if err = sock.SetOption(mangos.OptionMaxRecvSize, MaxRecvSize); err != nil { |
| | | if err = sock.SetOption(mangos.OptionMaxRecvSize, maxRecvSize); err != nil { |
| | | sock.Close() |
| | | return err |
| | | } |
| | |
| | | sock.Close() |
| | | return err |
| | | } |
| | | // if err = sock.SetOption(mangos.OptionNoDelay, true); err != nil { |
| | | // if err = sock.SetOption(mangos.OptionRecvDeadline, time.Second); err != nil { |
| | | // sock.Close() |
| | | // return err |
| | | // } |
| | | // if err = sock.SetOption(mangos.OptionSendDeadline, time.Second); err != nil { |
| | | // sock.Close() |
| | | // return err |
| | | // } |
| | |
| | | if err != nil { |
| | | return err |
| | | } |
| | | if sock.GetProtocol().Number() == mangos.ProtoSub { |
| | | switch sock.GetProtocol().Number() { |
| | | case mangos.ProtoSub: |
| | | for _, arg := range args { |
| | | switch arg.(type) { |
| | | case string: |
| | |
| | | err = sock.SetOption(mangos.OptionSubscribe, []byte("")) |
| | | } |
| | | } |
| | | case mangos.ProtoSurveyor: |
| | | for _, arg := range args { |
| | | switch arg.(type) { |
| | | case int: |
| | | surveyorTime = time.Duration(arg.(int)/2) * time.Second |
| | | default: |
| | | } |
| | | err = sock.SetOption(mangos.OptionSurveyTime, surveyorTime) |
| | | } |
| | | default: |
| | | fmt.Println("no additional args") |
| | | } |
| | | |
| | | return nil |