From fbafdc0bb9ace05f477a747e6c6744309008c027 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期二, 27 八月 2019 13:14:01 +0800
Subject: [PATCH] update deliver

---
 go.sum             |    4 +
 profile/shmrecv.go |  111 ++++++++++++++++++++++++++----------
 deliver            |    2 
 go.mod             |    2 
 main.go            |    2 
 5 files changed, 88 insertions(+), 33 deletions(-)

diff --git a/deliver b/deliver
index 20a4c4b..020e17c 160000
--- a/deliver
+++ b/deliver
@@ -1 +1 @@
-Subproject commit 20a4c4bfb5b9ea427f9117408ff0e4513ebef9eb
+Subproject commit 020e17cc5311d091d713eb4fabae2a3d50944916
diff --git a/go.mod b/go.mod
index 471f6c4..b2403b6 100644
--- a/go.mod
+++ b/go.mod
@@ -4,6 +4,8 @@
 
 require (
 	basic.com/valib/shm.git v0.0.0-20190826090635-7db9aba5ca93
+	github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478
+	github.com/funny/utest v0.0.0-20161029064919-43870a374500 // indirect
 	github.com/gorilla/websocket v1.4.1 // indirect
 	golang.org/x/sys v0.0.0-20190825160603-fb81701db80f
 	nanomsg.org/go-mangos v1.4.0
diff --git a/go.sum b/go.sum
index 65a01a4..cdf5450 100644
--- a/go.sum
+++ b/go.sum
@@ -1,5 +1,9 @@
 basic.com/valib/shm.git v0.0.0-20190826090635-7db9aba5ca93 h1:KawyUKwFGIQIv043VGV1QvXOiU8aq7DEyB3QhAq+Syc=
 basic.com/valib/shm.git v0.0.0-20190826090635-7db9aba5ca93/go.mod h1:yYRM7bM9y0KKd4IfNt3myjsvkFVFIIWNjsvK14tNbq4=
+github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478 h1:Db9StoJ6RZN3YttC0Pm0I4Y5izITRYch3RMbT59BYN0=
+github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478/go.mod h1:0j1+svBH8ABEIPdUP0AIg4qedsybnXGJBakCEw8cfoo=
+github.com/funny/utest v0.0.0-20161029064919-43870a374500 h1:Z0r1CZnoIWFB/Uiwh1BU5FYmuFe6L5NPi6XWQEmsTRg=
+github.com/funny/utest v0.0.0-20161029064919-43870a374500/go.mod h1:mUn39tBov9jKnTWV1RlOYoNzxdBFHiSzXWdY1FoNGGg=
 github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM=
 github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
 golang.org/x/sys v0.0.0-20190825160603-fb81701db80f h1:LCxigP8q3fPRGNVYndYsyHnF0zRrvcoVwZMfb8iQZe4=
diff --git a/main.go b/main.go
index 24ed860..cc9c4dc 100644
--- a/main.go
+++ b/main.go
@@ -64,7 +64,7 @@
 func main() {
 	flag.Parse()
 
-	if server {
+	if proto == recv {
 		go func() {
 			http.ListenAndServe("0.0.0.0:6061", nil)
 		}()
diff --git a/profile/shmrecv.go b/profile/shmrecv.go
index 08ca30c..b3f90f4 100644
--- a/profile/shmrecv.go
+++ b/profile/shmrecv.go
@@ -5,6 +5,7 @@
 	"demo/deliver"
 	"fmt"
 	"os"
+	"sync"
 	"time"
 )
 
@@ -54,8 +55,8 @@
 			count++
 
 			if elapse > 1e9 {
-				fmt.Printf("NODE-%d: RECEIVED \"%d\" data %s len %d, use \"%d\" ns\n",
-					index, count, string(msg), len(msg), elapse)
+				fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
+					index, count, len(msg), elapse)
 				elapse = 0
 				count = 0
 				t = 0
@@ -66,6 +67,72 @@
 	}
 
 }
+
+func shmrecver2(ctx context.Context, c deliver.Deliver, index int, ch chan<- bool, pool *sync.Pool) {
+
+	// var msg []byte
+	// var err error
+
+	var t int64
+	var elapse int64
+	count := 0
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+			msg := *pool.Get().(*[]byte)
+			err := c.Recv2(msg)
+			if err != nil {
+				c.Close()
+				url := "hello"
+				i, err := deliver.NewClientWithError(deliver.Shm, url)
+				for {
+					if err == nil {
+						break
+					}
+					time.Sleep(1 * time.Second)
+					i, err = deliver.NewClientWithError(deliver.Shm, url)
+
+					fmt.Println("client create failed : ", err)
+
+				}
+				c = i
+
+				pool.Put(&msg)
+
+				fmt.Println("recv error : ", err)
+				continue
+			}
+			if ch != nil {
+				ch <- true
+			}
+
+			if t == 0 {
+				t = time.Now().UnixNano()
+			}
+			elapse = time.Now().UnixNano() - t
+
+			count++
+
+			if elapse > 1e9 {
+				fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
+					index, count, len(msg), elapse)
+				elapse = 0
+				count = 0
+				t = 0
+			}
+
+			pool.Put(&msg)
+		}
+
+		// time.Sleep(10 * time.Millisecond)
+	}
+
+}
+
+var chunkSize = 32 * 1024 * 1024
 
 func Shmrecv(ctx context.Context, server bool, ipc string, count int) {
 	blocks := 2
@@ -83,35 +150,6 @@
 
 	} else {
 		recvers(ctx, ipc, count, nil)
-
-		return
-
-		chWaiter := make(chan bool, count)
-		cs := recvers(ctx, ipc, count, chWaiter)
-
-		go func() {
-			waitCount := 0
-			for {
-				select {
-				case <-ctx.Done():
-					return
-				case <-chWaiter:
-					waitCount = 0
-				default:
-					if waitCount > 200*3 {
-						for _, v := range cs {
-							v.Close()
-						}
-						// cs = recvers(ctx, ipc, count, chWaiter)
-						// fmt.Println("restart recievers")
-						// waitCount = 0
-						// continue
-					}
-					time.Sleep(time.Millisecond * 5)
-					waitCount++
-				}
-			}
-		}()
 	}
 }
 
@@ -125,6 +163,16 @@
 		fmt.Println("wait for shm server start")
 		time.Sleep(time.Second)
 	}
+
+	// pool := &sync.Pool{
+	// 	New: func() interface{} {
+	// 		b := make([]byte, chunkSize)
+	// 		return &b
+	// 	},
+	// }
+
+	// pool := slab.NewAtomPool(chunkSize, chunkSize, 2, chunkSize*8)
+
 	var cs []deliver.Deliver
 	for i := 0; i < count; i++ {
 		c, err := deliver.NewClientWithError(deliver.Shm, url)
@@ -138,6 +186,7 @@
 		}
 		cs = append(cs, c)
 		go shmrecver(ctx, c, i, ch)
+		// go shmrecver2(ctx, c, i, ch, pool)
 	}
 	return cs
 }

--
Gitblit v1.8.0