From fbafdc0bb9ace05f477a747e6c6744309008c027 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期二, 27 八月 2019 13:14:01 +0800
Subject: [PATCH] update deliver

---
 profile/shmrecv.go |  111 ++++++++++++++++++++++++++++++++++++++++---------------
 1 files changed, 80 insertions(+), 31 deletions(-)

diff --git a/profile/shmrecv.go b/profile/shmrecv.go
index 08ca30c..b3f90f4 100644
--- a/profile/shmrecv.go
+++ b/profile/shmrecv.go
@@ -5,6 +5,7 @@
 	"demo/deliver"
 	"fmt"
 	"os"
+	"sync"
 	"time"
 )
 
@@ -54,8 +55,8 @@
 			count++
 
 			if elapse > 1e9 {
-				fmt.Printf("NODE-%d: RECEIVED \"%d\" data %s len %d, use \"%d\" ns\n",
-					index, count, string(msg), len(msg), elapse)
+				fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
+					index, count, len(msg), elapse)
 				elapse = 0
 				count = 0
 				t = 0
@@ -66,6 +67,72 @@
 	}
 
 }
+
+func shmrecver2(ctx context.Context, c deliver.Deliver, index int, ch chan<- bool, pool *sync.Pool) {
+
+	// var msg []byte
+	// var err error
+
+	var t int64
+	var elapse int64
+	count := 0
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+			msg := *pool.Get().(*[]byte)
+			err := c.Recv2(msg)
+			if err != nil {
+				c.Close()
+				url := "hello"
+				i, err := deliver.NewClientWithError(deliver.Shm, url)
+				for {
+					if err == nil {
+						break
+					}
+					time.Sleep(1 * time.Second)
+					i, err = deliver.NewClientWithError(deliver.Shm, url)
+
+					fmt.Println("client create failed : ", err)
+
+				}
+				c = i
+
+				pool.Put(&msg)
+
+				fmt.Println("recv error : ", err)
+				continue
+			}
+			if ch != nil {
+				ch <- true
+			}
+
+			if t == 0 {
+				t = time.Now().UnixNano()
+			}
+			elapse = time.Now().UnixNano() - t
+
+			count++
+
+			if elapse > 1e9 {
+				fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
+					index, count, len(msg), elapse)
+				elapse = 0
+				count = 0
+				t = 0
+			}
+
+			pool.Put(&msg)
+		}
+
+		// time.Sleep(10 * time.Millisecond)
+	}
+
+}
+
+var chunkSize = 32 * 1024 * 1024
 
 func Shmrecv(ctx context.Context, server bool, ipc string, count int) {
 	blocks := 2
@@ -83,35 +150,6 @@
 
 	} else {
 		recvers(ctx, ipc, count, nil)
-
-		return
-
-		chWaiter := make(chan bool, count)
-		cs := recvers(ctx, ipc, count, chWaiter)
-
-		go func() {
-			waitCount := 0
-			for {
-				select {
-				case <-ctx.Done():
-					return
-				case <-chWaiter:
-					waitCount = 0
-				default:
-					if waitCount > 200*3 {
-						for _, v := range cs {
-							v.Close()
-						}
-						// cs = recvers(ctx, ipc, count, chWaiter)
-						// fmt.Println("restart recievers")
-						// waitCount = 0
-						// continue
-					}
-					time.Sleep(time.Millisecond * 5)
-					waitCount++
-				}
-			}
-		}()
 	}
 }
 
@@ -125,6 +163,16 @@
 		fmt.Println("wait for shm server start")
 		time.Sleep(time.Second)
 	}
+
+	// pool := &sync.Pool{
+	// 	New: func() interface{} {
+	// 		b := make([]byte, chunkSize)
+	// 		return &b
+	// 	},
+	// }
+
+	// pool := slab.NewAtomPool(chunkSize, chunkSize, 2, chunkSize*8)
+
 	var cs []deliver.Deliver
 	for i := 0; i < count; i++ {
 		c, err := deliver.NewClientWithError(deliver.Shm, url)
@@ -138,6 +186,7 @@
 		}
 		cs = append(cs, c)
 		go shmrecver(ctx, c, i, ch)
+		// go shmrecver2(ctx, c, i, ch, pool)
 	}
 	return cs
 }

--
Gitblit v1.8.0