zhangmeng
2019-05-17 22125ca10867152617cc4f42f403a0f6e37648a4
add reqrep
3个文件已添加
2个文件已修改
376 ■■■■ 已修改文件
deliver @ d23f54 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go 124 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pushpull.go 90 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
reqrep.go 72 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
reqrep.go.orig 88 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
deliver
@@ -1 +1 @@
Subproject commit 306588f52747268250997a9255ef19583bbd615c
Subproject commit d23f54e337d12fb4e6d5a0a5e1f041a51005e10c
main.go
@@ -4,114 +4,52 @@
    "demo/deliver"
    "fmt"
    "os"
    "os/signal"
    "strconv"
    "time"
    "golang.org/x/sys/unix"
)
const dLen = 12 * 1024 * 1024
var mode = deliver.PushPull
func modeType(t string) deliver.Mode {
func senderImpl(s deliver.Deliver) {
    var err error
    buf := make([]byte, dLen)
    for {
        if err = s.Send(buf); err != nil {
            fmt.Printf("can't send message on push socket: %s\n", err.Error())
        } else {
            fmt.Printf("send msg length %d\n", len(buf))
        }
        // time.Sleep(10 * time.Millisecond)
    }
}
func sender(url string, args ...interface{}) {
    s := deliver.NewServer(deliver.Mode(mode), url, args...)
    go senderImpl(s)
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
    <-c
    s.Close()
}
func recvImpl(url string, index int) {
    c := deliver.NewClient(deliver.Mode(mode), url)
    var msg []byte
    var err error
    var t int64
    var elapse int64
    count := 0
    for {
        msg, err = c.Recv()
        if err != nil {
            fmt.Println("recv error : ", err)
        }
        if t == 0 {
            t = time.Now().UnixNano()
        }
        elapse = time.Now().UnixNano() - t
        count++
        if elapse > 1e9 {
            fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
                index, count, len(msg), elapse)
            elapse = 0
            count = 0
            t = 0
        }
        // time.Sleep(10 * time.Millisecond)
    }
}
func reciever(url string, strCount string) {
    count, _ := strconv.Atoi(strCount)
    for i := 0; i < count; i++ {
        go recvImpl(url, i)
    }
    for {
        time.Sleep(2 * time.Second)
    }
}
func modeType(t string) {
    if t == "pushpull" {
        mode = deliver.PushPull
        return deliver.PushPull
    } else if t == "pubsub" {
        mode = deliver.PubSub
        return deliver.PubSub
    } else if t == "pair" {
        mode = deliver.Pair
        return deliver.Pair
    } else if t == "reqrep" {
        return deliver.ReqRep
    }
    return deliver.Mode(-1)
}
func senderMode(ipc string, m deliver.Mode) {
    if m == deliver.ReqRep {
        req(ipc, m)
    }
    sender(ipc, m)
}
func recvMode(ipc string, m deliver.Mode, strCount string) {
    if m == deliver.ReqRep {
        rep(ipc, m)
    }
    reciever(ipc, m, strCount)
}
func main() {
    if len(os.Args) > 3 && os.Args[1] == "producer" {
        modeType(os.Args[2])
        sender(os.Args[3])
        m := modeType(os.Args[2])
        if m > deliver.ModeStart {
            senderMode(os.Args[3], m)
        }
        os.Exit(0)
    }
    if len(os.Args) > 3 && os.Args[1] == "consumer" {
        modeType(os.Args[2])
        reciever(os.Args[3], os.Args[4])
        m := modeType(os.Args[2])
        if m > deliver.ModeStart {
            recvMode(os.Args[3], m, os.Args[4])
        }
        os.Exit(0)
    }
    fmt.Fprintf(os.Stderr,
pushpull.go
New file
@@ -0,0 +1,90 @@
package main
import (
    "demo/deliver"
    "fmt"
    "os"
    "os/signal"
    "strconv"
    "time"
    "golang.org/x/sys/unix"
)
func senderImpl(s deliver.Deliver) {
    var err error
    buf := make([]byte, dLen)
    for {
        if err = s.Send(buf); err != nil {
            fmt.Printf("can't send message on push socket: %s\n", err.Error())
        } else {
            fmt.Printf("send msg length %d\n", len(buf))
        }
        // time.Sleep(10 * time.Millisecond)
    }
}
func sender(url string, m deliver.Mode, args ...interface{}) {
    s := deliver.NewServer(m, url, args...)
    go senderImpl(s)
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
    <-c
    s.Close()
}
func recvImpl(url string, m deliver.Mode, index int) {
    c := deliver.NewClient(m, url)
    var msg []byte
    var err error
    var t int64
    var elapse int64
    count := 0
    for {
        msg, err = c.Recv()
        if err != nil {
            fmt.Println("recv error : ", err)
        }
        if t == 0 {
            t = time.Now().UnixNano()
        }
        elapse = time.Now().UnixNano() - t
        count++
        if elapse > 1e9 {
            fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
                index, count, len(msg), elapse)
            elapse = 0
            count = 0
            t = 0
        }
        // time.Sleep(10 * time.Millisecond)
    }
}
func reciever(url string, m deliver.Mode, strCount string) {
    count, _ := strconv.Atoi(strCount)
    for i := 0; i < count; i++ {
        go recvImpl(url, m, i)
    }
    for {
        time.Sleep(2 * time.Second)
    }
}
reqrep.go
New file
@@ -0,0 +1,72 @@
package main
import (
    "demo/deliver"
    "fmt"
    "time"
)
func req(url string, m deliver.Mode) {
    p := deliver.NewClient(m, url)
    var err error
    msg := `hello, give me your data`
    var t int64
    var elapse int64
    count := 0
    for {
        if err = p.Send([]byte(msg)); err != nil {
            fmt.Printf("can't send message on push socket: %s\n", err.Error())
        } else {
            fmt.Printf("send msg length %d\n", len(msg))
        }
        if buf, err := p.Recv(); err != nil {
            fmt.Println("recv error: ", err)
        } else {
            if t == 0 {
                t = time.Now().UnixNano()
            }
            elapse = time.Now().UnixNano() - t
            count++
            if elapse > 1e9 {
                fmt.Printf("NODE: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
                    count, len(buf), elapse)
                elapse = 0
                count = 0
                t = 0
            }
        }
        // time.Sleep(10 * time.Millisecond)
    }
}
func rep(url string, m deliver.Mode) {
    c := deliver.NewServer(m, url)
    var msg []byte
    var err error
    buf := make([]byte, dLen)
    for {
        msg, err = c.Recv()
        if err != nil {
            fmt.Println("recv error : ", err, " msg ", msg)
            continue
        }
        fmt.Println("recv msg: ", string(msg))
        c.Send(buf)
        // time.Sleep(10 * time.Millisecond)
    }
}
reqrep.go.orig
New file
@@ -0,0 +1,88 @@
package main
import (
    "demo/deliver"
    "fmt"
    "os"
    "time"
    "nanomsg.org/go-mangos"
    "nanomsg.org/go-mangos/protocol/rep"
    "nanomsg.org/go-mangos/protocol/req"
    "nanomsg.org/go-mangos/transport/ipc"
    "nanomsg.org/go-mangos/transport/tcp"
)
func fReq(url string, m deliver.Mode) {
    var sock mangos.Socket
    var err error
    if sock, err = req.NewSocket(); err != nil {
        die("can't get new req socket: %s", err.Error())
    }
    sock.AddTransport(ipc.NewTransport())
    sock.AddTransport(tcp.NewTransport())
    sock.SetOption(mangos.OptionRaw, true)
    if err = sock.Listen(url); err != nil {
        die("can't dial on req socket: %s", err.Error())
    }
    fmt.Printf("NODE1: SENDING DATE REQUEST %s\n", "DATE")
    data := []byte("DATE")
    message := mangos.NewMessage(len(data))
    message.Body = data
    if err = sock.SendMsg(message); err != nil {
        die("can't send message on push socket: %s", err.Error())
    }
    var mesg *mangos.Message
    if mesg, err = sock.RecvMsg(); err != nil {
        die("can't receive date: %s", err.Error())
    }
    fmt.Printf("NODE1: RECEIVED DATE %s\n", string(mesg.Body))
    sock.Close()
}
func fRep(url string, m deliver.Mode) {
    var sock mangos.Socket
    var err error
    var msg *mangos.Message
    if sock, err = rep.NewSocket(); err != nil {
        die("can't get new rep socket: %s", err)
    }
    sock.AddTransport(ipc.NewTransport())
    sock.AddTransport(tcp.NewTransport())
    sock.SetOption(mangos.OptionRaw, true)
    if err = sock.Dial(url); err != nil {
        die("can't listen on rep socket: %s", err.Error())
    }
    for {
        // Could also use sock.RecvMsg to get header
        msg, err = sock.RecvMsg()
        if string(msg.Body) == "DATE" { // no need to terminate
            fmt.Println("NODE0: RECEIVED DATE REQUEST")
            d := date()
            fmt.Printf("NODE0: SENDING DATE %s\n", d)
            data := []byte(d)
            message := mangos.NewMessage(len(data))
            message.Body = data
            err = sock.SendMsg(message)
            if err != nil {
                die("can't send reply: %s", err.Error())
            }
        }
    }
}
func die(format string, v ...interface{}) {
    fmt.Fprintln(os.Stderr, fmt.Sprintf(format, v...))
    os.Exit(1)
}
func date() string {
    return time.Now().Format(time.ANSIC)
}