| | |
| | | Subproject commit d23f54e337d12fb4e6d5a0a5e1f041a51005e10c |
| | | Subproject commit 9a89af693b9336633bcac2a652c294f782e6b3b1 |
| | |
| | | |
| | | require ( |
| | | github.com/gorilla/websocket v1.4.0 // indirect |
| | | github.com/tmthrgd/go-sem v0.0.0-20160607101025-0214dbf53877 // indirect |
| | | github.com/tmthrgd/go-shm v0.0.0-20170117044846-90afcfcd5ee9 // indirect |
| | | github.com/tmthrgd/shm-go v0.0.0-20170130075737-7207ca97b290 |
| | | golang.org/x/sys v0.0.0-20190429190828-d89cdac9e872 |
| | | nanomsg.org/go-mangos v1.4.0 |
| | | ) |
| | |
| | | github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= |
| | | github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= |
| | | github.com/tmthrgd/go-sem v0.0.0-20160607101025-0214dbf53877 h1:n65+IT/xy5+trHm3Zpg9+j7IO4n8pBcPzvaKbMolW8U= |
| | | github.com/tmthrgd/go-sem v0.0.0-20160607101025-0214dbf53877/go.mod h1:sgTk9wg3WurMlziuB3hcfgHYTz3pEkjQpSCTT8V2pW8= |
| | | github.com/tmthrgd/go-shm v0.0.0-20170117044846-90afcfcd5ee9 h1:uVRQSWD6TOlWlLJ7IYYmbjRr0Xg35ADFN89HGQLPFGI= |
| | | github.com/tmthrgd/go-shm v0.0.0-20170117044846-90afcfcd5ee9/go.mod h1:vy1jksyhzuQOMkHXMEi+X2bZ47ZeCn3QTnYdFBesABs= |
| | | github.com/tmthrgd/shm-go v0.0.0-20170130075737-7207ca97b290 h1:5zW+TRr0WH4uN72/E/XYwb1PcaYN5BIB/FUbcQ0nHr0= |
| | | github.com/tmthrgd/shm-go v0.0.0-20170130075737-7207ca97b290/go.mod h1:e9PZQr6zVezMTwj1v0j1YhGCNdS2zTCjXU9q9K+HHGk= |
| | | golang.org/x/sys v0.0.0-20190429190828-d89cdac9e872 h1:cGjJzUd8RgBw428LXP65YXni0aiGNA4Bl+ls8SmLOm8= |
| | | golang.org/x/sys v0.0.0-20190429190828-d89cdac9e872/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= |
| | | nanomsg.org/go-mangos v1.4.0 h1:pVRLnzXePdSbhWlWdSncYszTagERhMG5zK/vXYmbEdM= |
| | |
| | | package main |
| | | |
| | | import ( |
| | | "context" |
| | | "demo/deliver" |
| | | "fmt" |
| | | "os" |
| | | ) |
| | | |
| | | const dLen = 12 * 1024 * 1024 |
| | | |
| | | var ctx, cancel = context.WithCancel(context.Background()) |
| | | |
| | | func modeType(t string) deliver.Mode { |
| | | |
| | |
| | | return deliver.Pair |
| | | } else if t == "reqrep" { |
| | | return deliver.ReqRep |
| | | } else if t == "shm" { |
| | | return deliver.Shm |
| | | } |
| | | |
| | | return deliver.Mode(-1) |
| | | return deliver.NONE |
| | | } |
| | | |
| | | func senderMode(ipc string, m deliver.Mode) { |
| | | if m == deliver.ReqRep { |
| | | req(ipc, m) |
| | | } else if m == deliver.Shm { |
| | | shmSender(ipc, 2, 32*1024*1024) |
| | | } |
| | | sender(ipc, m) |
| | | } |
| | |
| | | func recvMode(ipc string, m deliver.Mode, strCount string) { |
| | | if m == deliver.ReqRep { |
| | | rep(ipc, m) |
| | | } else if m == deliver.Shm { |
| | | shmReciever(ipc, strCount) |
| | | } |
| | | reciever(ipc, m, strCount) |
| | | } |
| | |
| | | |
| | | fmt.Printf("can't send message on push socket: %s\n", err.Error()) |
| | | } else { |
| | | |
| | | fmt.Printf("send msg length %d\n", len(msg)) |
| | | } |
| | | |
| | | if buf, err := p.Recv(); err != nil { |
| | |
| | | fmt.Println("recv error : ", err, " msg ", msg) |
| | | continue |
| | | } |
| | | fmt.Println("recv msg: ", string(msg)) |
| | | |
| | | c.Send(buf) |
| | | // time.Sleep(10 * time.Millisecond) |
New file |
| | |
| | | package main |
| | | |
| | | import ( |
| | | "demo/deliver" |
| | | "fmt" |
| | | "os" |
| | | "os/signal" |
| | | "strconv" |
| | | "time" |
| | | |
| | | "golang.org/x/sys/unix" |
| | | ) |
| | | |
| | | func shmSenderImpl(s deliver.Deliver) { |
| | | var err error |
| | | |
| | | buf := make([]byte, dLen) |
| | | |
| | | copy(buf, []byte("hello, give you this")) |
| | | for { |
| | | |
| | | select { |
| | | case <-ctx.Done(): |
| | | return |
| | | default: |
| | | if err = s.Send(buf); err != nil { |
| | | |
| | | fmt.Printf("can't send message on push socket: %s\n", err.Error()) |
| | | } else { |
| | | |
| | | fmt.Printf("send msg length %d\n", len(buf)) |
| | | } |
| | | } |
| | | |
| | | // time.Sleep(10 * time.Millisecond) |
| | | } |
| | | |
| | | } |
| | | |
| | | func shmSender(url string, args ...interface{}) { |
| | | s := deliver.NewServer(deliver.Shm, url, args...) |
| | | |
| | | go shmSenderImpl(s) |
| | | |
| | | c := make(chan os.Signal, 1) |
| | | signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM) |
| | | <-c |
| | | |
| | | cancel() |
| | | s.Close() |
| | | } |
| | | |
| | | func shmRecvImpl(c deliver.Deliver, url string, index int) { |
| | | |
| | | var msg []byte |
| | | var err error |
| | | |
| | | var t int64 |
| | | var elapse int64 |
| | | count := 0 |
| | | |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | return |
| | | default: |
| | | msg, err = c.Recv() |
| | | if err != nil { |
| | | fmt.Println("recv error : ", err) |
| | | } |
| | | if t == 0 { |
| | | t = time.Now().UnixNano() |
| | | } |
| | | elapse = time.Now().UnixNano() - t |
| | | |
| | | count++ |
| | | |
| | | if elapse > 1e9 { |
| | | fmt.Printf("NODE-%d: RECEIVED \"%d\" data %s len %d, use \"%d\" ns\n", |
| | | index, count, string(msg), len(msg), elapse) |
| | | elapse = 0 |
| | | count = 0 |
| | | t = 0 |
| | | } |
| | | } |
| | | |
| | | // time.Sleep(10 * time.Millisecond) |
| | | } |
| | | } |
| | | |
| | | func shmReciever(url string, strCount string) { |
| | | count, _ := strconv.Atoi(strCount) |
| | | |
| | | var cs []deliver.Deliver |
| | | for i := 0; i < count; i++ { |
| | | c := deliver.NewClient(deliver.Shm, url) |
| | | cs = append(cs, c) |
| | | go shmRecvImpl(c, url, i) |
| | | } |
| | | |
| | | c := make(chan os.Signal, 1) |
| | | signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM) |
| | | <-c |
| | | |
| | | cancel() |
| | | for _, v := range cs { |
| | | v.Close() |
| | | } |
| | | } |