zhangmeng
2019-05-20 26f699593d76d6fa4bcf320c84fd723343c4b5f4
add one pull multi push
2个文件已删除
2个文件已添加
2个文件已修改
481 ■■■■■ 已修改文件
1push-npull.go 106 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go 92 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
npush-1pull.go 101 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pushpull.go 90 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
reqrep.go.orig 88 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
shm.go 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
1push-npull.go
New file
@@ -0,0 +1,106 @@
package main
import (
    "demo/deliver"
    "fmt"
    "os"
    "os/signal"
    "time"
    "golang.org/x/sys/unix"
)
func oneSenderImpl(s deliver.Deliver) {
    var err error
    buf := make([]byte, dLen)
    for {
        select {
        case <-ctx.Done():
            return
        default:
            if err = s.Send(buf); err != nil {
                fmt.Printf("can't send message on push socket: %s\n", err.Error())
            } else {
                fmt.Printf("send msg length %d\n", len(buf))
            }
        }
    }
}
func oneSender(url string, m deliver.Mode, args ...interface{}) {
    s := deliver.NewServer(m, url, args...)
    go oneSenderImpl(s)
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
    <-c
    cancel()
    s.Close()
}
func nRecvImpl(c deliver.Deliver, index int) {
    var msg []byte
    var err error
    var t int64
    var elapse int64
    count := 0
    for {
        select {
        case <-ctx.Done():
            return
        default:
            msg, err = c.Recv()
            if err != nil {
                fmt.Println("recv error : ", err)
            }
            if t == 0 {
                t = time.Now().UnixNano()
            }
            elapse = time.Now().UnixNano() - t
            count++
            if elapse > 1e9 {
                fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
                    index, count, len(msg), elapse)
                elapse = 0
                count = 0
                t = 0
            }
        }
    }
}
func nReciever(url string, m deliver.Mode, count int) {
    var cs []deliver.Deliver
    for i := 0; i < count; i++ {
        c := deliver.NewClient(m, url)
        cs = append(cs, c)
        go nRecvImpl(c, i)
    }
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
    <-c
    cancel()
    for _, v := range cs {
        v.Close()
    }
}
main.go
@@ -3,6 +3,7 @@
import (
    "context"
    "demo/deliver"
    "flag"
    "fmt"
    "os"
)
@@ -10,6 +11,58 @@
const dLen = 12 * 1024 * 1024
var ctx, cancel = context.WithCancel(context.Background())
func senderMode(ipc string, m deliver.Mode, count int, one bool) {
    if m == deliver.ReqRep {
        req(ipc, m)
    } else if m == deliver.Shm {
        shmSender(ipc, 2, 32*1024*1024)
    }
    if one {
        oneSender(ipc, m)
    } else {
        nSender(ipc, m, count)
    }
}
func recvMode(ipc string, m deliver.Mode, count int, n bool) {
    if m == deliver.ReqRep {
        rep(ipc, m)
    } else if m == deliver.Shm {
        shmReciever(ipc, count)
    }
    if n {
        nReciever(ipc, m, count)
    } else {
        oneReciever(ipc, m)
    }
}
var (
    proc         string
    procCount    int
    mode         string
    ipc          string
    oneSendnRecv bool
)
const (
    act  = "act"
    pass = "pass"
)
func init() {
    flag.StringVar(&proc, "p", "act", "proc as sender")
    flag.IntVar(&procCount, "c", 1, "proc run count")
    flag.StringVar(&mode, "m", "pushpull", "proc run mode pushpull or pubsub etc.")
    flag.StringVar(&ipc, "i", "ipc:///tmp/pic.ipc", "ipc label")
    flag.BoolVar(&oneSendnRecv, "n", true, "one send n recv")
}
func modeType(t string) deliver.Mode {
@@ -28,39 +81,18 @@
    return deliver.NONE
}
func senderMode(ipc string, m deliver.Mode) {
    if m == deliver.ReqRep {
        req(ipc, m)
    } else if m == deliver.Shm {
        shmSender(ipc, 2, 32*1024*1024)
    }
    sender(ipc, m)
}
func recvMode(ipc string, m deliver.Mode, strCount string) {
    if m == deliver.ReqRep {
        rep(ipc, m)
    } else if m == deliver.Shm {
        shmReciever(ipc, strCount)
    }
    reciever(ipc, m, strCount)
}
func main() {
    if len(os.Args) > 3 && os.Args[1] == "producer" {
        m := modeType(os.Args[2])
        if m > deliver.ModeStart {
            senderMode(os.Args[3], m)
    flag.Parse()
    m := modeType(mode)
    if m > deliver.ModeStart {
        if proc == act {
            senderMode(ipc, m, procCount, oneSendnRecv)
        } else {
            recvMode(ipc, m, procCount, oneSendnRecv)
        }
        os.Exit(0)
    }
    if len(os.Args) > 3 && os.Args[1] == "consumer" {
        m := modeType(os.Args[2])
        if m > deliver.ModeStart {
            recvMode(os.Args[3], m, os.Args[4])
        }
        os.Exit(0)
    }
    fmt.Fprintf(os.Stderr,
        "Usage: pushpull push|pull <URL> <ARG> ...\n")
    os.Exit(1)
npush-1pull.go
New file
@@ -0,0 +1,101 @@
package main
import (
    "demo/deliver"
    "fmt"
    "os"
    "os/signal"
    "time"
    "golang.org/x/sys/unix"
)
func nSenderImpl(s deliver.Deliver, index int) {
    var err error
    buf := make([]byte, dLen)
    for {
        select {
        case <-ctx.Done():
            return
        default:
            if err = s.Send(buf); err != nil {
                fmt.Printf("%d can't send message on push socket: %s\n", index, err.Error())
            } else {
                fmt.Printf("%d send msg length %d\n", index, len(buf))
            }
        }
    }
}
func nSender(url string, m deliver.Mode, count int, args ...interface{}) {
    var cs []deliver.Deliver
    for i := 0; i < count; i++ {
        c := deliver.NewClient(m, url, args...)
        cs = append(cs, c)
        go nSenderImpl(c, i)
    }
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
    <-c
    cancel()
    for _, v := range cs {
        v.Close()
    }
}
func oneRecvImpl(c deliver.Deliver, index int) {
    var msg []byte
    var err error
    var t int64
    var elapse int64
    count := 0
    for {
        msg, err = c.Recv()
        if err != nil {
            fmt.Println("recv error : ", err)
        }
        if t == 0 {
            t = time.Now().UnixNano()
        }
        elapse = time.Now().UnixNano() - t
        count++
        if elapse > 1e9 {
            fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
                index, count, len(msg), elapse)
            elapse = 0
            count = 0
            t = 0
        }
        // time.Sleep(10 * time.Millisecond)
    }
}
func oneReciever(url string, m deliver.Mode) {
    s := deliver.NewServer(m, url)
    go oneRecvImpl(s, 0)
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
    <-c
    cancel()
    s.Close()
}
pushpull.go
File was deleted
reqrep.go.orig
File was deleted
shm.go
@@ -5,7 +5,6 @@
    "fmt"
    "os"
    "os/signal"
    "strconv"
    "time"
    "golang.org/x/sys/unix"
@@ -88,8 +87,7 @@
    }
}
func shmReciever(url string, strCount string) {
    count, _ := strconv.Atoi(strCount)
func shmReciever(url string, count int) {
    var cs []deliver.Deliver
    for i := 0; i < count; i++ {