From cb4a30b9c26317a94e0be16029c8107d830fcd0d Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期一, 26 八月 2019 13:46:48 +0800
Subject: [PATCH] update

---
 /dev/null          |  122 -----------
 go.sum             |   14 
 .gitmodules        |    3 
 profile/pull.go    |   56 +++++
 profile/shmrecv.go |  137 ++++++++++++
 go.mod             |    7 
 profile/push.go    |   46 ++++
 profile/reqrep.go  |   83 +++++++
 shm                |    2 
 main.go            |  124 ++++------
 profile/shmsend.go |   65 +++++
 11 files changed, 447 insertions(+), 212 deletions(-)

diff --git a/.gitmodules b/.gitmodules
index 0dc6d55..9a70181 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -1,3 +1,6 @@
 [submodule "deliver"]
 	path = deliver
 	url = ssh://zhangmeng@192.168.1.14:29418/valib/deliver.git
+[submodule "shm"]
+	path = shm
+	url = ssh://zhangmeng@192.168.1.14:29418/valib/shm.git
diff --git a/1push-npull.go b/1push-npull.go
deleted file mode 100644
index 31ea21a..0000000
--- a/1push-npull.go
+++ /dev/null
@@ -1,106 +0,0 @@
-package main
-
-import (
-	"demo/deliver"
-	"fmt"
-	"os"
-	"os/signal"
-	"time"
-
-	"golang.org/x/sys/unix"
-)
-
-func oneSenderImpl(s deliver.Deliver) {
-	var err error
-
-	buf := make([]byte, dLen)
-
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		default:
-
-			if err = s.Send(buf); err != nil {
-
-				fmt.Printf("can't send message on push socket: %s\n", err.Error())
-			} else {
-
-				fmt.Printf("send msg length %d\n", len(buf))
-			}
-		}
-	}
-
-}
-
-func oneSender(url string, m deliver.Mode, args ...interface{}) {
-	s := deliver.NewServer(m, url, args...)
-
-	go oneSenderImpl(s)
-
-	c := make(chan os.Signal, 1)
-	signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
-	<-c
-
-	cancel()
-	s.Close()
-}
-
-func nRecvImpl(c deliver.Deliver, index int) {
-
-	var msg []byte
-	var err error
-
-	var t int64
-	var elapse int64
-	count := 0
-
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		default:
-			msg, err = c.Recv()
-			if err != nil {
-				fmt.Println("recv error : ", err)
-			}
-			if t == 0 {
-				t = time.Now().UnixNano()
-			}
-			elapse = time.Now().UnixNano() - t
-
-			count++
-
-			if elapse > 1e9 {
-				fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
-					index, count, len(msg), elapse)
-				elapse = 0
-				count = 0
-				t = 0
-			}
-		}
-
-	}
-}
-
-func nReciever(url string, m deliver.Mode, count int) {
-
-	var cs []deliver.Deliver
-
-	for i := 0; i < count; i++ {
-		c := deliver.NewClient(m, url)
-		cs = append(cs, c)
-
-		go nRecvImpl(c, i)
-	}
-
-	c := make(chan os.Signal, 1)
-	signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
-	<-c
-
-	cancel()
-	for _, v := range cs {
-		v.Close()
-	}
-
-}
diff --git a/go.mod b/go.mod
index f9b1263..77b954b 100644
--- a/go.mod
+++ b/go.mod
@@ -3,10 +3,7 @@
 go 1.12
 
 require (
-	github.com/gorilla/websocket v1.4.0 // indirect
-	github.com/tmthrgd/go-sem v0.0.0-20160607101025-0214dbf53877 // indirect
-	github.com/tmthrgd/go-shm v0.0.0-20170117044846-90afcfcd5ee9 // indirect
-	github.com/tmthrgd/shm-go v0.0.0-20170130075737-7207ca97b290
-	golang.org/x/sys v0.0.0-20190429190828-d89cdac9e872
+	github.com/gorilla/websocket v1.4.1 // indirect
+	golang.org/x/sys v0.0.0-20190825160603-fb81701db80f
 	nanomsg.org/go-mangos v1.4.0
 )
