zhangmeng
2019-05-15 9e1301abf98fb4e04fabba535c1bb5ad161a66a4
add surveyor
1个文件已修改
30 ■■■■ 已修改文件
nng.go 30 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nng.go
@@ -5,6 +5,7 @@
    "fmt"
    "os"
    "strings"
    "time"
    "nanomsg.org/go-mangos"
    "nanomsg.org/go-mangos/protocol/bus"
@@ -31,11 +32,17 @@
        return errors.New("please init NNG first")
    }
    switch n.sock.GetProtocol().Number() {
    case mangos.ProtoSurveyor:
        time.Sleep(surveyorTime * 2)
    default:
    }
    if _, err := n.sock.GetOption(mangos.OptionRaw); err == nil {
        msg := mangos.NewMessage(len(data))
        msg.Body = data
        return n.sock.SendMsg(msg)
    }
    return n.sock.Send(data)
}
@@ -92,12 +99,15 @@
    return nil
}
// MaxRecvSize max recv size
var MaxRecvSize = 33 * 1024 * 1024
// maxRecvSize max recv size
var (
    maxRecvSize  = 33 * 1024 * 1024
    surveyorTime = time.Second / 2
)
func defualtSocketOptions(sock mangos.Socket) error {
    var err error
    if err = sock.SetOption(mangos.OptionMaxRecvSize, MaxRecvSize); err != nil {
    if err = sock.SetOption(mangos.OptionMaxRecvSize, maxRecvSize); err != nil {
        sock.Close()
        return err
    }
@@ -127,7 +137,8 @@
    if err != nil {
        return err
    }
    if sock.GetProtocol().Number() == mangos.ProtoSub {
    switch sock.GetProtocol().Number() {
    case mangos.ProtoSub:
        for _, arg := range args {
            switch arg.(type) {
            case string:
@@ -136,6 +147,17 @@
                err = sock.SetOption(mangos.OptionSubscribe, []byte(""))
            }
        }
    case mangos.ProtoSurveyor:
        for _, arg := range args {
            switch arg.(type) {
            case int:
                surveyorTime = time.Duration(arg.(int)/2) * time.Second
            default:
            }
            err = sock.SetOption(mangos.OptionSurveyTime, surveyorTime)
        }
    default:
        fmt.Println("no additional args")
    }
    return nil