zhangmeng
2019-05-20 0f27a87065cd0446e9a491357d440dc67e36ea0d
add shm multi send one recv
1个文件已添加
2个文件已修改
127 ■■■■■ 已修改文件
main.go 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
shm.go 13 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
shm2.go 108 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go
@@ -16,7 +16,8 @@
    if m == deliver.ReqRep {
        req(ipc, m)
    } else if m == deliver.Shm {
        shmSender(ipc, 2, 32*1024*1024)
        // shmSender(ipc, 2, 32*1024*1024)
        shmReciever2(ipc, count, 2, 32*1024*1024)
    }
    if one {
@@ -30,7 +31,8 @@
    if m == deliver.ReqRep {
        rep(ipc, m)
    } else if m == deliver.Shm {
        shmReciever(ipc, count)
        // shmReciever(ipc, count)
        shmSender2(ipc, count)
    }
    if n {
shm.go
@@ -5,6 +5,7 @@
    "fmt"
    "os"
    "os/signal"
    "sync"
    "time"
    "golang.org/x/sys/unix"
@@ -42,14 +43,14 @@
    go shmSenderImpl(s)
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
    signal.Notify(c, os.Interrupt, os.Kill, unix.SIGINT)
    <-c
    cancel()
    s.Close()
}
func shmRecvImpl(c deliver.Deliver, url string, index int) {
func shmRecvImpl(wg *sync.WaitGroup, c deliver.Deliver, url string, index int) {
    var msg []byte
    var err error
@@ -61,6 +62,7 @@
    for {
        select {
        case <-ctx.Done():
            wg.Done()
            return
        default:
            msg, err = c.Recv()
@@ -85,15 +87,19 @@
        // time.Sleep(10 * time.Millisecond)
    }
}
func shmReciever(url string, count int) {
    wg := sync.WaitGroup{}
    var cs []deliver.Deliver
    for i := 0; i < count; i++ {
        wg.Add(1)
        c := deliver.NewClient(deliver.Shm, url)
        cs = append(cs, c)
        go shmRecvImpl(c, url, i)
        go shmRecvImpl(&wg, c, url, i)
    }
    c := make(chan os.Signal, 1)
@@ -101,6 +107,7 @@
    <-c
    cancel()
    wg.Wait()
    for _, v := range cs {
        v.Close()
    }
shm2.go
New file
@@ -0,0 +1,108 @@
package main
import (
    "demo/deliver"
    "fmt"
    "os"
    "os/signal"
    "time"
    "golang.org/x/sys/unix"
)
func shmSenderImpl2(s deliver.Deliver) {
    var err error
    buf := make([]byte, dLen)
    copy(buf, []byte("hello, give you this"))
    for {
        select {
        case <-ctx.Done():
            return
        default:
            if err = s.Send(buf); err != nil {
                fmt.Printf("can't send message on push socket: %s\n", err.Error())
            } else {
                fmt.Printf("send msg length %d\n", len(buf))
            }
        }
        // time.Sleep(10 * time.Millisecond)
    }
}
func shmReciever2(url string, count int, args ...interface{}) {
    s := deliver.NewServer(deliver.Shm, url, args...)
    go shmRecvImpl2(s, 0)
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt, os.Kill, unix.SIGINT)
    <-c
    cancel()
    s.Close()
}
func shmRecvImpl2(c deliver.Deliver, index int) {
    var msg []byte
    var err error
    var t int64
    var elapse int64
    count := 0
    for {
        select {
        case <-ctx.Done():
            return
        default:
            msg, err = c.Recv()
            if err != nil {
                fmt.Println("recv error : ", err)
            }
            if t == 0 {
                t = time.Now().UnixNano()
            }
            elapse = time.Now().UnixNano() - t
            count++
            if elapse > 1e9 {
                fmt.Printf("NODE-%d: RECEIVED \"%d\" data %s len %d, use \"%d\" ns\n",
                    index, count, string(msg), len(msg), elapse)
                elapse = 0
                count = 0
                t = 0
            }
        }
        // time.Sleep(10 * time.Millisecond)
    }
}
func shmSender2(url string, count int) {
    var cs []deliver.Deliver
    for i := 0; i < count; i++ {
        c := deliver.NewClient(deliver.Shm, url)
        cs = append(cs, c)
        go shmSenderImpl2(c)
    }
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
    <-c
    cancel()
    for _, v := range cs {
        v.Close()
    }
}