diff --git a/go.sum b/go.sum
index 6fb9f50..fd43cdd 100644
--- a/go.sum
+++ b/go.sum
@@ -1,12 +1,6 @@
-github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
-github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
-github.com/tmthrgd/go-sem v0.0.0-20160607101025-0214dbf53877 h1:n65+IT/xy5+trHm3Zpg9+j7IO4n8pBcPzvaKbMolW8U=
-github.com/tmthrgd/go-sem v0.0.0-20160607101025-0214dbf53877/go.mod h1:sgTk9wg3WurMlziuB3hcfgHYTz3pEkjQpSCTT8V2pW8=
-github.com/tmthrgd/go-shm v0.0.0-20170117044846-90afcfcd5ee9 h1:uVRQSWD6TOlWlLJ7IYYmbjRr0Xg35ADFN89HGQLPFGI=
-github.com/tmthrgd/go-shm v0.0.0-20170117044846-90afcfcd5ee9/go.mod h1:vy1jksyhzuQOMkHXMEi+X2bZ47ZeCn3QTnYdFBesABs=
-github.com/tmthrgd/shm-go v0.0.0-20170130075737-7207ca97b290 h1:5zW+TRr0WH4uN72/E/XYwb1PcaYN5BIB/FUbcQ0nHr0=
-github.com/tmthrgd/shm-go v0.0.0-20170130075737-7207ca97b290/go.mod h1:e9PZQr6zVezMTwj1v0j1YhGCNdS2zTCjXU9q9K+HHGk=
-golang.org/x/sys v0.0.0-20190429190828-d89cdac9e872 h1:cGjJzUd8RgBw428LXP65YXni0aiGNA4Bl+ls8SmLOm8=
-golang.org/x/sys v0.0.0-20190429190828-d89cdac9e872/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM=
+github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
+golang.org/x/sys v0.0.0-20190825160603-fb81701db80f h1:LCxigP8q3fPRGNVYndYsyHnF0zRrvcoVwZMfb8iQZe4=
+golang.org/x/sys v0.0.0-20190825160603-fb81701db80f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 nanomsg.org/go-mangos v1.4.0 h1:pVRLnzXePdSbhWlWdSncYszTagERhMG5zK/vXYmbEdM=
 nanomsg.org/go-mangos v1.4.0/go.mod h1:MOor8xUIgwsRMPpLr9xQxe7bT7rciibScOqVyztNxHQ=
diff --git a/main.go b/main.go
index fb8204a..c46e870 100644
--- a/main.go
+++ b/main.go
@@ -3,84 +3,55 @@
 import (
 	"context"
 	"demo/deliver"
+	"demo/profile"
 	"flag"
 	"fmt"
 	"os"
+	"os/signal"
+	"time"
+
+	"golang.org/x/sys/unix"
 )
 
