zhangmeng
2019-05-20 622b701e27351e28a6c3df579d4423120afe79fc
update deliver
1个文件已添加
5个文件已修改
134 ■■■■■ 已修改文件
deliver @ 9a89af 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.mod 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.sum 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
reqrep.go 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
shm.go 109 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
deliver
@@ -1 +1 @@
Subproject commit d23f54e337d12fb4e6d5a0a5e1f041a51005e10c
Subproject commit 9a89af693b9336633bcac2a652c294f782e6b3b1
go.mod
@@ -4,6 +4,9 @@
require (
    github.com/gorilla/websocket v1.4.0 // indirect
    github.com/tmthrgd/go-sem v0.0.0-20160607101025-0214dbf53877 // indirect
    github.com/tmthrgd/go-shm v0.0.0-20170117044846-90afcfcd5ee9 // indirect
    github.com/tmthrgd/shm-go v0.0.0-20170130075737-7207ca97b290
    golang.org/x/sys v0.0.0-20190429190828-d89cdac9e872
    nanomsg.org/go-mangos v1.4.0
)
go.sum
@@ -1,5 +1,11 @@
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/tmthrgd/go-sem v0.0.0-20160607101025-0214dbf53877 h1:n65+IT/xy5+trHm3Zpg9+j7IO4n8pBcPzvaKbMolW8U=
github.com/tmthrgd/go-sem v0.0.0-20160607101025-0214dbf53877/go.mod h1:sgTk9wg3WurMlziuB3hcfgHYTz3pEkjQpSCTT8V2pW8=
github.com/tmthrgd/go-shm v0.0.0-20170117044846-90afcfcd5ee9 h1:uVRQSWD6TOlWlLJ7IYYmbjRr0Xg35ADFN89HGQLPFGI=
github.com/tmthrgd/go-shm v0.0.0-20170117044846-90afcfcd5ee9/go.mod h1:vy1jksyhzuQOMkHXMEi+X2bZ47ZeCn3QTnYdFBesABs=
github.com/tmthrgd/shm-go v0.0.0-20170130075737-7207ca97b290 h1:5zW+TRr0WH4uN72/E/XYwb1PcaYN5BIB/FUbcQ0nHr0=
github.com/tmthrgd/shm-go v0.0.0-20170130075737-7207ca97b290/go.mod h1:e9PZQr6zVezMTwj1v0j1YhGCNdS2zTCjXU9q9K+HHGk=
golang.org/x/sys v0.0.0-20190429190828-d89cdac9e872 h1:cGjJzUd8RgBw428LXP65YXni0aiGNA4Bl+ls8SmLOm8=
golang.org/x/sys v0.0.0-20190429190828-d89cdac9e872/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
nanomsg.org/go-mangos v1.4.0 h1:pVRLnzXePdSbhWlWdSncYszTagERhMG5zK/vXYmbEdM=
main.go
@@ -1,12 +1,15 @@
package main
import (
    "context"
    "demo/deliver"
    "fmt"
    "os"
)
const dLen = 12 * 1024 * 1024
var ctx, cancel = context.WithCancel(context.Background())
func modeType(t string) deliver.Mode {
@@ -18,14 +21,18 @@
        return deliver.Pair
    } else if t == "reqrep" {
        return deliver.ReqRep
    } else if t == "shm" {
        return deliver.Shm
    }
    return deliver.Mode(-1)
    return deliver.NONE
}
func senderMode(ipc string, m deliver.Mode) {
    if m == deliver.ReqRep {
        req(ipc, m)
    } else if m == deliver.Shm {
        shmSender(ipc, 2, 32*1024*1024)
    }
    sender(ipc, m)
}
@@ -33,6 +40,8 @@
func recvMode(ipc string, m deliver.Mode, strCount string) {
    if m == deliver.ReqRep {
        rep(ipc, m)
    } else if m == deliver.Shm {
        shmReciever(ipc, strCount)
    }
    reciever(ipc, m, strCount)
}
reqrep.go
@@ -22,8 +22,6 @@
            fmt.Printf("can't send message on push socket: %s\n", err.Error())
        } else {
            fmt.Printf("send msg length %d\n", len(msg))
        }
        if buf, err := p.Recv(); err != nil {
@@ -64,7 +62,6 @@
            fmt.Println("recv error : ", err, " msg ", msg)
            continue
        }
        fmt.Println("recv msg: ", string(msg))
        c.Send(buf)
        // time.Sleep(10 * time.Millisecond)
shm.go
New file
@@ -0,0 +1,109 @@
package main
import (
    "demo/deliver"
    "fmt"
    "os"
    "os/signal"
    "strconv"
    "time"
    "golang.org/x/sys/unix"
)
func shmSenderImpl(s deliver.Deliver) {
    var err error
    buf := make([]byte, dLen)
    copy(buf, []byte("hello, give you this"))
    for {
        select {
        case <-ctx.Done():
            return
        default:
            if err = s.Send(buf); err != nil {
                fmt.Printf("can't send message on push socket: %s\n", err.Error())
            } else {
                fmt.Printf("send msg length %d\n", len(buf))
            }
        }
        // time.Sleep(10 * time.Millisecond)
    }
}
func shmSender(url string, args ...interface{}) {
    s := deliver.NewServer(deliver.Shm, url, args...)
    go shmSenderImpl(s)
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
    <-c
    cancel()
    s.Close()
}
func shmRecvImpl(c deliver.Deliver, url string, index int) {
    var msg []byte
    var err error
    var t int64
    var elapse int64
    count := 0
    for {
        select {
        case <-ctx.Done():
            return
        default:
            msg, err = c.Recv()
            if err != nil {
                fmt.Println("recv error : ", err)
            }
            if t == 0 {
                t = time.Now().UnixNano()
            }
            elapse = time.Now().UnixNano() - t
            count++
            if elapse > 1e9 {
                fmt.Printf("NODE-%d: RECEIVED \"%d\" data %s len %d, use \"%d\" ns\n",
                    index, count, string(msg), len(msg), elapse)
                elapse = 0
                count = 0
                t = 0
            }
        }
        // time.Sleep(10 * time.Millisecond)
    }
}
func shmReciever(url string, strCount string) {
    count, _ := strconv.Atoi(strCount)
    var cs []deliver.Deliver
    for i := 0; i < count; i++ {
        c := deliver.NewClient(deliver.Shm, url)
        cs = append(cs, c)
        go shmRecvImpl(c, url, i)
    }
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
    <-c
    cancel()
    for _, v := range cs {
        v.Close()
    }
}