From cb4a30b9c26317a94e0be16029c8107d830fcd0d Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期一, 26 八月 2019 13:46:48 +0800
Subject: [PATCH] update
---
/dev/null | 122 -----------
go.sum | 14
.gitmodules | 3
profile/pull.go | 56 +++++
profile/shmrecv.go | 137 ++++++++++++
go.mod | 7
profile/push.go | 46 ++++
profile/reqrep.go | 83 +++++++
shm | 2
main.go | 124 ++++------
profile/shmsend.go | 65 +++++
11 files changed, 447 insertions(+), 212 deletions(-)
diff --git a/.gitmodules b/.gitmodules
index 0dc6d55..9a70181 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -1,3 +1,6 @@
[submodule "deliver"]
path = deliver
url = ssh://zhangmeng@192.168.1.14:29418/valib/deliver.git
+[submodule "shm"]
+ path = shm
+ url = ssh://zhangmeng@192.168.1.14:29418/valib/shm.git
diff --git a/1push-npull.go b/1push-npull.go
deleted file mode 100644
index 31ea21a..0000000
--- a/1push-npull.go
+++ /dev/null
@@ -1,106 +0,0 @@
-package main
-
-import (
- "demo/deliver"
- "fmt"
- "os"
- "os/signal"
- "time"
-
- "golang.org/x/sys/unix"
-)
-
-func oneSenderImpl(s deliver.Deliver) {
- var err error
-
- buf := make([]byte, dLen)
-
- for {
- select {
- case <-ctx.Done():
- return
- default:
-
- if err = s.Send(buf); err != nil {
-
- fmt.Printf("can't send message on push socket: %s\n", err.Error())
- } else {
-
- fmt.Printf("send msg length %d\n", len(buf))
- }
- }
- }
-
-}
-
-func oneSender(url string, m deliver.Mode, args ...interface{}) {
- s := deliver.NewServer(m, url, args...)
-
- go oneSenderImpl(s)
-
- c := make(chan os.Signal, 1)
- signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
- <-c
-
- cancel()
- s.Close()
-}
-
-func nRecvImpl(c deliver.Deliver, index int) {
-
- var msg []byte
- var err error
-
- var t int64
- var elapse int64
- count := 0
-
- for {
- select {
- case <-ctx.Done():
- return
- default:
- msg, err = c.Recv()
- if err != nil {
- fmt.Println("recv error : ", err)
- }
- if t == 0 {
- t = time.Now().UnixNano()
- }
- elapse = time.Now().UnixNano() - t
-
- count++
-
- if elapse > 1e9 {
- fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
- index, count, len(msg), elapse)
- elapse = 0
- count = 0
- t = 0
- }
- }
-
- }
-}
-
-func nReciever(url string, m deliver.Mode, count int) {
-
- var cs []deliver.Deliver
-
- for i := 0; i < count; i++ {
- c := deliver.NewClient(m, url)
- cs = append(cs, c)
-
- go nRecvImpl(c, i)
- }
-
- c := make(chan os.Signal, 1)
- signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
- <-c
-
- cancel()
- for _, v := range cs {
- v.Close()
- }
-
-}
diff --git a/go.mod b/go.mod
index f9b1263..77b954b 100644
--- a/go.mod
+++ b/go.mod
@@ -3,10 +3,7 @@
go 1.12
require (
- github.com/gorilla/websocket v1.4.0 // indirect
- github.com/tmthrgd/go-sem v0.0.0-20160607101025-0214dbf53877 // indirect
- github.com/tmthrgd/go-shm v0.0.0-20170117044846-90afcfcd5ee9 // indirect
- github.com/tmthrgd/shm-go v0.0.0-20170130075737-7207ca97b290
- golang.org/x/sys v0.0.0-20190429190828-d89cdac9e872
+ github.com/gorilla/websocket v1.4.1 // indirect
+ golang.org/x/sys v0.0.0-20190825160603-fb81701db80f
nanomsg.org/go-mangos v1.4.0
)
diff --git a/go.sum b/go.sum
index 6fb9f50..fd43cdd 100644
--- a/go.sum
+++ b/go.sum
@@ -1,12 +1,6 @@
-github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
-github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
-github.com/tmthrgd/go-sem v0.0.0-20160607101025-0214dbf53877 h1:n65+IT/xy5+trHm3Zpg9+j7IO4n8pBcPzvaKbMolW8U=
-github.com/tmthrgd/go-sem v0.0.0-20160607101025-0214dbf53877/go.mod h1:sgTk9wg3WurMlziuB3hcfgHYTz3pEkjQpSCTT8V2pW8=
-github.com/tmthrgd/go-shm v0.0.0-20170117044846-90afcfcd5ee9 h1:uVRQSWD6TOlWlLJ7IYYmbjRr0Xg35ADFN89HGQLPFGI=
-github.com/tmthrgd/go-shm v0.0.0-20170117044846-90afcfcd5ee9/go.mod h1:vy1jksyhzuQOMkHXMEi+X2bZ47ZeCn3QTnYdFBesABs=
-github.com/tmthrgd/shm-go v0.0.0-20170130075737-7207ca97b290 h1:5zW+TRr0WH4uN72/E/XYwb1PcaYN5BIB/FUbcQ0nHr0=
-github.com/tmthrgd/shm-go v0.0.0-20170130075737-7207ca97b290/go.mod h1:e9PZQr6zVezMTwj1v0j1YhGCNdS2zTCjXU9q9K+HHGk=
-golang.org/x/sys v0.0.0-20190429190828-d89cdac9e872 h1:cGjJzUd8RgBw428LXP65YXni0aiGNA4Bl+ls8SmLOm8=
-golang.org/x/sys v0.0.0-20190429190828-d89cdac9e872/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM=
+github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
+golang.org/x/sys v0.0.0-20190825160603-fb81701db80f h1:LCxigP8q3fPRGNVYndYsyHnF0zRrvcoVwZMfb8iQZe4=
+golang.org/x/sys v0.0.0-20190825160603-fb81701db80f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
nanomsg.org/go-mangos v1.4.0 h1:pVRLnzXePdSbhWlWdSncYszTagERhMG5zK/vXYmbEdM=
nanomsg.org/go-mangos v1.4.0/go.mod h1:MOor8xUIgwsRMPpLr9xQxe7bT7rciibScOqVyztNxHQ=
diff --git a/main.go b/main.go
index fb8204a..c46e870 100644
--- a/main.go
+++ b/main.go
@@ -3,84 +3,55 @@
import (
"context"
"demo/deliver"
+ "demo/profile"
"flag"
"fmt"
"os"
+ "os/signal"
+ "time"
+
+ "golang.org/x/sys/unix"
)
-const dLen = 12 * 1024 * 1024
-
-var ctx, cancel = context.WithCancel(context.Background())
-
-func senderMode(ipc string, m deliver.Mode, count int, one bool) {
- if m == deliver.ReqRep {
- req(ipc, m)
- } else if m == deliver.Shm {
- shmSender(ipc, 2, 32*1024*1024)
- // shmReciever2(ipc, count, 2, 32*1024*1024)
- }
-
- if one {
- oneSender(ipc, m)
- } else {
- nSender(ipc, m, count)
- }
-}
-
-func recvMode(ipc string, m deliver.Mode, count int, n bool) {
- if m == deliver.ReqRep {
- rep(ipc, m)
- } else if m == deliver.Shm {
- shmReciever(ipc, count)
- // shmSender2(ipc, count)
- }
-
- if n {
- nReciever(ipc, m, count)
- } else {
- oneReciever(ipc, m)
- }
-}
-
var (
- proc string
- procCount int
- mode string
- ipc string
- oneSendnRecv bool
+ server bool
+ proto string
+ ipc string
- tmm string
+ count int
)
const (
- act = "act"
- pass = "pass"
+ push = "push"
+ pull = "pull"
+ pub = "pub"
+ sub = "sub"
+ req = "req"
+ rep = "rep"
+ send = "send" // shm send
+ recv = "recv" // shm recv
+ pair = "pair"
)
func init() {
- flag.StringVar(&proc, "p", "act", "proc as sender")
- flag.IntVar(&procCount, "c", 1, "proc run count")
+ flag.BoolVar(&server, "s", false, "run as server")
+ flag.StringVar(&proto, "p", "", "run protocol e.g. push/pull req/rep send/recv(use shm)")
+ flag.StringVar(&ipc, "i", "", "ipc address")
+ flag.IntVar(&count, "c", 1, "run client count")
- flag.StringVar(&mode, "m", "pushpull", "proc run mode pushpull or pubsub etc.")
-
- flag.StringVar(&ipc, "i", "ipc:///tmp/pic.ipc", "ipc label")
-
- flag.BoolVar(&oneSendnRecv, "n", true, "one send n recv")
-
- flag.StringVar(&tmm, "t", "server", "")
}
-func modeType(t string) deliver.Mode {
+func deliverMode(m string) deliver.Mode {
- if t == "pushpull" {
+ if m == push || m == pull {
return deliver.PushPull
- } else if t == "pubsub" {
+ } else if m == pub || m == sub {
return deliver.PubSub
- } else if t == "pair" {
+ } else if m == pair {
return deliver.Pair
- } else if t == "reqrep" {
+ } else if m == req || m == rep {
return deliver.ReqRep
- } else if t == "shm" {
+ } else if m == send || m == recv {
return deliver.Shm
}
@@ -90,26 +61,31 @@
func main() {
flag.Parse()
- if tmm == "server" {
- c, _ := deliver.NewServerWithTimeout(deliver.PushPull, ipc, 1000)
- // c := deliver.NewServer(deliver.PushPull, ipc)
- nSenderImpl(c, 0)
- } else if tmm == "client" {
- s, _ := deliver.NewServerWithTimeout(deliver.PushPull, ipc, 100)
-
- oneRecvImpl(s, 0)
-
+ fnMap := map[string]func(context.Context, bool, string, int){
+ push: profile.Push,
+ pull: profile.Pull,
+ req: profile.Req,
+ rep: profile.Rep,
+ send: profile.Shmsend,
+ recv: profile.Shmrecv,
}
- return
- m := modeType(mode)
- if m > deliver.ModeStart {
- if proc == act {
- senderMode(ipc, m, procCount, oneSendnRecv)
- } else {
- recvMode(ipc, m, procCount, oneSendnRecv)
- }
+
+ ctx, cancel := context.WithCancel(context.Background())
+
+ if fn, ok := fnMap[proto]; ok {
+ fn(ctx, server, ipc, count)
+ } else {
+ fmt.Println("NO THIS FUNC: ", proto)
}
+ c := make(chan os.Signal, 1)
+ signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
+ <-c
+
+ cancel()
+
+ time.Sleep(3 * time.Second)
+
fmt.Fprintf(os.Stderr,
"Usage: pushpull push|pull <URL> <ARG> ...\n")
os.Exit(1)
diff --git a/npush-1pull.go b/npush-1pull.go
deleted file mode 100644
index 9d54439..0000000
--- a/npush-1pull.go
+++ /dev/null
@@ -1,101 +0,0 @@
-package main
-
-import (
- "demo/deliver"
- "fmt"
- "os"
- "os/signal"
- "time"
-
- "golang.org/x/sys/unix"
-)
-
-func nSenderImpl(s deliver.Deliver, index int) {
- var err error
-
- buf := make([]byte, dLen)
-
- for {
- select {
- case <-ctx.Done():
- return
- default:
- if err = s.Send(buf); err != nil {
-
- fmt.Printf("%d can't send message on push socket: %s\n", index, err.Error())
- } else {
-
- fmt.Printf("%d send msg length %d\n", index, len(buf))
- }
- }
- }
-
-}
-
-func nSender(url string, m deliver.Mode, count int, args ...interface{}) {
-
- var cs []deliver.Deliver
-
- for i := 0; i < count; i++ {
- c := deliver.NewClient(m, url, args...)
- cs = append(cs, c)
-
- go nSenderImpl(c, i)
- }
-
- c := make(chan os.Signal, 1)
- signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
- <-c
- cancel()
- for _, v := range cs {
- v.Close()
- }
-
-}
-
-func oneRecvImpl(c deliver.Deliver, index int) {
-
- var msg []byte
- var err error
-
- var t int64
- var elapse int64
- count := 0
-
- for {
- msg, err = c.Recv()
- if err != nil {
- fmt.Println("recv error : ", err)
- }
- if t == 0 {
- t = time.Now().UnixNano()
- }
- elapse = time.Now().UnixNano() - t
-
- count++
-
- if elapse > 1e9 {
- fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
- index, count, len(msg), elapse)
- elapse = 0
- count = 0
- t = 0
- }
-
- // time.Sleep(10 * time.Millisecond)
- }
-}
-
-func oneReciever(url string, m deliver.Mode) {
-
- s := deliver.NewServer(m, url)
-
- go oneRecvImpl(s, 0)
-
- c := make(chan os.Signal, 1)
- signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
- <-c
-
- cancel()
- s.Close()
-}
diff --git a/profile/pull.go b/profile/pull.go
new file mode 100644
index 0000000..0636e3f
--- /dev/null
+++ b/profile/pull.go
@@ -0,0 +1,56 @@
+package profile
+
+import (
+ "context"
+ "demo/deliver"
+ "fmt"
+ "time"
+)
+
+func puller(ctx context.Context, d deliver.Deliver, index int) {
+ var msg []byte
+ var err error
+
+ var t int64
+ var elapse int64
+ count := 0
+
+ for {
+ msg, err = d.Recv()
+ if err != nil {
+ // fmt.Println("recv error : ", err)
+ }
+ if t == 0 {
+ t = time.Now().UnixNano()
+ }
+ elapse = time.Now().UnixNano() - t
+
+ count++
+
+ if elapse > 1e9 {
+ fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
+ index, count, len(msg), elapse)
+ elapse = 0
+ count = 0
+ t = 0
+ }
+
+ // time.Sleep(10 * time.Millisecond)
+ }
+}
+
+func Pull(ctx context.Context, server bool, ipc string, count int) {
+ if server {
+ d := deliver.NewServer(deliver.PushPull, ipc)
+ go puller(ctx, d, 0)
+ } else {
+ var cs []deliver.Deliver
+
+ for i := 0; i < count; i++ {
+ c := deliver.NewClient(deliver.PushPull, ipc)
+ cs = append(cs, c)
+
+ go puller(ctx, c, i)
+ }
+ }
+}
diff --git a/profile/push.go b/profile/push.go
new file mode 100644
index 0000000..73738e2
--- /dev/null
+++ b/profile/push.go
@@ -0,0 +1,46 @@
+package profile
+
+import (
+ "context"
+ "demo/deliver"
+ "fmt"
+)
+
+const dLen = 32 * 1024 * 1024
+
+func pusher(ctx context.Context, d deliver.Deliver, index int) {
+ var err error
+
+ buf := make([]byte, dLen)
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ if err = d.Send(buf); err != nil {
+
+ // fmt.Printf("%d can't send message on push socket: %s\n", index, err.Error())
+ } else {
+
+ fmt.Printf("%d send msg length %d\n", index, len(buf))
+ }
+ }
+ }
+}
+
+func Push(ctx context.Context, server bool, ipc string, count int) {
+ if server {
+ d := deliver.NewServer(deliver.PushPull, ipc)
+ go pusher(ctx, d, 0)
+ } else {
+ var cs []deliver.Deliver
+
+ for i := 0; i < count; i++ {
+ c := deliver.NewClient(deliver.PushPull, ipc)
+ cs = append(cs, c)
+
+ go pusher(ctx, c, i)
+ }
+ }
+}
diff --git a/profile/reqrep.go b/profile/reqrep.go
new file mode 100644
index 0000000..d65de5f
--- /dev/null
+++ b/profile/reqrep.go
@@ -0,0 +1,83 @@
+package profile
+
+import (
+ "context"
+ "demo/deliver"
+ "fmt"
+ "time"
+)
+
+func Req(ctx context.Context, server bool, url string, num int) {
+ p := deliver.NewClient(deliver.ReqRep, url)
+ var err error
+
+ msg := `hello, give me your data`
+
+ var t int64
+ var elapse int64
+ count := 0
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+
+ if err = p.Send([]byte(msg)); err != nil {
+
+ fmt.Printf("can't send message on push socket: %s\n", err.Error())
+ } else {
+ }
+
+ if buf, err := p.Recv(); err != nil {
+ fmt.Println("recv error: ", err)
+ } else {
+ if t == 0 {
+ t = time.Now().UnixNano()
+ }
+ elapse = time.Now().UnixNano() - t
+
+ count++
+
+ if elapse > 1e9 {
+ fmt.Printf("NODE: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
+ count, len(buf), elapse)
+ elapse = 0
+ count = 0
+ t = 0
+ }
+
+ }
+ // time.Sleep(10 * time.Millisecond)
+ }
+
+ }
+
+}
+
+func Rep(ctx context.Context, server bool, url string, num int) {
+ c := deliver.NewServer(deliver.ReqRep, url)
+
+ var msg []byte
+ var err error
+
+ buf := make([]byte, dLen)
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+
+ msg, err = c.Recv()
+ if err != nil {
+ fmt.Println("recv error : ", err, " msg ", msg)
+ continue
+ }
+
+ c.Send(buf)
+ // time.Sleep(10 * time.Millisecond)
+ }
+
+ }
+}
diff --git a/profile/shmrecv.go b/profile/shmrecv.go
new file mode 100644
index 0000000..7bbfe60
--- /dev/null
+++ b/profile/shmrecv.go
@@ -0,0 +1,137 @@
+package profile
+
+import (
+ "context"
+ "demo/deliver"
+ "fmt"
+ "os"
+ "time"
+)
+
+func shmrecver(ctx context.Context, c deliver.Deliver, index int, ch chan<- bool) {
+
+ var msg []byte
+ var err error
+
+ var t int64
+ var elapse int64
+ count := 0
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ msg, err = c.Recv()
+ if err != nil {
+ fmt.Println("recv error : ", err)
+ return
+ }
+ if ch != nil {
+ ch <- true
+ }
+
+ if t == 0 {
+ t = time.Now().UnixNano()
+ }
+ elapse = time.Now().UnixNano() - t
+
+ count++
+
+ if elapse > 1e9 {
+ fmt.Printf("NODE-%d: RECEIVED \"%d\" data %s len %d, use \"%d\" ns\n",
+ index, count, string(msg), len(msg), elapse)
+ elapse = 0
+ count = 0
+ t = 0
+ }
+ }
+
+ // time.Sleep(10 * time.Millisecond)
+ }
+
+}
+
+func Shmrecv(ctx context.Context, server bool, ipc string, count int) {
+ blocks := 2
+
+ if server {
+ s, err := deliver.NewServerWithError(deliver.Shm, ipc, blocks, dLen)
+ for {
+ if err == nil {
+ break
+ }
+ time.Sleep(1 * time.Second)
+ s, err = deliver.NewServerWithError(deliver.Shm, ipc, blocks, dLen)
+ }
+ go shmrecver(ctx, s, 0, nil)
+
+ } else {
+ // recvers(ctx, ipc, count, nil)
+
+ chWaiter := make(chan bool, count)
+ cs := recvers(ctx, ipc, count, chWaiter)
+
+ go func() {
+ waitCount := 0
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-chWaiter:
+ waitCount = 0
+ default:
+ if waitCount > 200*3 {
+ for _, v := range cs {
+ v.Close()
+ }
+ cs = recvers(ctx, ipc, count, chWaiter)
+ fmt.Println("restart recievers")
+ waitCount = 0
+ continue
+ }
+ time.Sleep(time.Millisecond * 5)
+ waitCount++
+ }
+ }
+ }()
+ }
+}
+
+func recvers(ctx context.Context, url string, count int, ch chan<- bool) []deliver.Deliver {
+ for {
+ file := "/dev/shm/" + url
+ exist, _ := pathExists(file)
+ if exist {
+ break
+ }
+ fmt.Println("wait for shm server start")
+ time.Sleep(time.Second)
+ }
+ var cs []deliver.Deliver
+ for i := 0; i < count; i++ {
+ c, err := deliver.NewClientWithError(deliver.Shm, url)
+ for {
+ if err == nil {
+ break
+ }
+ time.Sleep(1 * time.Second)
+ c, err = deliver.NewClientWithError(deliver.Shm, url)
+ fmt.Println(i, " client create failed : ", err)
+ }
+ cs = append(cs, c)
+ go shmrecver(ctx, c, i, ch)
+ }
+ return cs
+}
+
+func pathExists(path string) (bool, error) {
+ _, err := os.Stat(path)
+ if err == nil {
+ return true, nil
+ }
+ if os.IsNotExist(err) {
+ return false, nil
+ }
+ return false, err
+}
diff --git a/profile/shmsend.go b/profile/shmsend.go
new file mode 100644
index 0000000..61a3b87
--- /dev/null
+++ b/profile/shmsend.go
@@ -0,0 +1,65 @@
+package profile
+
+import (
+ "context"
+ "demo/deliver"
+ "fmt"
+ "time"
+)
+
+func shmsender(ctx context.Context, s deliver.Deliver, index int) {
+ var err error
+
+ buf := make([]byte, dLen)
+
+ copy(buf, []byte("hello, give you this"))
+ for {
+
+ select {
+ case <-ctx.Done():
+ s.Close()
+ fmt.Println("quit shm sender")
+ return
+ default:
+ if err = s.Send(buf); err != nil {
+
+ fmt.Printf("can't send message on push socket: %s\n", err.Error())
+ } else {
+
+ fmt.Printf("send msg length %d\n", len(buf))
+ }
+ }
+
+ // time.Sleep(10 * time.Millisecond)
+ }
+}
+
+func Shmsend(ctx context.Context, server bool, ipc string, count int) {
+ blocks := 2
+ if server {
+ s, err := deliver.NewServerWithError(deliver.Shm, ipc, blocks, dLen)
+ for {
+ if err == nil {
+ break
+ }
+ time.Sleep(1 * time.Second)
+ s, err = deliver.NewServerWithError(deliver.Shm, ipc, blocks, dLen)
+ }
+ go shmsender(ctx, s, 0)
+ } else {
+
+ var cs []deliver.Deliver
+ for i := 0; i < count; i++ {
+ c, err := deliver.NewClientWithError(deliver.Shm, ipc)
+ for {
+ if err == nil {
+ break
+ }
+ time.Sleep(1 * time.Second)
+ c, err = deliver.NewClientWithError(deliver.Shm, ipc)
+ }
+ cs = append(cs, c)
+ go shmsender(ctx, c, i)
+ }
+ }
+}
diff --git a/reqrep.go b/reqrep.go
deleted file mode 100644
index b23fdf4..0000000
--- a/reqrep.go
+++ /dev/null
@@ -1,69 +0,0 @@
-package main
-
-import (
- "demo/deliver"
- "fmt"
- "time"
-)
-
-func req(url string, m deliver.Mode) {
- p := deliver.NewClient(m, url)
- var err error
-
- msg := `hello, give me your data`
-
- var t int64
- var elapse int64
- count := 0
-
- for {
-
- if err = p.Send([]byte(msg)); err != nil {
-
- fmt.Printf("can't send message on push socket: %s\n", err.Error())
- } else {
- }
-
- if buf, err := p.Recv(); err != nil {
- fmt.Println("recv error: ", err)
- } else {
- if t == 0 {
- t = time.Now().UnixNano()
- }
- elapse = time.Now().UnixNano() - t
-
- count++
-
- if elapse > 1e9 {
- fmt.Printf("NODE: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
- count, len(buf), elapse)
- elapse = 0
- count = 0
- t = 0
- }
-
- }
- // time.Sleep(10 * time.Millisecond)
- }
-
-}
-
-func rep(url string, m deliver.Mode) {
- c := deliver.NewServer(m, url)
-
- var msg []byte
- var err error
-
- buf := make([]byte, dLen)
-
- for {
- msg, err = c.Recv()
- if err != nil {
- fmt.Println("recv error : ", err, " msg ", msg)
- continue
- }
-
- c.Send(buf)
- // time.Sleep(10 * time.Millisecond)
- }
-}
diff --git a/runShm.go b/runShm.go
deleted file mode 100644
index 1f05617..0000000
--- a/runShm.go
+++ /dev/null
@@ -1,129 +0,0 @@
-package main
-
-import (
- "demo/deliver"
- "fmt"
- "os"
- "os/signal"
- "sync"
- "time"
-
- "golang.org/x/sys/unix"
-)
-
-func shmSenderImpl(s deliver.Deliver) {
- var err error
-
- buf := make([]byte, dLen)
-
- copy(buf, []byte("hello, give you this"))
- for {
-
- select {
- case <-ctx.Done():
- return
- default:
- if err = s.Send(buf); err != nil {
-
- fmt.Printf("can't send message on push socket: %s\n", err.Error())
- } else {
-
- fmt.Printf("send msg length %d\n", len(buf))
- }
- }
-
- // time.Sleep(10 * time.Millisecond)
- }
-
-}
-
-func shmSender(url string, args ...interface{}) {
- s, err := deliver.NewServerWithError(deliver.Shm, url, args...)
- for {
- if err == nil {
- break
- }
- fmt.Println("create shm error : ", err)
- time.Sleep(1 * time.Second)
- s, err = deliver.NewServerWithError(deliver.Shm, url, args)
- }
- go shmSenderImpl(s)
-
- c := make(chan os.Signal, 1)
- signal.Notify(c, os.Interrupt, os.Kill, unix.SIGINT)
- <-c
-
- cancel()
- s.Close()
-}
-
-func shmRecvImpl(wg *sync.WaitGroup, c deliver.Deliver, url string, index int) {
-
- var msg []byte
- var err error
-
- var t int64
- var elapse int64
- count := 0
-
- for {
- select {
- case <-ctx.Done():
- wg.Done()
- return
- default:
- msg, err = c.Recv()
- if err != nil {
- fmt.Println("recv error : ", err)
- }
- if t == 0 {
- t = time.Now().UnixNano()
- }
- elapse = time.Now().UnixNano() - t
-
- count++
-
- if elapse > 1e9 {
- fmt.Printf("NODE-%d: RECEIVED \"%d\" data %s len %d, use \"%d\" ns\n",
- index, count, string(msg), len(msg), elapse)
- elapse = 0
- count = 0
- t = 0
- }
- }
-
- // time.Sleep(10 * time.Millisecond)
- }
-
-}
-
-func shmReciever(url string, count int) {
-
- wg := sync.WaitGroup{}
-
- var cs []deliver.Deliver
- for i := 0; i < count; i++ {
- wg.Add(1)
- c, err := deliver.NewClientWithError(deliver.Shm, url)
- for {
- if err == nil {
- break
- }
- time.Sleep(1 * time.Second)
- c, err = deliver.NewClientWithError(deliver.Shm, url)
- fmt.Println(i, " client create failed : ", err)
- }
- cs = append(cs, c)
- go shmRecvImpl(&wg, c, url, i)
- }
-
- c := make(chan os.Signal, 1)
- signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
- <-c
-
- cancel()
- wg.Wait()
- for _, v := range cs {
- v.Close()
- }
-}
diff --git a/runShm2.go b/runShm2.go
deleted file mode 100644
index 34ea9df..0000000
--- a/runShm2.go
+++ /dev/null
@@ -1,122 +0,0 @@
-package main
-
-import (
- "demo/deliver"
- "fmt"
- "os"
- "os/signal"
- "time"
-
- "golang.org/x/sys/unix"
-)
-
-func shmSenderImpl2(s deliver.Deliver) {
- var err error
-
- buf := make([]byte, dLen)
-
- copy(buf, []byte("hello, give you this"))
- for {
-
- select {
- case <-ctx.Done():
- return
- default:
- if err = s.Send(buf); err != nil {
-
- fmt.Printf("can't send message on push socket: %s\n", err.Error())
- } else {
-
- fmt.Printf("send msg length %d\n", len(buf))
- }
- }
-
- // time.Sleep(10 * time.Millisecond)
- }
-
-}
-
-func shmReciever2(url string, count int, args ...interface{}) {
- s, err := deliver.NewServerWithError(deliver.Shm, url, args...)
- for {
- if err == nil {
- break
- }
- time.Sleep(1 * time.Second)
- s, err = deliver.NewServerWithError(deliver.Shm, url, args...)
- }
-
- go shmRecvImpl2(s, 0)
-
- c := make(chan os.Signal, 1)
- signal.Notify(c, os.Interrupt, os.Kill, unix.SIGINT)
- <-c
-
- cancel()
- s.Close()
-}
-
-func shmRecvImpl2(c deliver.Deliver, index int) {
-
- var msg []byte
- var err error
-
- var t int64
- var elapse int64
- count := 0
-
- for {
- select {
- case <-ctx.Done():
- return
- default:
- msg, err = c.Recv()
- if err != nil {
- fmt.Println("recv error : ", err)
- }
- if t == 0 {
- t = time.Now().UnixNano()
- }
- elapse = time.Now().UnixNano() - t
-
- count++
-
- if elapse > 1e9 {
- fmt.Printf("NODE-%d: RECEIVED \"%d\" data %s len %d, use \"%d\" ns\n",
- index, count, string(msg), len(msg), elapse)
- elapse = 0
- count = 0
- t = 0
- }
- }
-
- // time.Sleep(10 * time.Millisecond)
- }
-
-}
-
-func shmSender2(url string, count int) {
-
- var cs []deliver.Deliver
- for i := 0; i < count; i++ {
- c, err := deliver.NewClientWithError(deliver.Shm, url)
- for {
- if err == nil {
- break
- }
- time.Sleep(1 * time.Second)
- c, err = deliver.NewClientWithError(deliver.Shm, url)
- }
- cs = append(cs, c)
- go shmSenderImpl2(c)
- }
-
- c := make(chan os.Signal, 1)
- signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
- <-c
-
- cancel()
- for _, v := range cs {
- v.Close()
- }
-}
diff --git a/shm b/shm
new file mode 160000
index 0000000..a0123f1
--- /dev/null
+++ b/shm
@@ -1 +1 @@
-Subproject commit 0000000000000000000000000000000000000000
+Subproject commit a0123f163eddcea3e6b9f9d36f1f3fb3aa2c835a
--
Gitblit v1.8.0