-const dLen = 12 * 1024 * 1024
-
-var ctx, cancel = context.WithCancel(context.Background())
-
-func senderMode(ipc string, m deliver.Mode, count int, one bool) {
-	if m == deliver.ReqRep {
-		req(ipc, m)
-	} else if m == deliver.Shm {
-		shmSender(ipc, 2, 32*1024*1024)
-		// shmReciever2(ipc, count, 2, 32*1024*1024)
-	}
-
-	if one {
-		oneSender(ipc, m)
-	} else {
-		nSender(ipc, m, count)
-	}
-}
-
-func recvMode(ipc string, m deliver.Mode, count int, n bool) {
-	if m == deliver.ReqRep {
-		rep(ipc, m)
-	} else if m == deliver.Shm {
-		shmReciever(ipc, count)
-		// shmSender2(ipc, count)
-	}
-
-	if n {
-		nReciever(ipc, m, count)
-	} else {
-		oneReciever(ipc, m)
-	}
-}
-
 var (
-	proc         string
-	procCount    int
-	mode         string
-	ipc          string
-	oneSendnRecv bool
+	server bool
+	proto  string
+	ipc    string
 
-	tmm string
+	count int
 )
 
 const (
-	act  = "act"
-	pass = "pass"
+	push = "push"
+	pull = "pull"
+	pub  = "pub"
+	sub  = "sub"
+	req  = "req"
+	rep  = "rep"
+	send = "send" // shm send
+	recv = "recv" // shm recv
+	pair = "pair"
 )
 
 func init() {
-	flag.StringVar(&proc, "p", "act", "proc as sender")
-	flag.IntVar(&procCount, "c", 1, "proc run count")
+	flag.BoolVar(&server, "s", false, "run as server")
+	flag.StringVar(&proto, "p", "", "run protocol e.g. push/pull req/rep send/recv(use shm)")
+	flag.StringVar(&ipc, "i", "", "ipc address")
+	flag.IntVar(&count, "c", 1, "run client count")
 
-	flag.StringVar(&mode, "m", "pushpull", "proc run mode pushpull or pubsub etc.")
-
-	flag.StringVar(&ipc, "i", "ipc:///tmp/pic.ipc", "ipc label")
-
-	flag.BoolVar(&oneSendnRecv, "n", true, "one send n recv")
-
-	flag.StringVar(&tmm, "t", "server", "")
 }
 
