From cb4a30b9c26317a94e0be16029c8107d830fcd0d Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期一, 26 八月 2019 13:46:48 +0800 Subject: [PATCH] update --- /dev/null | 122 ----------- go.sum | 14 .gitmodules | 3 profile/pull.go | 56 +++++ profile/shmrecv.go | 137 ++++++++++++ go.mod | 7 profile/push.go | 46 ++++ profile/reqrep.go | 83 +++++++ shm | 2 main.go | 124 ++++------ profile/shmsend.go | 65 +++++ 11 files changed, 447 insertions(+), 212 deletions(-) diff --git a/.gitmodules b/.gitmodules index 0dc6d55..9a70181 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,6 @@ [submodule "deliver"] path = deliver url = ssh://zhangmeng@192.168.1.14:29418/valib/deliver.git +[submodule "shm"] + path = shm + url = ssh://zhangmeng@192.168.1.14:29418/valib/shm.git diff --git a/1push-npull.go b/1push-npull.go deleted file mode 100644 index 31ea21a..0000000 --- a/1push-npull.go +++ /dev/null @@ -1,106 +0,0 @@ -package main - -import ( - "demo/deliver" - "fmt" - "os" - "os/signal" - "time" - - "golang.org/x/sys/unix" -) - -func oneSenderImpl(s deliver.Deliver) { - var err error - - buf := make([]byte, dLen) - - for { - select { - case <-ctx.Done(): - return - default: - - if err = s.Send(buf); err != nil { - - fmt.Printf("can't send message on push socket: %s\n", err.Error()) - } else { - - fmt.Printf("send msg length %d\n", len(buf)) - } - } - } - -} - -func oneSender(url string, m deliver.Mode, args ...interface{}) { - s := deliver.NewServer(m, url, args...) - - go oneSenderImpl(s) - - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM) - <-c - - cancel() - s.Close() -} - -func nRecvImpl(c deliver.Deliver, index int) { - - var msg []byte - var err error - - var t int64 - var elapse int64 - count := 0 - - for { - select { - case <-ctx.Done(): - return - default: - msg, err = c.Recv() - if err != nil { - fmt.Println("recv error : ", err) - } - if t == 0 { - t = time.Now().UnixNano() - } - elapse = time.Now().UnixNano() - t - - count++ - - if elapse > 1e9 { - fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n", - index, count, len(msg), elapse) - elapse = 0 - count = 0 - t = 0 - } - } - - } -} - -func nReciever(url string, m deliver.Mode, count int) { - - var cs []deliver.Deliver - - for i := 0; i < count; i++ { - c := deliver.NewClient(m, url) - cs = append(cs, c) - - go nRecvImpl(c, i) - } - - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM) - <-c - - cancel() - for _, v := range cs { - v.Close() - } - -} diff --git a/go.mod b/go.mod index f9b1263..77b954b 100644 --- a/go.mod +++ b/go.mod @@ -3,10 +3,7 @@ go 1.12 require ( - github.com/gorilla/websocket v1.4.0 // indirect - github.com/tmthrgd/go-sem v0.0.0-20160607101025-0214dbf53877 // indirect - github.com/tmthrgd/go-shm v0.0.0-20170117044846-90afcfcd5ee9 // indirect - github.com/tmthrgd/shm-go v0.0.0-20170130075737-7207ca97b290 - golang.org/x/sys v0.0.0-20190429190828-d89cdac9e872 + github.com/gorilla/websocket v1.4.1 // indirect + golang.org/x/sys v0.0.0-20190825160603-fb81701db80f nanomsg.org/go-mangos v1.4.0 ) diff --git a/go.sum b/go.sum index 6fb9f50..fd43cdd 100644 --- a/go.sum +++ b/go.sum @@ -1,12 +1,6 @@ -github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= -github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= -github.com/tmthrgd/go-sem v0.0.0-20160607101025-0214dbf53877 h1:n65+IT/xy5+trHm3Zpg9+j7IO4n8pBcPzvaKbMolW8U= -github.com/tmthrgd/go-sem v0.0.0-20160607101025-0214dbf53877/go.mod h1:sgTk9wg3WurMlziuB3hcfgHYTz3pEkjQpSCTT8V2pW8= -github.com/tmthrgd/go-shm v0.0.0-20170117044846-90afcfcd5ee9 h1:uVRQSWD6TOlWlLJ7IYYmbjRr0Xg35ADFN89HGQLPFGI= -github.com/tmthrgd/go-shm v0.0.0-20170117044846-90afcfcd5ee9/go.mod h1:vy1jksyhzuQOMkHXMEi+X2bZ47ZeCn3QTnYdFBesABs= -github.com/tmthrgd/shm-go v0.0.0-20170130075737-7207ca97b290 h1:5zW+TRr0WH4uN72/E/XYwb1PcaYN5BIB/FUbcQ0nHr0= -github.com/tmthrgd/shm-go v0.0.0-20170130075737-7207ca97b290/go.mod h1:e9PZQr6zVezMTwj1v0j1YhGCNdS2zTCjXU9q9K+HHGk= -golang.org/x/sys v0.0.0-20190429190828-d89cdac9e872 h1:cGjJzUd8RgBw428LXP65YXni0aiGNA4Bl+ls8SmLOm8= -golang.org/x/sys v0.0.0-20190429190828-d89cdac9e872/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM= +github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +golang.org/x/sys v0.0.0-20190825160603-fb81701db80f h1:LCxigP8q3fPRGNVYndYsyHnF0zRrvcoVwZMfb8iQZe4= +golang.org/x/sys v0.0.0-20190825160603-fb81701db80f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= nanomsg.org/go-mangos v1.4.0 h1:pVRLnzXePdSbhWlWdSncYszTagERhMG5zK/vXYmbEdM= nanomsg.org/go-mangos v1.4.0/go.mod h1:MOor8xUIgwsRMPpLr9xQxe7bT7rciibScOqVyztNxHQ= diff --git a/main.go b/main.go index fb8204a..c46e870 100644 --- a/main.go +++ b/main.go @@ -3,84 +3,55 @@ import ( "context" "demo/deliver" + "demo/profile" "flag" "fmt" "os" + "os/signal" + "time" + + "golang.org/x/sys/unix" ) -const dLen = 12 * 1024 * 1024 - -var ctx, cancel = context.WithCancel(context.Background()) - -func senderMode(ipc string, m deliver.Mode, count int, one bool) { - if m == deliver.ReqRep { - req(ipc, m) - } else if m == deliver.Shm { - shmSender(ipc, 2, 32*1024*1024) - // shmReciever2(ipc, count, 2, 32*1024*1024) - } - - if one { - oneSender(ipc, m) - } else { - nSender(ipc, m, count) - } -} - -func recvMode(ipc string, m deliver.Mode, count int, n bool) { - if m == deliver.ReqRep { - rep(ipc, m) - } else if m == deliver.Shm { - shmReciever(ipc, count) - // shmSender2(ipc, count) - } - - if n { - nReciever(ipc, m, count) - } else { - oneReciever(ipc, m) - } -} - var ( - proc string - procCount int - mode string - ipc string - oneSendnRecv bool + server bool + proto string + ipc string - tmm string + count int ) const ( - act = "act" - pass = "pass" + push = "push" + pull = "pull" + pub = "pub" + sub = "sub" + req = "req" + rep = "rep" + send = "send" // shm send + recv = "recv" // shm recv + pair = "pair" ) func init() { - flag.StringVar(&proc, "p", "act", "proc as sender") - flag.IntVar(&procCount, "c", 1, "proc run count") + flag.BoolVar(&server, "s", false, "run as server") + flag.StringVar(&proto, "p", "", "run protocol e.g. push/pull req/rep send/recv(use shm)") + flag.StringVar(&ipc, "i", "", "ipc address") + flag.IntVar(&count, "c", 1, "run client count") - flag.StringVar(&mode, "m", "pushpull", "proc run mode pushpull or pubsub etc.") - - flag.StringVar(&ipc, "i", "ipc:///tmp/pic.ipc", "ipc label") - - flag.BoolVar(&oneSendnRecv, "n", true, "one send n recv") - - flag.StringVar(&tmm, "t", "server", "") } -func modeType(t string) deliver.Mode { +func deliverMode(m string) deliver.Mode { - if t == "pushpull" { + if m == push || m == pull { return deliver.PushPull - } else if t == "pubsub" { + } else if m == pub || m == sub { return deliver.PubSub - } else if t == "pair" { + } else if m == pair { return deliver.Pair - } else if t == "reqrep" { + } else if m == req || m == rep { return deliver.ReqRep - } else if t == "shm" { + } else if m == send || m == recv { return deliver.Shm } @@ -90,26 +61,31 @@ func main() { flag.Parse() - if tmm == "server" { - c, _ := deliver.NewServerWithTimeout(deliver.PushPull, ipc, 1000) - // c := deliver.NewServer(deliver.PushPull, ipc) - nSenderImpl(c, 0) - } else if tmm == "client" { - s, _ := deliver.NewServerWithTimeout(deliver.PushPull, ipc, 100) - - oneRecvImpl(s, 0) - + fnMap := map[string]func(context.Context, bool, string, int){ + push: profile.Push, + pull: profile.Pull, + req: profile.Req, + rep: profile.Rep, + send: profile.Shmsend, + recv: profile.Shmrecv, } - return - m := modeType(mode) - if m > deliver.ModeStart { - if proc == act { - senderMode(ipc, m, procCount, oneSendnRecv) - } else { - recvMode(ipc, m, procCount, oneSendnRecv) - } + + ctx, cancel := context.WithCancel(context.Background()) + + if fn, ok := fnMap[proto]; ok { + fn(ctx, server, ipc, count) + } else { + fmt.Println("NO THIS FUNC: ", proto) } + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM) + <-c + + cancel() + + time.Sleep(3 * time.Second) + fmt.Fprintf(os.Stderr, "Usage: pushpull push|pull <URL> <ARG> ...\n") os.Exit(1) diff --git a/npush-1pull.go b/npush-1pull.go deleted file mode 100644 index 9d54439..0000000 --- a/npush-1pull.go +++ /dev/null @@ -1,101 +0,0 @@ -package main - -import ( - "demo/deliver" - "fmt" - "os" - "os/signal" - "time" - - "golang.org/x/sys/unix" -) - -func nSenderImpl(s deliver.Deliver, index int) { - var err error - - buf := make([]byte, dLen) - - for { - select { - case <-ctx.Done(): - return - default: - if err = s.Send(buf); err != nil { - - fmt.Printf("%d can't send message on push socket: %s\n", index, err.Error()) - } else { - - fmt.Printf("%d send msg length %d\n", index, len(buf)) - } - } - } - -} - -func nSender(url string, m deliver.Mode, count int, args ...interface{}) { - - var cs []deliver.Deliver - - for i := 0; i < count; i++ { - c := deliver.NewClient(m, url, args...) - cs = append(cs, c) - - go nSenderImpl(c, i) - } - - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM) - <-c - cancel() - for _, v := range cs { - v.Close() - } - -} - -func oneRecvImpl(c deliver.Deliver, index int) { - - var msg []byte - var err error - - var t int64 - var elapse int64 - count := 0 - - for { - msg, err = c.Recv() - if err != nil { - fmt.Println("recv error : ", err) - } - if t == 0 { - t = time.Now().UnixNano() - } - elapse = time.Now().UnixNano() - t - - count++ - - if elapse > 1e9 { - fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n", - index, count, len(msg), elapse) - elapse = 0 - count = 0 - t = 0 - } - - // time.Sleep(10 * time.Millisecond) - } -} - -func oneReciever(url string, m deliver.Mode) { - - s := deliver.NewServer(m, url) - - go oneRecvImpl(s, 0) - - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM) - <-c - - cancel() - s.Close() -} diff --git a/profile/pull.go b/profile/pull.go new file mode 100644 index 0000000..0636e3f --- /dev/null +++ b/profile/pull.go @@ -0,0 +1,56 @@ +package profile + +import ( + "context" + "demo/deliver" + "fmt" + "time" +) + +func puller(ctx context.Context, d deliver.Deliver, index int) { + var msg []byte + var err error + + var t int64 + var elapse int64 + count := 0 + + for { + msg, err = d.Recv() + if err != nil { + // fmt.Println("recv error : ", err) + } + if t == 0 { + t = time.Now().UnixNano() + } + elapse = time.Now().UnixNano() - t + + count++ + + if elapse > 1e9 { + fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n", + index, count, len(msg), elapse) + elapse = 0 + count = 0 + t = 0 + } + + // time.Sleep(10 * time.Millisecond) + } +} + +func Pull(ctx context.Context, server bool, ipc string, count int) { + if server { + d := deliver.NewServer(deliver.PushPull, ipc) + go puller(ctx, d, 0) + } else { + var cs []deliver.Deliver + + for i := 0; i < count; i++ { + c := deliver.NewClient(deliver.PushPull, ipc) + cs = append(cs, c) + + go puller(ctx, c, i) + } + } +} diff --git a/profile/push.go b/profile/push.go new file mode 100644 index 0000000..73738e2 --- /dev/null +++ b/profile/push.go @@ -0,0 +1,46 @@ +package profile + +import ( + "context" + "demo/deliver" + "fmt" +) + +const dLen = 32 * 1024 * 1024 + +func pusher(ctx context.Context, d deliver.Deliver, index int) { + var err error + + buf := make([]byte, dLen) + + for { + select { + case <-ctx.Done(): + return + default: + if err = d.Send(buf); err != nil { + + // fmt.Printf("%d can't send message on push socket: %s\n", index, err.Error()) + } else { + + fmt.Printf("%d send msg length %d\n", index, len(buf)) + } + } + } +} + +func Push(ctx context.Context, server bool, ipc string, count int) { + if server { + d := deliver.NewServer(deliver.PushPull, ipc) + go pusher(ctx, d, 0) + } else { + var cs []deliver.Deliver + + for i := 0; i < count; i++ { + c := deliver.NewClient(deliver.PushPull, ipc) + cs = append(cs, c) + + go pusher(ctx, c, i) + } + } +} diff --git a/profile/reqrep.go b/profile/reqrep.go new file mode 100644 index 0000000..d65de5f --- /dev/null +++ b/profile/reqrep.go @@ -0,0 +1,83 @@ +package profile + +import ( + "context" + "demo/deliver" + "fmt" + "time" +) + +func Req(ctx context.Context, server bool, url string, num int) { + p := deliver.NewClient(deliver.ReqRep, url) + var err error + + msg := `hello, give me your data` + + var t int64 + var elapse int64 + count := 0 + + for { + select { + case <-ctx.Done(): + return + default: + + if err = p.Send([]byte(msg)); err != nil { + + fmt.Printf("can't send message on push socket: %s\n", err.Error()) + } else { + } + + if buf, err := p.Recv(); err != nil { + fmt.Println("recv error: ", err) + } else { + if t == 0 { + t = time.Now().UnixNano() + } + elapse = time.Now().UnixNano() - t + + count++ + + if elapse > 1e9 { + fmt.Printf("NODE: RECEIVED \"%d\" data len %d, use \"%d\" ns\n", + count, len(buf), elapse) + elapse = 0 + count = 0 + t = 0 + } + + } + // time.Sleep(10 * time.Millisecond) + } + + } + +} + +func Rep(ctx context.Context, server bool, url string, num int) { + c := deliver.NewServer(deliver.ReqRep, url) + + var msg []byte + var err error + + buf := make([]byte, dLen) + + for { + select { + case <-ctx.Done(): + return + default: + + msg, err = c.Recv() + if err != nil { + fmt.Println("recv error : ", err, " msg ", msg) + continue + } + + c.Send(buf) + // time.Sleep(10 * time.Millisecond) + } + + } +} diff --git a/profile/shmrecv.go b/profile/shmrecv.go new file mode 100644 index 0000000..7bbfe60 --- /dev/null +++ b/profile/shmrecv.go @@ -0,0 +1,137 @@ +package profile + +import ( + "context" + "demo/deliver" + "fmt" + "os" + "time" +) + +func shmrecver(ctx context.Context, c deliver.Deliver, index int, ch chan<- bool) { + + var msg []byte + var err error + + var t int64 + var elapse int64 + count := 0 + + for { + select { + case <-ctx.Done(): + return + default: + msg, err = c.Recv() + if err != nil { + fmt.Println("recv error : ", err) + return + } + if ch != nil { + ch <- true + } + + if t == 0 { + t = time.Now().UnixNano() + } + elapse = time.Now().UnixNano() - t + + count++ + + if elapse > 1e9 { + fmt.Printf("NODE-%d: RECEIVED \"%d\" data %s len %d, use \"%d\" ns\n", + index, count, string(msg), len(msg), elapse) + elapse = 0 + count = 0 + t = 0 + } + } + + // time.Sleep(10 * time.Millisecond) + } + +} + +func Shmrecv(ctx context.Context, server bool, ipc string, count int) { + blocks := 2 + + if server { + s, err := deliver.NewServerWithError(deliver.Shm, ipc, blocks, dLen) + for { + if err == nil { + break + } + time.Sleep(1 * time.Second) + s, err = deliver.NewServerWithError(deliver.Shm, ipc, blocks, dLen) + } + go shmrecver(ctx, s, 0, nil) + + } else { + // recvers(ctx, ipc, count, nil) + + chWaiter := make(chan bool, count) + cs := recvers(ctx, ipc, count, chWaiter) + + go func() { + waitCount := 0 + for { + select { + case <-ctx.Done(): + return + case <-chWaiter: + waitCount = 0 + default: + if waitCount > 200*3 { + for _, v := range cs { + v.Close() + } + cs = recvers(ctx, ipc, count, chWaiter) + fmt.Println("restart recievers") + waitCount = 0 + continue + } + time.Sleep(time.Millisecond * 5) + waitCount++ + } + } + }() + } +} + +func recvers(ctx context.Context, url string, count int, ch chan<- bool) []deliver.Deliver { + for { + file := "/dev/shm/" + url + exist, _ := pathExists(file) + if exist { + break + } + fmt.Println("wait for shm server start") + time.Sleep(time.Second) + } + var cs []deliver.Deliver + for i := 0; i < count; i++ { + c, err := deliver.NewClientWithError(deliver.Shm, url) + for { + if err == nil { + break + } + time.Sleep(1 * time.Second) + c, err = deliver.NewClientWithError(deliver.Shm, url) + fmt.Println(i, " client create failed : ", err) + } + cs = append(cs, c) + go shmrecver(ctx, c, i, ch) + } + return cs +} + +func pathExists(path string) (bool, error) { + _, err := os.Stat(path) + if err == nil { + return true, nil + } + if os.IsNotExist(err) { + return false, nil + } + return false, err +} diff --git a/profile/shmsend.go b/profile/shmsend.go new file mode 100644 index 0000000..61a3b87 --- /dev/null +++ b/profile/shmsend.go @@ -0,0 +1,65 @@ +package profile + +import ( + "context" + "demo/deliver" + "fmt" + "time" +) + +func shmsender(ctx context.Context, s deliver.Deliver, index int) { + var err error + + buf := make([]byte, dLen) + + copy(buf, []byte("hello, give you this")) + for { + + select { + case <-ctx.Done(): + s.Close() + fmt.Println("quit shm sender") + return + default: + if err = s.Send(buf); err != nil { + + fmt.Printf("can't send message on push socket: %s\n", err.Error()) + } else { + + fmt.Printf("send msg length %d\n", len(buf)) + } + } + + // time.Sleep(10 * time.Millisecond) + } +} + +func Shmsend(ctx context.Context, server bool, ipc string, count int) { + blocks := 2 + if server { + s, err := deliver.NewServerWithError(deliver.Shm, ipc, blocks, dLen) + for { + if err == nil { + break + } + time.Sleep(1 * time.Second) + s, err = deliver.NewServerWithError(deliver.Shm, ipc, blocks, dLen) + } + go shmsender(ctx, s, 0) + } else { + + var cs []deliver.Deliver + for i := 0; i < count; i++ { + c, err := deliver.NewClientWithError(deliver.Shm, ipc) + for { + if err == nil { + break + } + time.Sleep(1 * time.Second) + c, err = deliver.NewClientWithError(deliver.Shm, ipc) + } + cs = append(cs, c) + go shmsender(ctx, c, i) + } + } +} diff --git a/reqrep.go b/reqrep.go deleted file mode 100644 index b23fdf4..0000000 --- a/reqrep.go +++ /dev/null @@ -1,69 +0,0 @@ -package main - -import ( - "demo/deliver" - "fmt" - "time" -) - -func req(url string, m deliver.Mode) { - p := deliver.NewClient(m, url) - var err error - - msg := `hello, give me your data` - - var t int64 - var elapse int64 - count := 0 - - for { - - if err = p.Send([]byte(msg)); err != nil { - - fmt.Printf("can't send message on push socket: %s\n", err.Error()) - } else { - } - - if buf, err := p.Recv(); err != nil { - fmt.Println("recv error: ", err) - } else { - if t == 0 { - t = time.Now().UnixNano() - } - elapse = time.Now().UnixNano() - t - - count++ - - if elapse > 1e9 { - fmt.Printf("NODE: RECEIVED \"%d\" data len %d, use \"%d\" ns\n", - count, len(buf), elapse) - elapse = 0 - count = 0 - t = 0 - } - - } - // time.Sleep(10 * time.Millisecond) - } - -} - -func rep(url string, m deliver.Mode) { - c := deliver.NewServer(m, url) - - var msg []byte - var err error - - buf := make([]byte, dLen) - - for { - msg, err = c.Recv() - if err != nil { - fmt.Println("recv error : ", err, " msg ", msg) - continue - } - - c.Send(buf) - // time.Sleep(10 * time.Millisecond) - } -} diff --git a/runShm.go b/runShm.go deleted file mode 100644 index 1f05617..0000000 --- a/runShm.go +++ /dev/null @@ -1,129 +0,0 @@ -package main - -import ( - "demo/deliver" - "fmt" - "os" - "os/signal" - "sync" - "time" - - "golang.org/x/sys/unix" -) - -func shmSenderImpl(s deliver.Deliver) { - var err error - - buf := make([]byte, dLen) - - copy(buf, []byte("hello, give you this")) - for { - - select { - case <-ctx.Done(): - return - default: - if err = s.Send(buf); err != nil { - - fmt.Printf("can't send message on push socket: %s\n", err.Error()) - } else { - - fmt.Printf("send msg length %d\n", len(buf)) - } - } - - // time.Sleep(10 * time.Millisecond) - } - -} - -func shmSender(url string, args ...interface{}) { - s, err := deliver.NewServerWithError(deliver.Shm, url, args...) - for { - if err == nil { - break - } - fmt.Println("create shm error : ", err) - time.Sleep(1 * time.Second) - s, err = deliver.NewServerWithError(deliver.Shm, url, args) - } - go shmSenderImpl(s) - - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt, os.Kill, unix.SIGINT) - <-c - - cancel() - s.Close() -} - -func shmRecvImpl(wg *sync.WaitGroup, c deliver.Deliver, url string, index int) { - - var msg []byte - var err error - - var t int64 - var elapse int64 - count := 0 - - for { - select { - case <-ctx.Done(): - wg.Done() - return - default: - msg, err = c.Recv() - if err != nil { - fmt.Println("recv error : ", err) - } - if t == 0 { - t = time.Now().UnixNano() - } - elapse = time.Now().UnixNano() - t - - count++ - - if elapse > 1e9 { - fmt.Printf("NODE-%d: RECEIVED \"%d\" data %s len %d, use \"%d\" ns\n", - index, count, string(msg), len(msg), elapse) - elapse = 0 - count = 0 - t = 0 - } - } - - // time.Sleep(10 * time.Millisecond) - } - -} - -func shmReciever(url string, count int) { - - wg := sync.WaitGroup{} - - var cs []deliver.Deliver - for i := 0; i < count; i++ { - wg.Add(1) - c, err := deliver.NewClientWithError(deliver.Shm, url) - for { - if err == nil { - break - } - time.Sleep(1 * time.Second) - c, err = deliver.NewClientWithError(deliver.Shm, url) - fmt.Println(i, " client create failed : ", err) - } - cs = append(cs, c) - go shmRecvImpl(&wg, c, url, i) - } - - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM) - <-c - - cancel() - wg.Wait() - for _, v := range cs { - v.Close() - } -} diff --git a/runShm2.go b/runShm2.go deleted file mode 100644 index 34ea9df..0000000 --- a/runShm2.go +++ /dev/null @@ -1,122 +0,0 @@ -package main - -import ( - "demo/deliver" - "fmt" - "os" - "os/signal" - "time" - - "golang.org/x/sys/unix" -) - -func shmSenderImpl2(s deliver.Deliver) { - var err error - - buf := make([]byte, dLen) - - copy(buf, []byte("hello, give you this")) - for { - - select { - case <-ctx.Done(): - return - default: - if err = s.Send(buf); err != nil { - - fmt.Printf("can't send message on push socket: %s\n", err.Error()) - } else { - - fmt.Printf("send msg length %d\n", len(buf)) - } - } - - // time.Sleep(10 * time.Millisecond) - } - -} - -func shmReciever2(url string, count int, args ...interface{}) { - s, err := deliver.NewServerWithError(deliver.Shm, url, args...) - for { - if err == nil { - break - } - time.Sleep(1 * time.Second) - s, err = deliver.NewServerWithError(deliver.Shm, url, args...) - } - - go shmRecvImpl2(s, 0) - - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt, os.Kill, unix.SIGINT) - <-c - - cancel() - s.Close() -} - -func shmRecvImpl2(c deliver.Deliver, index int) { - - var msg []byte - var err error - - var t int64 - var elapse int64 - count := 0 - - for { - select { - case <-ctx.Done(): - return - default: - msg, err = c.Recv() - if err != nil { - fmt.Println("recv error : ", err) - } - if t == 0 { - t = time.Now().UnixNano() - } - elapse = time.Now().UnixNano() - t - - count++ - - if elapse > 1e9 { - fmt.Printf("NODE-%d: RECEIVED \"%d\" data %s len %d, use \"%d\" ns\n", - index, count, string(msg), len(msg), elapse) - elapse = 0 - count = 0 - t = 0 - } - } - - // time.Sleep(10 * time.Millisecond) - } - -} - -func shmSender2(url string, count int) { - - var cs []deliver.Deliver - for i := 0; i < count; i++ { - c, err := deliver.NewClientWithError(deliver.Shm, url) - for { - if err == nil { - break - } - time.Sleep(1 * time.Second) - c, err = deliver.NewClientWithError(deliver.Shm, url) - } - cs = append(cs, c) - go shmSenderImpl2(c) - } - - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM) - <-c - - cancel() - for _, v := range cs { - v.Close() - } -} diff --git a/shm b/shm new file mode 160000 index 0000000..a0123f1 --- /dev/null +++ b/shm @@ -1 +1 @@ -Subproject commit 0000000000000000000000000000000000000000 +Subproject commit a0123f163eddcea3e6b9f9d36f1f3fb3aa2c835a -- Gitblit v1.8.0