zhangmeng
2019-08-26 cb4a30b9c26317a94e0be16029c8107d830fcd0d
update
5个文件已删除
6个文件已添加
4个文件已修改
1064 ■■■■■ 已修改文件
.gitmodules 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
1push-npull.go 106 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.mod 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.sum 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go 124 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
npush-1pull.go 101 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
profile/pull.go 56 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
profile/push.go 46 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
profile/reqrep.go 83 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
profile/shmrecv.go 137 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
profile/shmsend.go 65 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
reqrep.go 69 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
runShm.go 129 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
runShm2.go 122 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
shm @ a0123f 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
.gitmodules
@@ -1,3 +1,6 @@
[submodule "deliver"]
    path = deliver
    url = ssh://zhangmeng@192.168.1.14:29418/valib/deliver.git
[submodule "shm"]
    path = shm
    url = ssh://zhangmeng@192.168.1.14:29418/valib/shm.git
1push-npull.go
File was deleted
go.mod
@@ -3,10 +3,7 @@
go 1.12
require (
    github.com/gorilla/websocket v1.4.0 // indirect
    github.com/tmthrgd/go-sem v0.0.0-20160607101025-0214dbf53877 // indirect
    github.com/tmthrgd/go-shm v0.0.0-20170117044846-90afcfcd5ee9 // indirect
    github.com/tmthrgd/shm-go v0.0.0-20170130075737-7207ca97b290
    golang.org/x/sys v0.0.0-20190429190828-d89cdac9e872
    github.com/gorilla/websocket v1.4.1 // indirect
    golang.org/x/sys v0.0.0-20190825160603-fb81701db80f
    nanomsg.org/go-mangos v1.4.0
)
go.sum
@@ -1,12 +1,6 @@
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/tmthrgd/go-sem v0.0.0-20160607101025-0214dbf53877 h1:n65+IT/xy5+trHm3Zpg9+j7IO4n8pBcPzvaKbMolW8U=
github.com/tmthrgd/go-sem v0.0.0-20160607101025-0214dbf53877/go.mod h1:sgTk9wg3WurMlziuB3hcfgHYTz3pEkjQpSCTT8V2pW8=
github.com/tmthrgd/go-shm v0.0.0-20170117044846-90afcfcd5ee9 h1:uVRQSWD6TOlWlLJ7IYYmbjRr0Xg35ADFN89HGQLPFGI=
github.com/tmthrgd/go-shm v0.0.0-20170117044846-90afcfcd5ee9/go.mod h1:vy1jksyhzuQOMkHXMEi+X2bZ47ZeCn3QTnYdFBesABs=
github.com/tmthrgd/shm-go v0.0.0-20170130075737-7207ca97b290 h1:5zW+TRr0WH4uN72/E/XYwb1PcaYN5BIB/FUbcQ0nHr0=
github.com/tmthrgd/shm-go v0.0.0-20170130075737-7207ca97b290/go.mod h1:e9PZQr6zVezMTwj1v0j1YhGCNdS2zTCjXU9q9K+HHGk=
golang.org/x/sys v0.0.0-20190429190828-d89cdac9e872 h1:cGjJzUd8RgBw428LXP65YXni0aiGNA4Bl+ls8SmLOm8=
golang.org/x/sys v0.0.0-20190429190828-d89cdac9e872/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM=
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
golang.org/x/sys v0.0.0-20190825160603-fb81701db80f h1:LCxigP8q3fPRGNVYndYsyHnF0zRrvcoVwZMfb8iQZe4=
golang.org/x/sys v0.0.0-20190825160603-fb81701db80f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
nanomsg.org/go-mangos v1.4.0 h1:pVRLnzXePdSbhWlWdSncYszTagERhMG5zK/vXYmbEdM=
nanomsg.org/go-mangos v1.4.0/go.mod h1:MOor8xUIgwsRMPpLr9xQxe7bT7rciibScOqVyztNxHQ=
main.go
@@ -3,84 +3,55 @@
import (
    "context"
    "demo/deliver"
    "demo/profile"
    "flag"
    "fmt"
    "os"
    "os/signal"
    "time"
    "golang.org/x/sys/unix"
)
const dLen = 12 * 1024 * 1024
var ctx, cancel = context.WithCancel(context.Background())
func senderMode(ipc string, m deliver.Mode, count int, one bool) {
    if m == deliver.ReqRep {
        req(ipc, m)
    } else if m == deliver.Shm {
        shmSender(ipc, 2, 32*1024*1024)
        // shmReciever2(ipc, count, 2, 32*1024*1024)
    }
    if one {
        oneSender(ipc, m)
    } else {
        nSender(ipc, m, count)
    }
}
func recvMode(ipc string, m deliver.Mode, count int, n bool) {
    if m == deliver.ReqRep {
        rep(ipc, m)
    } else if m == deliver.Shm {
        shmReciever(ipc, count)
        // shmSender2(ipc, count)
    }
    if n {
        nReciever(ipc, m, count)
    } else {
        oneReciever(ipc, m)
    }
}
var (
    proc         string
    procCount    int
    mode         string
    ipc          string
    oneSendnRecv bool
    server bool
    proto  string
    ipc    string
    tmm string
    count int
)
const (
    act  = "act"
    pass = "pass"
    push = "push"
    pull = "pull"
    pub  = "pub"
    sub  = "sub"
    req  = "req"
    rep  = "rep"
    send = "send" // shm send
    recv = "recv" // shm recv
    pair = "pair"
)
func init() {
    flag.StringVar(&proc, "p", "act", "proc as sender")
    flag.IntVar(&procCount, "c", 1, "proc run count")
    flag.BoolVar(&server, "s", false, "run as server")
    flag.StringVar(&proto, "p", "", "run protocol e.g. push/pull req/rep send/recv(use shm)")
    flag.StringVar(&ipc, "i", "", "ipc address")
    flag.IntVar(&count, "c", 1, "run client count")
    flag.StringVar(&mode, "m", "pushpull", "proc run mode pushpull or pubsub etc.")
    flag.StringVar(&ipc, "i", "ipc:///tmp/pic.ipc", "ipc label")
    flag.BoolVar(&oneSendnRecv, "n", true, "one send n recv")
    flag.StringVar(&tmm, "t", "server", "")
}
func modeType(t string) deliver.Mode {
func deliverMode(m string) deliver.Mode {
    if t == "pushpull" {
    if m == push || m == pull {
        return deliver.PushPull
    } else if t == "pubsub" {
    } else if m == pub || m == sub {
        return deliver.PubSub
    } else if t == "pair" {
    } else if m == pair {
        return deliver.Pair
    } else if t == "reqrep" {
    } else if m == req || m == rep {
        return deliver.ReqRep
    } else if t == "shm" {
    } else if m == send || m == recv {
        return deliver.Shm
    }
@@ -90,26 +61,31 @@
func main() {
    flag.Parse()
    if tmm == "server" {
        c, _ := deliver.NewServerWithTimeout(deliver.PushPull, ipc, 1000)
        // c := deliver.NewServer(deliver.PushPull, ipc)
        nSenderImpl(c, 0)
    } else if tmm == "client" {
        s, _ := deliver.NewServerWithTimeout(deliver.PushPull, ipc, 100)
        oneRecvImpl(s, 0)
    fnMap := map[string]func(context.Context, bool, string, int){
        push: profile.Push,
        pull: profile.Pull,
        req:  profile.Req,
        rep:  profile.Rep,
        send: profile.Shmsend,
        recv: profile.Shmrecv,
    }
    return
    m := modeType(mode)
    if m > deliver.ModeStart {
        if proc == act {
            senderMode(ipc, m, procCount, oneSendnRecv)
        } else {
            recvMode(ipc, m, procCount, oneSendnRecv)
        }
    ctx, cancel := context.WithCancel(context.Background())
    if fn, ok := fnMap[proto]; ok {
        fn(ctx, server, ipc, count)
    } else {
        fmt.Println("NO THIS FUNC: ", proto)
    }
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
    <-c
    cancel()
    time.Sleep(3 * time.Second)
    fmt.Fprintf(os.Stderr,
        "Usage: pushpull push|pull <URL> <ARG> ...\n")
    os.Exit(1)
npush-1pull.go
File was deleted
profile/pull.go
New file
@@ -0,0 +1,56 @@
package profile
import (
    "context"
    "demo/deliver"
    "fmt"
    "time"
)
func puller(ctx context.Context, d deliver.Deliver, index int) {
    var msg []byte
    var err error
    var t int64
    var elapse int64
    count := 0
    for {
        msg, err = d.Recv()
        if err != nil {
            // fmt.Println("recv error : ", err)
        }
        if t == 0 {
            t = time.Now().UnixNano()
        }
        elapse = time.Now().UnixNano() - t
        count++
        if elapse > 1e9 {
            fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
                index, count, len(msg), elapse)
            elapse = 0
            count = 0
            t = 0
        }
        // time.Sleep(10 * time.Millisecond)
    }
}
func Pull(ctx context.Context, server bool, ipc string, count int) {
    if server {
        d := deliver.NewServer(deliver.PushPull, ipc)
        go puller(ctx, d, 0)
    } else {
        var cs []deliver.Deliver
        for i := 0; i < count; i++ {
            c := deliver.NewClient(deliver.PushPull, ipc)
            cs = append(cs, c)
            go puller(ctx, c, i)
        }
    }
}
profile/push.go
New file
@@ -0,0 +1,46 @@
package profile
import (
    "context"
    "demo/deliver"
    "fmt"
)
const dLen = 32 * 1024 * 1024
func pusher(ctx context.Context, d deliver.Deliver, index int) {
    var err error
    buf := make([]byte, dLen)
    for {
        select {
        case <-ctx.Done():
            return
        default:
            if err = d.Send(buf); err != nil {
                // fmt.Printf("%d can't send message on push socket: %s\n", index, err.Error())
            } else {
                fmt.Printf("%d send msg length %d\n", index, len(buf))
            }
        }
    }
}
func Push(ctx context.Context, server bool, ipc string, count int) {
    if server {
        d := deliver.NewServer(deliver.PushPull, ipc)
        go pusher(ctx, d, 0)
    } else {
        var cs []deliver.Deliver
        for i := 0; i < count; i++ {
            c := deliver.NewClient(deliver.PushPull, ipc)
            cs = append(cs, c)
            go pusher(ctx, c, i)
        }
    }
}
profile/reqrep.go
New file
@@ -0,0 +1,83 @@
package profile
import (
    "context"
    "demo/deliver"
    "fmt"
    "time"
)
func Req(ctx context.Context, server bool, url string, num int) {
    p := deliver.NewClient(deliver.ReqRep, url)
    var err error
    msg := `hello, give me your data`
    var t int64
    var elapse int64
    count := 0
    for {
        select {
        case <-ctx.Done():
            return
        default:
            if err = p.Send([]byte(msg)); err != nil {
                fmt.Printf("can't send message on push socket: %s\n", err.Error())
            } else {
            }
            if buf, err := p.Recv(); err != nil {
                fmt.Println("recv error: ", err)
            } else {
                if t == 0 {
                    t = time.Now().UnixNano()
                }
                elapse = time.Now().UnixNano() - t
                count++
                if elapse > 1e9 {
                    fmt.Printf("NODE: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
                        count, len(buf), elapse)
                    elapse = 0
                    count = 0
                    t = 0
                }
            }
            // time.Sleep(10 * time.Millisecond)
        }
    }
}
func Rep(ctx context.Context, server bool, url string, num int) {
    c := deliver.NewServer(deliver.ReqRep, url)
    var msg []byte
    var err error
    buf := make([]byte, dLen)
    for {
        select {
        case <-ctx.Done():
            return
        default:
            msg, err = c.Recv()
            if err != nil {
                fmt.Println("recv error : ", err, " msg ", msg)
                continue
            }
            c.Send(buf)
            // time.Sleep(10 * time.Millisecond)
        }
    }
}
profile/shmrecv.go
New file
@@ -0,0 +1,137 @@
package profile
import (
    "context"
    "demo/deliver"
    "fmt"
    "os"
    "time"
)
func shmrecver(ctx context.Context, c deliver.Deliver, index int, ch chan<- bool) {
    var msg []byte
    var err error
    var t int64
    var elapse int64
    count := 0
    for {
        select {
        case <-ctx.Done():
            return
        default:
            msg, err = c.Recv()
            if err != nil {
                fmt.Println("recv error : ", err)
                return
            }
            if ch != nil {
                ch <- true
            }
            if t == 0 {
                t = time.Now().UnixNano()
            }
            elapse = time.Now().UnixNano() - t
            count++
            if elapse > 1e9 {
                fmt.Printf("NODE-%d: RECEIVED \"%d\" data %s len %d, use \"%d\" ns\n",
                    index, count, string(msg), len(msg), elapse)
                elapse = 0
                count = 0
                t = 0
            }
        }
        // time.Sleep(10 * time.Millisecond)
    }
}
func Shmrecv(ctx context.Context, server bool, ipc string, count int) {
    blocks := 2
    if server {
        s, err := deliver.NewServerWithError(deliver.Shm, ipc, blocks, dLen)
        for {
            if err == nil {
                break
            }
            time.Sleep(1 * time.Second)
            s, err = deliver.NewServerWithError(deliver.Shm, ipc, blocks, dLen)
        }
        go shmrecver(ctx, s, 0, nil)
    } else {
        // recvers(ctx, ipc, count, nil)
        chWaiter := make(chan bool, count)
        cs := recvers(ctx, ipc, count, chWaiter)
        go func() {
            waitCount := 0
            for {
                select {
                case <-ctx.Done():
                    return
                case <-chWaiter:
                    waitCount = 0
                default:
                    if waitCount > 200*3 {
                        for _, v := range cs {
                            v.Close()
                        }
                        cs = recvers(ctx, ipc, count, chWaiter)
                        fmt.Println("restart recievers")
                        waitCount = 0
                        continue
                    }
                    time.Sleep(time.Millisecond * 5)
                    waitCount++
                }
            }
        }()
    }
}
func recvers(ctx context.Context, url string, count int, ch chan<- bool) []deliver.Deliver {
    for {
        file := "/dev/shm/" + url
        exist, _ := pathExists(file)
        if exist {
            break
        }
        fmt.Println("wait for shm server start")
        time.Sleep(time.Second)
    }
    var cs []deliver.Deliver
    for i := 0; i < count; i++ {
        c, err := deliver.NewClientWithError(deliver.Shm, url)
        for {
            if err == nil {
                break
            }
            time.Sleep(1 * time.Second)
            c, err = deliver.NewClientWithError(deliver.Shm, url)
            fmt.Println(i, " client create failed : ", err)
        }
        cs = append(cs, c)
        go shmrecver(ctx, c, i, ch)
    }
    return cs
}
func pathExists(path string) (bool, error) {
    _, err := os.Stat(path)
    if err == nil {
        return true, nil
    }
    if os.IsNotExist(err) {
        return false, nil
    }
    return false, err
}
profile/shmsend.go
New file
@@ -0,0 +1,65 @@
package profile
import (
    "context"
    "demo/deliver"
    "fmt"
    "time"
)
func shmsender(ctx context.Context, s deliver.Deliver, index int) {
    var err error
    buf := make([]byte, dLen)
    copy(buf, []byte("hello, give you this"))
    for {
        select {
        case <-ctx.Done():
            s.Close()
            fmt.Println("quit shm sender")
            return
        default:
            if err = s.Send(buf); err != nil {
                fmt.Printf("can't send message on push socket: %s\n", err.Error())
            } else {
                fmt.Printf("send msg length %d\n", len(buf))
            }
        }
        // time.Sleep(10 * time.Millisecond)
    }
}
func Shmsend(ctx context.Context, server bool, ipc string, count int) {
    blocks := 2
    if server {
        s, err := deliver.NewServerWithError(deliver.Shm, ipc, blocks, dLen)
        for {
            if err == nil {
                break
            }
            time.Sleep(1 * time.Second)
            s, err = deliver.NewServerWithError(deliver.Shm, ipc, blocks, dLen)
        }
        go shmsender(ctx, s, 0)
    } else {
        var cs []deliver.Deliver
        for i := 0; i < count; i++ {
            c, err := deliver.NewClientWithError(deliver.Shm, ipc)
            for {
                if err == nil {
                    break
                }
                time.Sleep(1 * time.Second)
                c, err = deliver.NewClientWithError(deliver.Shm, ipc)
            }
            cs = append(cs, c)
            go shmsender(ctx, c, i)
        }
    }
}
reqrep.go
File was deleted
runShm.go
File was deleted
runShm2.go
File was deleted
shm
New file
@@ -1 +1 @@
Subproject commit 0000000000000000000000000000000000000000
Subproject commit a0123f163eddcea3e6b9f9d36f1f3fb3aa2c835a