| | |
| | | [submodule "deliver"] |
| | | path = deliver |
| | | url = ssh://zhangmeng@192.168.1.14:29418/valib/deliver.git |
| | | [submodule "shm"] |
| | | path = shm |
| | | url = ssh://zhangmeng@192.168.1.14:29418/valib/shm.git |
| | |
| | | go 1.12 |
| | | |
| | | require ( |
| | | github.com/gorilla/websocket v1.4.0 // indirect |
| | | github.com/tmthrgd/go-sem v0.0.0-20160607101025-0214dbf53877 // indirect |
| | | github.com/tmthrgd/go-shm v0.0.0-20170117044846-90afcfcd5ee9 // indirect |
| | | github.com/tmthrgd/shm-go v0.0.0-20170130075737-7207ca97b290 |
| | | golang.org/x/sys v0.0.0-20190429190828-d89cdac9e872 |
| | | github.com/gorilla/websocket v1.4.1 // indirect |
| | | golang.org/x/sys v0.0.0-20190825160603-fb81701db80f |
| | | nanomsg.org/go-mangos v1.4.0 |
| | | ) |
| | |
| | | github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= |
| | | github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= |
| | | github.com/tmthrgd/go-sem v0.0.0-20160607101025-0214dbf53877 h1:n65+IT/xy5+trHm3Zpg9+j7IO4n8pBcPzvaKbMolW8U= |
| | | github.com/tmthrgd/go-sem v0.0.0-20160607101025-0214dbf53877/go.mod h1:sgTk9wg3WurMlziuB3hcfgHYTz3pEkjQpSCTT8V2pW8= |
| | | github.com/tmthrgd/go-shm v0.0.0-20170117044846-90afcfcd5ee9 h1:uVRQSWD6TOlWlLJ7IYYmbjRr0Xg35ADFN89HGQLPFGI= |
| | | github.com/tmthrgd/go-shm v0.0.0-20170117044846-90afcfcd5ee9/go.mod h1:vy1jksyhzuQOMkHXMEi+X2bZ47ZeCn3QTnYdFBesABs= |
| | | github.com/tmthrgd/shm-go v0.0.0-20170130075737-7207ca97b290 h1:5zW+TRr0WH4uN72/E/XYwb1PcaYN5BIB/FUbcQ0nHr0= |
| | | github.com/tmthrgd/shm-go v0.0.0-20170130075737-7207ca97b290/go.mod h1:e9PZQr6zVezMTwj1v0j1YhGCNdS2zTCjXU9q9K+HHGk= |
| | | golang.org/x/sys v0.0.0-20190429190828-d89cdac9e872 h1:cGjJzUd8RgBw428LXP65YXni0aiGNA4Bl+ls8SmLOm8= |
| | | golang.org/x/sys v0.0.0-20190429190828-d89cdac9e872/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= |
| | | github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM= |
| | | github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= |
| | | golang.org/x/sys v0.0.0-20190825160603-fb81701db80f h1:LCxigP8q3fPRGNVYndYsyHnF0zRrvcoVwZMfb8iQZe4= |
| | | golang.org/x/sys v0.0.0-20190825160603-fb81701db80f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= |
| | | nanomsg.org/go-mangos v1.4.0 h1:pVRLnzXePdSbhWlWdSncYszTagERhMG5zK/vXYmbEdM= |
| | | nanomsg.org/go-mangos v1.4.0/go.mod h1:MOor8xUIgwsRMPpLr9xQxe7bT7rciibScOqVyztNxHQ= |
| | |
| | | import ( |
| | | "context" |
| | | "demo/deliver" |
| | | "demo/profile" |
| | | "flag" |
| | | "fmt" |
| | | "os" |
| | | "os/signal" |
| | | "time" |
| | | |
| | | "golang.org/x/sys/unix" |
| | | ) |
| | | |
| | | const dLen = 12 * 1024 * 1024 |
| | | |
| | | var ctx, cancel = context.WithCancel(context.Background()) |
| | | |
| | | func senderMode(ipc string, m deliver.Mode, count int, one bool) { |
| | | if m == deliver.ReqRep { |
| | | req(ipc, m) |
| | | } else if m == deliver.Shm { |
| | | shmSender(ipc, 2, 32*1024*1024) |
| | | // shmReciever2(ipc, count, 2, 32*1024*1024) |
| | | } |
| | | |
| | | if one { |
| | | oneSender(ipc, m) |
| | | } else { |
| | | nSender(ipc, m, count) |
| | | } |
| | | } |
| | | |
| | | func recvMode(ipc string, m deliver.Mode, count int, n bool) { |
| | | if m == deliver.ReqRep { |
| | | rep(ipc, m) |
| | | } else if m == deliver.Shm { |
| | | shmReciever(ipc, count) |
| | | // shmSender2(ipc, count) |
| | | } |
| | | |
| | | if n { |
| | | nReciever(ipc, m, count) |
| | | } else { |
| | | oneReciever(ipc, m) |
| | | } |
| | | } |
| | | |
| | | var ( |
| | | proc string |
| | | procCount int |
| | | mode string |
| | | ipc string |
| | | oneSendnRecv bool |
| | | server bool |
| | | proto string |
| | | ipc string |
| | | |
| | | tmm string |
| | | count int |
| | | ) |
| | | |
| | | const ( |
| | | act = "act" |
| | | pass = "pass" |
| | | push = "push" |
| | | pull = "pull" |
| | | pub = "pub" |
| | | sub = "sub" |
| | | req = "req" |
| | | rep = "rep" |
| | | send = "send" // shm send |
| | | recv = "recv" // shm recv |
| | | pair = "pair" |
| | | ) |
| | | |
| | | func init() { |
| | | flag.StringVar(&proc, "p", "act", "proc as sender") |
| | | flag.IntVar(&procCount, "c", 1, "proc run count") |
| | | flag.BoolVar(&server, "s", false, "run as server") |
| | | flag.StringVar(&proto, "p", "", "run protocol e.g. push/pull req/rep send/recv(use shm)") |
| | | flag.StringVar(&ipc, "i", "", "ipc address") |
| | | flag.IntVar(&count, "c", 1, "run client count") |
| | | |
| | | flag.StringVar(&mode, "m", "pushpull", "proc run mode pushpull or pubsub etc.") |
| | | |
| | | flag.StringVar(&ipc, "i", "ipc:///tmp/pic.ipc", "ipc label") |
| | | |
| | | flag.BoolVar(&oneSendnRecv, "n", true, "one send n recv") |
| | | |
| | | flag.StringVar(&tmm, "t", "server", "") |
| | | } |
| | | |
| | | func modeType(t string) deliver.Mode { |
| | | func deliverMode(m string) deliver.Mode { |
| | | |
| | | if t == "pushpull" { |
| | | if m == push || m == pull { |
| | | return deliver.PushPull |
| | | } else if t == "pubsub" { |
| | | } else if m == pub || m == sub { |
| | | return deliver.PubSub |
| | | } else if t == "pair" { |
| | | } else if m == pair { |
| | | return deliver.Pair |
| | | } else if t == "reqrep" { |
| | | } else if m == req || m == rep { |
| | | return deliver.ReqRep |
| | | } else if t == "shm" { |
| | | } else if m == send || m == recv { |
| | | return deliver.Shm |
| | | } |
| | | |
| | |
| | | func main() { |
| | | flag.Parse() |
| | | |
| | | if tmm == "server" { |
| | | c, _ := deliver.NewServerWithTimeout(deliver.PushPull, ipc, 1000) |
| | | // c := deliver.NewServer(deliver.PushPull, ipc) |
| | | nSenderImpl(c, 0) |
| | | } else if tmm == "client" { |
| | | s, _ := deliver.NewServerWithTimeout(deliver.PushPull, ipc, 100) |
| | | |
| | | oneRecvImpl(s, 0) |
| | | |
| | | fnMap := map[string]func(context.Context, bool, string, int){ |
| | | push: profile.Push, |
| | | pull: profile.Pull, |
| | | req: profile.Req, |
| | | rep: profile.Rep, |
| | | send: profile.Shmsend, |
| | | recv: profile.Shmrecv, |
| | | } |
| | | return |
| | | m := modeType(mode) |
| | | if m > deliver.ModeStart { |
| | | if proc == act { |
| | | senderMode(ipc, m, procCount, oneSendnRecv) |
| | | } else { |
| | | recvMode(ipc, m, procCount, oneSendnRecv) |
| | | } |
| | | |
| | | ctx, cancel := context.WithCancel(context.Background()) |
| | | |
| | | if fn, ok := fnMap[proto]; ok { |
| | | fn(ctx, server, ipc, count) |
| | | } else { |
| | | fmt.Println("NO THIS FUNC: ", proto) |
| | | } |
| | | |
| | | c := make(chan os.Signal, 1) |
| | | signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM) |
| | | <-c |
| | | |
| | | cancel() |
| | | |
| | | time.Sleep(3 * time.Second) |
| | | |
| | | fmt.Fprintf(os.Stderr, |
| | | "Usage: pushpull push|pull <URL> <ARG> ...\n") |
| | | os.Exit(1) |
New file |
| | |
| | | package profile |
| | | |
| | | import ( |
| | | "context" |
| | | "demo/deliver" |
| | | "fmt" |
| | | "time" |
| | | ) |
| | | |
| | | func puller(ctx context.Context, d deliver.Deliver, index int) { |
| | | var msg []byte |
| | | var err error |
| | | |
| | | var t int64 |
| | | var elapse int64 |
| | | count := 0 |
| | | |
| | | for { |
| | | msg, err = d.Recv() |
| | | if err != nil { |
| | | // fmt.Println("recv error : ", err) |
| | | } |
| | | if t == 0 { |
| | | t = time.Now().UnixNano() |
| | | } |
| | | elapse = time.Now().UnixNano() - t |
| | | |
| | | count++ |
| | | |
| | | if elapse > 1e9 { |
| | | fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n", |
| | | index, count, len(msg), elapse) |
| | | elapse = 0 |
| | | count = 0 |
| | | t = 0 |
| | | } |
| | | |
| | | // time.Sleep(10 * time.Millisecond) |
| | | } |
| | | } |
| | | |
| | | func Pull(ctx context.Context, server bool, ipc string, count int) { |
| | | if server { |
| | | d := deliver.NewServer(deliver.PushPull, ipc) |
| | | go puller(ctx, d, 0) |
| | | } else { |
| | | var cs []deliver.Deliver |
| | | |
| | | for i := 0; i < count; i++ { |
| | | c := deliver.NewClient(deliver.PushPull, ipc) |
| | | cs = append(cs, c) |
| | | |
| | | go puller(ctx, c, i) |
| | | } |
| | | } |
| | | } |
New file |
| | |
| | | package profile |
| | | |
| | | import ( |
| | | "context" |
| | | "demo/deliver" |
| | | "fmt" |
| | | ) |
| | | |
| | | const dLen = 32 * 1024 * 1024 |
| | | |
| | | func pusher(ctx context.Context, d deliver.Deliver, index int) { |
| | | var err error |
| | | |
| | | buf := make([]byte, dLen) |
| | | |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | return |
| | | default: |
| | | if err = d.Send(buf); err != nil { |
| | | |
| | | // fmt.Printf("%d can't send message on push socket: %s\n", index, err.Error()) |
| | | } else { |
| | | |
| | | fmt.Printf("%d send msg length %d\n", index, len(buf)) |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | func Push(ctx context.Context, server bool, ipc string, count int) { |
| | | if server { |
| | | d := deliver.NewServer(deliver.PushPull, ipc) |
| | | go pusher(ctx, d, 0) |
| | | } else { |
| | | var cs []deliver.Deliver |
| | | |
| | | for i := 0; i < count; i++ { |
| | | c := deliver.NewClient(deliver.PushPull, ipc) |
| | | cs = append(cs, c) |
| | | |
| | | go pusher(ctx, c, i) |
| | | } |
| | | } |
| | | } |
New file |
| | |
| | | package profile |
| | | |
| | | import ( |
| | | "context" |
| | | "demo/deliver" |
| | | "fmt" |
| | | "time" |
| | | ) |
| | | |
| | | func Req(ctx context.Context, server bool, url string, num int) { |
| | | p := deliver.NewClient(deliver.ReqRep, url) |
| | | var err error |
| | | |
| | | msg := `hello, give me your data` |
| | | |
| | | var t int64 |
| | | var elapse int64 |
| | | count := 0 |
| | | |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | return |
| | | default: |
| | | |
| | | if err = p.Send([]byte(msg)); err != nil { |
| | | |
| | | fmt.Printf("can't send message on push socket: %s\n", err.Error()) |
| | | } else { |
| | | } |
| | | |
| | | if buf, err := p.Recv(); err != nil { |
| | | fmt.Println("recv error: ", err) |
| | | } else { |
| | | if t == 0 { |
| | | t = time.Now().UnixNano() |
| | | } |
| | | elapse = time.Now().UnixNano() - t |
| | | |
| | | count++ |
| | | |
| | | if elapse > 1e9 { |
| | | fmt.Printf("NODE: RECEIVED \"%d\" data len %d, use \"%d\" ns\n", |
| | | count, len(buf), elapse) |
| | | elapse = 0 |
| | | count = 0 |
| | | t = 0 |
| | | } |
| | | |
| | | } |
| | | // time.Sleep(10 * time.Millisecond) |
| | | } |
| | | |
| | | } |
| | | |
| | | } |
| | | |
| | | func Rep(ctx context.Context, server bool, url string, num int) { |
| | | c := deliver.NewServer(deliver.ReqRep, url) |
| | | |
| | | var msg []byte |
| | | var err error |
| | | |
| | | buf := make([]byte, dLen) |
| | | |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | return |
| | | default: |
| | | |
| | | msg, err = c.Recv() |
| | | if err != nil { |
| | | fmt.Println("recv error : ", err, " msg ", msg) |
| | | continue |
| | | } |
| | | |
| | | c.Send(buf) |
| | | // time.Sleep(10 * time.Millisecond) |
| | | } |
| | | |
| | | } |
| | | } |
New file |
| | |
| | | package profile |
| | | |
| | | import ( |
| | | "context" |
| | | "demo/deliver" |
| | | "fmt" |
| | | "os" |
| | | "time" |
| | | ) |
| | | |
| | | func shmrecver(ctx context.Context, c deliver.Deliver, index int, ch chan<- bool) { |
| | | |
| | | var msg []byte |
| | | var err error |
| | | |
| | | var t int64 |
| | | var elapse int64 |
| | | count := 0 |
| | | |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | return |
| | | default: |
| | | msg, err = c.Recv() |
| | | if err != nil { |
| | | fmt.Println("recv error : ", err) |
| | | return |
| | | } |
| | | if ch != nil { |
| | | ch <- true |
| | | } |
| | | |
| | | if t == 0 { |
| | | t = time.Now().UnixNano() |
| | | } |
| | | elapse = time.Now().UnixNano() - t |
| | | |
| | | count++ |
| | | |
| | | if elapse > 1e9 { |
| | | fmt.Printf("NODE-%d: RECEIVED \"%d\" data %s len %d, use \"%d\" ns\n", |
| | | index, count, string(msg), len(msg), elapse) |
| | | elapse = 0 |
| | | count = 0 |
| | | t = 0 |
| | | } |
| | | } |
| | | |
| | | // time.Sleep(10 * time.Millisecond) |
| | | } |
| | | |
| | | } |
| | | |
| | | func Shmrecv(ctx context.Context, server bool, ipc string, count int) { |
| | | blocks := 2 |
| | | |
| | | if server { |
| | | s, err := deliver.NewServerWithError(deliver.Shm, ipc, blocks, dLen) |
| | | for { |
| | | if err == nil { |
| | | break |
| | | } |
| | | time.Sleep(1 * time.Second) |
| | | s, err = deliver.NewServerWithError(deliver.Shm, ipc, blocks, dLen) |
| | | } |
| | | go shmrecver(ctx, s, 0, nil) |
| | | |
| | | } else { |
| | | // recvers(ctx, ipc, count, nil) |
| | | |
| | | chWaiter := make(chan bool, count) |
| | | cs := recvers(ctx, ipc, count, chWaiter) |
| | | |
| | | go func() { |
| | | waitCount := 0 |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | return |
| | | case <-chWaiter: |
| | | waitCount = 0 |
| | | default: |
| | | if waitCount > 200*3 { |
| | | for _, v := range cs { |
| | | v.Close() |
| | | } |
| | | cs = recvers(ctx, ipc, count, chWaiter) |
| | | fmt.Println("restart recievers") |
| | | waitCount = 0 |
| | | continue |
| | | } |
| | | time.Sleep(time.Millisecond * 5) |
| | | waitCount++ |
| | | } |
| | | } |
| | | }() |
| | | } |
| | | } |
| | | |
| | | func recvers(ctx context.Context, url string, count int, ch chan<- bool) []deliver.Deliver { |
| | | for { |
| | | file := "/dev/shm/" + url |
| | | exist, _ := pathExists(file) |
| | | if exist { |
| | | break |
| | | } |
| | | fmt.Println("wait for shm server start") |
| | | time.Sleep(time.Second) |
| | | } |
| | | var cs []deliver.Deliver |
| | | for i := 0; i < count; i++ { |
| | | c, err := deliver.NewClientWithError(deliver.Shm, url) |
| | | for { |
| | | if err == nil { |
| | | break |
| | | } |
| | | time.Sleep(1 * time.Second) |
| | | c, err = deliver.NewClientWithError(deliver.Shm, url) |
| | | fmt.Println(i, " client create failed : ", err) |
| | | } |
| | | cs = append(cs, c) |
| | | go shmrecver(ctx, c, i, ch) |
| | | } |
| | | return cs |
| | | } |
| | | |
// pathExists reports whether path can be stat'ed.
//
// It returns (true, nil) when os.Stat succeeds, (false, nil) when the error
// says the path does not exist, and (false, err) for any other Stat error.
func pathExists(path string) (bool, error) {
	switch _, statErr := os.Stat(path); {
	case statErr == nil:
		return true, nil
	case os.IsNotExist(statErr):
		return false, nil
	default:
		return false, statErr
	}
}
New file |
| | |
| | | package profile |
| | | |
| | | import ( |
| | | "context" |
| | | "demo/deliver" |
| | | "fmt" |
| | | "time" |
| | | ) |
| | | |
| | | func shmsender(ctx context.Context, s deliver.Deliver, index int) { |
| | | var err error |
| | | |
| | | buf := make([]byte, dLen) |
| | | |
| | | copy(buf, []byte("hello, give you this")) |
| | | for { |
| | | |
| | | select { |
| | | case <-ctx.Done(): |
| | | s.Close() |
| | | fmt.Println("quit shm sender") |
| | | return |
| | | default: |
| | | if err = s.Send(buf); err != nil { |
| | | |
| | | fmt.Printf("can't send message on push socket: %s\n", err.Error()) |
| | | } else { |
| | | |
| | | fmt.Printf("send msg length %d\n", len(buf)) |
| | | } |
| | | } |
| | | |
| | | // time.Sleep(10 * time.Millisecond) |
| | | } |
| | | } |
| | | |
| | | func Shmsend(ctx context.Context, server bool, ipc string, count int) { |
| | | blocks := 2 |
| | | if server { |
| | | s, err := deliver.NewServerWithError(deliver.Shm, ipc, blocks, dLen) |
| | | for { |
| | | if err == nil { |
| | | break |
| | | } |
| | | time.Sleep(1 * time.Second) |
| | | s, err = deliver.NewServerWithError(deliver.Shm, ipc, blocks, dLen) |
| | | } |
| | | go shmsender(ctx, s, 0) |
| | | } else { |
| | | |
| | | var cs []deliver.Deliver |
| | | for i := 0; i < count; i++ { |
| | | c, err := deliver.NewClientWithError(deliver.Shm, ipc) |
| | | for { |
| | | if err == nil { |
| | | break |
| | | } |
| | | time.Sleep(1 * time.Second) |
| | | c, err = deliver.NewClientWithError(deliver.Shm, ipc) |
| | | } |
| | | cs = append(cs, c) |
| | | go shmsender(ctx, c, i) |
| | | } |
| | | } |
| | | } |
New file |
| | |
| | | Subproject commit 0000000000000000000000000000000000000000 |
| | | Subproject commit a0123f163eddcea3e6b9f9d36f1f3fb3aa2c835a |