-func modeType(t string) deliver.Mode {
+func deliverMode(m string) deliver.Mode {
 
-	if t == "pushpull" {
+	if m == push || m == pull {
 		return deliver.PushPull
-	} else if t == "pubsub" {
+	} else if m == pub || m == sub {
 		return deliver.PubSub
-	} else if t == "pair" {
+	} else if m == pair {
 		return deliver.Pair
-	} else if t == "reqrep" {
+	} else if m == req || m == rep {
 		return deliver.ReqRep
-	} else if t == "shm" {
+	} else if m == send || m == recv {
 		return deliver.Shm
 	}
 
@@ -90,26 +61,31 @@
 func main() {
 	flag.Parse()
 
-	if tmm == "server" {
-		c, _ := deliver.NewServerWithTimeout(deliver.PushPull, ipc, 1000)
-		// c := deliver.NewServer(deliver.PushPull, ipc)
-		nSenderImpl(c, 0)
-	} else if tmm == "client" {
-		s, _ := deliver.NewServerWithTimeout(deliver.PushPull, ipc, 100)
-
-		oneRecvImpl(s, 0)
-
+	fnMap := map[string]func(context.Context, bool, string, int){
+		push: profile.Push,
+		pull: profile.Pull,
+		req:  profile.Req,
+		rep:  profile.Rep,
+		send: profile.Shmsend,
+		recv: profile.Shmrecv,
 	}
-	return
-	m := modeType(mode)
-	if m > deliver.ModeStart {
-		if proc == act {
-			senderMode(ipc, m, procCount, oneSendnRecv)
-		} else {
-			recvMode(ipc, m, procCount, oneSendnRecv)
-		}
+
+	ctx, cancel := context.WithCancel(context.Background())
+
+	if fn, ok := fnMap[proto]; ok {
+		fn(ctx, server, ipc, count)
+	} else {
+		fmt.Println("NO THIS FUNC: ", proto)
 	}
 
+	c := make(chan os.Signal, 1)
+	signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
+	<-c
+
+	cancel()
+
+	time.Sleep(3 * time.Second)
+
 	fmt.Fprintf(os.Stderr,
 		"Usage: pushpull push|pull <URL> <ARG> ...\n")
 	os.Exit(1)
diff --git a/npush-1pull.go b/npush-1pull.go
deleted file mode 100644
index 9d54439..0000000
--- a/npush-1pull.go
+++ /dev/null
@@ -1,101 +0,0 @@
-package main
-
-import (
-	"demo/deliver"
-	"fmt"
-	"os"
-	"os/signal"
-	"time"
-
-	"golang.org/x/sys/unix"
-)
-
-func nSenderImpl(s deliver.Deliver, index int) {
-	var err error
-
-	buf := make([]byte, dLen)
-
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		default:
-			if err = s.Send(buf); err != nil {
-
-				fmt.Printf("%d can't send message on push socket: %s\n", index, err.Error())
-			} else {
-
-				fmt.Printf("%d send msg length %d\n", index, len(buf))
-			}
-		}
-	}
-
-}
-
-func nSender(url string, m deliver.Mode, count int, args ...interface{}) {
-
-	var cs []deliver.Deliver
-
-	for i := 0; i < count; i++ {
-		c := deliver.NewClient(m, url, args...)
-		cs = append(cs, c)
-
-		go nSenderImpl(c, i)
-	}
-
-	c := make(chan os.Signal, 1)
-	signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
-	<-c
-	cancel()
-	for _, v := range cs {
-		v.Close()
-	}
-
-}
-
-func oneRecvImpl(c deliver.Deliver, index int) {
-
-	var msg []byte
-	var err error
-
-	var t int64
-	var elapse int64
-	count := 0
-
-	for {
-		msg, err = c.Recv()
-		if err != nil {
-			fmt.Println("recv error : ", err)
-		}
-		if t == 0 {
-			t = time.Now().UnixNano()
-		}
-		elapse = time.Now().UnixNano() - t
-
-		count++
-
-		if elapse > 1e9 {
-			fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
-				index, count, len(msg), elapse)
-			elapse = 0
-			count = 0
-			t = 0
-		}
-
-		// time.Sleep(10 * time.Millisecond)
-	}
-}
-
-func oneReciever(url string, m deliver.Mode) {
-
-	s := deliver.NewServer(m, url)
-
-	go oneRecvImpl(s, 0)
-
-	c := make(chan os.Signal, 1)
-	signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
-	<-c
-
-	cancel()
-	s.Close()
-}
diff --git a/profile/pull.go b/profile/pull.go
new file mode 100644
index 0000000..0636e3f
--- /dev/null
+++ b/profile/pull.go
@@ -0,0 +1,56 @@
+package profile
+
+import (
+	"context"
+	"demo/deliver"
+	"fmt"
+	"time"
+)
+
+func puller(ctx context.Context, d deliver.Deliver, index int) {
+	var msg []byte
+	var err error
+
+	var t int64
+	var elapse int64
+	count := 0
+
+	for {
+		msg, err = d.Recv()
+		if err != nil {
+			// fmt.Println("recv error : ", err)
+		}
+		if t == 0 {
+			t = time.Now().UnixNano()
+		}
+		elapse = time.Now().UnixNano() - t
+
+		count++
+
+		if elapse > 1e9 {
+			fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
+				index, count, len(msg), elapse)
+			elapse = 0
+			count = 0
+			t = 0
+		}
+
+		// time.Sleep(10 * time.Millisecond)
+	}
+}
+
+func Pull(ctx context.Context, server bool, ipc string, count int) {
+	if server {
+		d := deliver.NewServer(deliver.PushPull, ipc)
+		go puller(ctx, d, 0)
+	} else {
+		var cs []deliver.Deliver
+
+		for i := 0; i < count; i++ {
+			c := deliver.NewClient(deliver.PushPull, ipc)
+			cs = append(cs, c)
+
+			go puller(ctx, c, i)
+		}
+	}
+}
diff --git a/profile/push.go b/profile/push.go
new file mode 100644
index 0000000..73738e2
--- /dev/null
+++ b/profile/push.go
@@ -0,0 +1,46 @@
+package profile
+
+import (
+	"context"
+	"demo/deliver"
+	"fmt"
+)
+
+const dLen = 32 * 1024 * 1024
+
+func pusher(ctx context.Context, d deliver.Deliver, index int) {
+	var err error
+
+	buf := make([]byte, dLen)
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+			if err = d.Send(buf); err != nil {
+
+				// fmt.Printf("%d can't send message on push socket: %s\n", index, err.Error())
+			} else {
+
+				fmt.Printf("%d send msg length %d\n", index, len(buf))
+			}
+		}
+	}
+}
+
+func Push(ctx context.Context, server bool, ipc string, count int) {
+	if server {
+		d := deliver.NewServer(deliver.PushPull, ipc)
+		go pusher(ctx, d, 0)
+	} else {
+		var cs []deliver.Deliver
+
+		for i := 0; i < count; i++ {
+			c := deliver.NewClient(deliver.PushPull, ipc)
+			cs = append(cs, c)
+
+			go pusher(ctx, c, i)
+		}
+	}
+}
diff --git a/profile/reqrep.go b/profile/reqrep.go
new file mode 100644
index 0000000..d65de5f
--- /dev/null
+++ b/profile/reqrep.go
@@ -0,0 +1,83 @@
+package profile
+
+import (
+	"context"
+	"demo/deliver"
+	"fmt"
+	"time"
+)
+
+func Req(ctx context.Context, server bool, url string, num int) {
+	p := deliver.NewClient(deliver.ReqRep, url)
+	var err error
+
+	msg := `hello, give me your data`
+
+	var t int64
+	var elapse int64
+	count := 0
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+
+			if err = p.Send([]byte(msg)); err != nil {
+
+				fmt.Printf("can't send message on push socket: %s\n", err.Error())
+			} else {
+			}
+
+			if buf, err := p.Recv(); err != nil {
+				fmt.Println("recv error: ", err)
+			} else {
+				if t == 0 {
+					t = time.Now().UnixNano()
+				}
+				elapse = time.Now().UnixNano() - t
+
+				count++
+
+				if elapse > 1e9 {
+					fmt.Printf("NODE: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
+						count, len(buf), elapse)
+					elapse = 0
+					count = 0
+					t = 0
+				}
+
+			}
+			// time.Sleep(10 * time.Millisecond)
+		}
+
+	}
+
+}
+
+func Rep(ctx context.Context, server bool, url string, num int) {
+	c := deliver.NewServer(deliver.ReqRep, url)
+
+	var msg []byte
+	var err error
+
+	buf := make([]byte, dLen)
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+
+			msg, err = c.Recv()
+			if err != nil {
+				fmt.Println("recv error : ", err, " msg ", msg)
+				continue
+			}
+
+			c.Send(buf)
+			// time.Sleep(10 * time.Millisecond)
+		}
+
+	}
+}
diff --git a/profile/shmrecv.go b/profile/shmrecv.go
new file mode 100644
index 0000000..7bbfe60
--- /dev/null
+++ b/profile/shmrecv.go
@@ -0,0 +1,137 @@
+package profile
+
+import (
+	"context"
+	"demo/deliver"
+	"fmt"
+	"os"
+	"time"
+)
+
+func shmrecver(ctx context.Context, c deliver.Deliver, index int, ch chan<- bool) {
+
+	var msg []byte
+	var err error
+
+	var t int64
+	var elapse int64
+	count := 0
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+			msg, err = c.Recv()
+			if err != nil {
+				fmt.Println("recv error : ", err)
+				return
+			}
+			if ch != nil {
+				ch <- true
+			}
+
+			if t == 0 {
+				t = time.Now().UnixNano()
+			}
+			elapse = time.Now().UnixNano() - t
+
+			count++
+
+			if elapse > 1e9 {
+				fmt.Printf("NODE-%d: RECEIVED \"%d\" data %s len %d, use \"%d\" ns\n",
+					index, count, string(msg), len(msg), elapse)
+				elapse = 0
+				count = 0
+				t = 0
+			}
+		}
+
+		// time.Sleep(10 * time.Millisecond)
+	}
+
+}
+
+func Shmrecv(ctx context.Context, server bool, ipc string, count int) {
+	blocks := 2
+
+	if server {
+		s, err := deliver.NewServerWithError(deliver.Shm, ipc, blocks, dLen)
+		for {
+			if err == nil {
+				break
+			}
+			time.Sleep(1 * time.Second)
+			s, err = deliver.NewServerWithError(deliver.Shm, ipc, blocks, dLen)
+		}
+		go shmrecver(ctx, s, 0, nil)
+
+	} else {
+		// recvers(ctx, ipc, count, nil)
+
+		chWaiter := make(chan bool, count)
+		cs := recvers(ctx, ipc, count, chWaiter)
+
+		go func() {
+			waitCount := 0
+			for {
+				select {
+				case <-ctx.Done():
+					return
+				case <-chWaiter:
+					waitCount = 0
+				default:
+					if waitCount > 200*3 {
+						for _, v := range cs {
+							v.Close()
+						}
+						cs = recvers(ctx, ipc, count, chWaiter)
+						fmt.Println("restart recievers")
+						waitCount = 0
+						continue
+					}
+					time.Sleep(time.Millisecond * 5)
+					waitCount++
+				}
+			}
+		}()
+	}
+}
+
+func recvers(ctx context.Context, url string, count int, ch chan<- bool) []deliver.Deliver {
+	for {
+		file := "/dev/shm/" + url
+		exist, _ := pathExists(file)
+		if exist {
+			break
+		}
+		fmt.Println("wait for shm server start")
+		time.Sleep(time.Second)
+	}
+	var cs []deliver.Deliver
+	for i := 0; i < count; i++ {
+		c, err := deliver.NewClientWithError(deliver.Shm, url)
+		for {
+			if err == nil {
+				break
+			}
+			time.Sleep(1 * time.Second)
+			c, err = deliver.NewClientWithError(deliver.Shm, url)
+			fmt.Println(i, " client create failed : ", err)
+		}
+		cs = append(cs, c)
+		go shmrecver(ctx, c, i, ch)
+	}
+	return cs
+}
+
+func pathExists(path string) (bool, error) {
+	_, err := os.Stat(path)
+	if err == nil {
+		return true, nil
+	}
+	if os.IsNotExist(err) {
+		return false, nil
+	}
+	return false, err
+}
diff --git a/profile/shmsend.go b/profile/shmsend.go
new file mode 100644
index 0000000..61a3b87
--- /dev/null
+++ b/profile/shmsend.go
@@ -0,0 +1,65 @@
+package profile
+
+import (
+	"context"
+	"demo/deliver"
+	"fmt"
+	"time"
+)
+
+func shmsender(ctx context.Context, s deliver.Deliver, index int) {
+	var err error
+
+	buf := make([]byte, dLen)
+
+	copy(buf, []byte("hello, give you this"))
+	for {
+
+		select {
+		case <-ctx.Done():
+			s.Close()
+			fmt.Println("quit shm sender")
+			return
+		default:
+			if err = s.Send(buf); err != nil {
+
+				fmt.Printf("can't send message on push socket: %s\n", err.Error())
+			} else {
+
+				fmt.Printf("send msg length %d\n", len(buf))
+			}
+		}
+
+		// time.Sleep(10 * time.Millisecond)
+	}
+}
+
+func Shmsend(ctx context.Context, server bool, ipc string, count int) {
+	blocks := 2
+	if server {
+		s, err := deliver.NewServerWithError(deliver.Shm, ipc, blocks, dLen)
+		for {
+			if err == nil {
+				break
+			}
+			time.Sleep(1 * time.Second)
+			s, err = deliver.NewServerWithError(deliver.Shm, ipc, blocks, dLen)
+		}
+		go shmsender(ctx, s, 0)
+	} else {
+
+		var cs []deliver.Deliver
+		for i := 0; i < count; i++ {
+			c, err := deliver.NewClientWithError(deliver.Shm, ipc)
+			for {
+				if err == nil {
+					break
+				}
+				time.Sleep(1 * time.Second)
+				c, err = deliver.NewClientWithError(deliver.Shm, ipc)
+			}
+			cs = append(cs, c)
+			go shmsender(ctx, c, i)
+		}
+	}
+}
diff --git a/reqrep.go b/reqrep.go
deleted file mode 100644
index b23fdf4..0000000
--- a/reqrep.go
+++ /dev/null
@@ -1,69 +0,0 @@
-package main
-
-import (
-	"demo/deliver"
-	"fmt"
-	"time"
-)
-
-func req(url string, m deliver.Mode) {
-	p := deliver.NewClient(m, url)
-	var err error
-
-	msg := `hello, give me your data`
-
-	var t int64
-	var elapse int64
-	count := 0
-
-	for {
-
-		if err = p.Send([]byte(msg)); err != nil {
-
-			fmt.Printf("can't send message on push socket: %s\n", err.Error())
-		} else {
-		}
-
-		if buf, err := p.Recv(); err != nil {
-			fmt.Println("recv error: ", err)
-		} else {
-			if t == 0 {
-				t = time.Now().UnixNano()
-			}
-			elapse = time.Now().UnixNano() - t
-
-			count++
-
-			if elapse > 1e9 {
-				fmt.Printf("NODE: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
-					count, len(buf), elapse)
-				elapse = 0
-				count = 0
-				t = 0
-			}
-
-		}
-		// time.Sleep(10 * time.Millisecond)
-	}
-
-}
-
-func rep(url string, m deliver.Mode) {
-	c := deliver.NewServer(m, url)
-
-	var msg []byte
-	var err error
-
-	buf := make([]byte, dLen)
-
-	for {
-		msg, err = c.Recv()
-		if err != nil {
-			fmt.Println("recv error : ", err, " msg ", msg)
-			continue
-		}
-
-		c.Send(buf)
-		// time.Sleep(10 * time.Millisecond)
-	}
-}
diff --git a/runShm.go b/runShm.go
deleted file mode 100644
index 1f05617..0000000
--- a/runShm.go
+++ /dev/null
@@ -1,129 +0,0 @@
-package main
-
-import (
-	"demo/deliver"
-	"fmt"
-	"os"
-	"os/signal"
-	"sync"
-	"time"
-
-	"golang.org/x/sys/unix"
-)
-
-func shmSenderImpl(s deliver.Deliver) {
-	var err error
-
-	buf := make([]byte, dLen)
-
-	copy(buf, []byte("hello, give you this"))
-	for {
-
-		select {
-		case <-ctx.Done():
-			return
-		default:
-			if err = s.Send(buf); err != nil {
-
-				fmt.Printf("can't send message on push socket: %s\n", err.Error())
-			} else {
-
-				fmt.Printf("send msg length %d\n", len(buf))
-			}
-		}
-
-		// time.Sleep(10 * time.Millisecond)
-	}
-
-}
-
-func shmSender(url string, args ...interface{}) {
-	s, err := deliver.NewServerWithError(deliver.Shm, url, args...)
-	for {
-		if err == nil {
-			break
-		}
-		fmt.Println("create shm error : ", err)
-		time.Sleep(1 * time.Second)
-		s, err = deliver.NewServerWithError(deliver.Shm, url, args)
-	}
-	go shmSenderImpl(s)
-
-	c := make(chan os.Signal, 1)
-	signal.Notify(c, os.Interrupt, os.Kill, unix.SIGINT)
-	<-c
-
-	cancel()
-	s.Close()
-}
-
-func shmRecvImpl(wg *sync.WaitGroup, c deliver.Deliver, url string, index int) {
-
-	var msg []byte
-	var err error
-
-	var t int64
-	var elapse int64
-	count := 0
-
-	for {
-		select {
-		case <-ctx.Done():
-			wg.Done()
-			return
-		default:
-			msg, err = c.Recv()
-			if err != nil {
-				fmt.Println("recv error : ", err)
-			}
-			if t == 0 {
-				t = time.Now().UnixNano()
-			}
-			elapse = time.Now().UnixNano() - t
-
-			count++
-
-			if elapse > 1e9 {
-				fmt.Printf("NODE-%d: RECEIVED \"%d\" data %s len %d, use \"%d\" ns\n",
-					index, count, string(msg), len(msg), elapse)
-				elapse = 0
-				count = 0
-				t = 0
-			}
-		}
-
-		// time.Sleep(10 * time.Millisecond)
-	}
-
-}
-
-func shmReciever(url string, count int) {
-
-	wg := sync.WaitGroup{}
-
-	var cs []deliver.Deliver
-	for i := 0; i < count; i++ {
-		wg.Add(1)
-		c, err := deliver.NewClientWithError(deliver.Shm, url)
-		for {
-			if err == nil {
-				break
-			}
-			time.Sleep(1 * time.Second)
-			c, err = deliver.NewClientWithError(deliver.Shm, url)
-			fmt.Println(i, " client create failed : ", err)
-		}
-		cs = append(cs, c)
-		go shmRecvImpl(&wg, c, url, i)
-	}
-
-	c := make(chan os.Signal, 1)
-	signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
-	<-c
-
-	cancel()
-	wg.Wait()
-	for _, v := range cs {
-		v.Close()
-	}
-}
diff --git a/runShm2.go b/runShm2.go
deleted file mode 100644
index 34ea9df..0000000
--- a/runShm2.go
+++ /dev/null
@@ -1,122 +0,0 @@
-package main
-
-import (
-	"demo/deliver"
-	"fmt"
-	"os"
-	"os/signal"
-	"time"
-
-	"golang.org/x/sys/unix"
-)
-
-func shmSenderImpl2(s deliver.Deliver) {
-	var err error
-
-	buf := make([]byte, dLen)
-
-	copy(buf, []byte("hello, give you this"))
-	for {
-
-		select {
-		case <-ctx.Done():
-			return
-		default:
-			if err = s.Send(buf); err != nil {
-
-				fmt.Printf("can't send message on push socket: %s\n", err.Error())
-			} else {
-
-				fmt.Printf("send msg length %d\n", len(buf))
-			}
-		}
-
-		// time.Sleep(10 * time.Millisecond)
-	}
-
-}
-
-func shmReciever2(url string, count int, args ...interface{}) {
-	s, err := deliver.NewServerWithError(deliver.Shm, url, args...)
-	for {
-		if err == nil {
-			break
-		}
-		time.Sleep(1 * time.Second)
-		s, err = deliver.NewServerWithError(deliver.Shm, url, args...)
-	}
-
-	go shmRecvImpl2(s, 0)
-
-	c := make(chan os.Signal, 1)
-	signal.Notify(c, os.Interrupt, os.Kill, unix.SIGINT)
-	<-c
-
-	cancel()
-	s.Close()
-}
-
-func shmRecvImpl2(c deliver.Deliver, index int) {
-
-	var msg []byte
-	var err error
-
-	var t int64
-	var elapse int64
-	count := 0
-
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		default:
-			msg, err = c.Recv()
-			if err != nil {
-				fmt.Println("recv error : ", err)
-			}
-			if t == 0 {
-				t = time.Now().UnixNano()
-			}
-			elapse = time.Now().UnixNano() - t
-
-			count++
-
-			if elapse > 1e9 {
-				fmt.Printf("NODE-%d: RECEIVED \"%d\" data %s len %d, use \"%d\" ns\n",
-					index, count, string(msg), len(msg), elapse)
-				elapse = 0
-				count = 0
-				t = 0
-			}
-		}
-
-		// time.Sleep(10 * time.Millisecond)
-	}
-
-}
-
-func shmSender2(url string, count int) {
-
-	var cs []deliver.Deliver
-	for i := 0; i < count; i++ {
-		c, err := deliver.NewClientWithError(deliver.Shm, url)
-		for {
-			if err == nil {
-				break
-			}
-			time.Sleep(1 * time.Second)
-			c, err = deliver.NewClientWithError(deliver.Shm, url)
-		}
-		cs = append(cs, c)
-		go shmSenderImpl2(c)
-	}
-
-	c := make(chan os.Signal, 1)
-	signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
-	<-c
-
-	cancel()
-	for _, v := range cs {
-		v.Close()
-	}
-}
diff --git a/shm b/shm
new file mode 160000
index 0000000..a0123f1
--- /dev/null
+++ b/shm
@@ -1 +1 @@
-Subproject commit 0000000000000000000000000000000000000000
+Subproject commit a0123f163eddcea3e6b9f9d36f1f3fb3aa2c835a

--
Gitblit v1.8.0