From fbafdc0bb9ace05f477a747e6c6744309008c027 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期二, 27 八月 2019 13:14:01 +0800 Subject: [PATCH] update deliver --- profile/shmrecv.go | 111 ++++++++++++++++++++++++++++++++++++++++--------------- 1 files changed, 80 insertions(+), 31 deletions(-) diff --git a/profile/shmrecv.go b/profile/shmrecv.go index 08ca30c..b3f90f4 100644 --- a/profile/shmrecv.go +++ b/profile/shmrecv.go @@ -5,6 +5,7 @@ "demo/deliver" "fmt" "os" + "sync" "time" ) @@ -54,8 +55,8 @@ count++ if elapse > 1e9 { - fmt.Printf("NODE-%d: RECEIVED \"%d\" data %s len %d, use \"%d\" ns\n", - index, count, string(msg), len(msg), elapse) + fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n", + index, count, len(msg), elapse) elapse = 0 count = 0 t = 0 @@ -66,6 +67,72 @@ } } + +func shmrecver2(ctx context.Context, c deliver.Deliver, index int, ch chan<- bool, pool *sync.Pool) { + + // var msg []byte + // var err error + + var t int64 + var elapse int64 + count := 0 + + for { + select { + case <-ctx.Done(): + return + default: + msg := *pool.Get().(*[]byte) + err := c.Recv2(msg) + if err != nil { + c.Close() + url := "hello" + i, err := deliver.NewClientWithError(deliver.Shm, url) + for { + if err == nil { + break + } + time.Sleep(1 * time.Second) + i, err = deliver.NewClientWithError(deliver.Shm, url) + + fmt.Println("client create failed : ", err) + + } + c = i + + pool.Put(&msg) + + fmt.Println("recv error : ", err) + continue + } + if ch != nil { + ch <- true + } + + if t == 0 { + t = time.Now().UnixNano() + } + elapse = time.Now().UnixNano() - t + + count++ + + if elapse > 1e9 { + fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n", + index, count, len(msg), elapse) + elapse = 0 + count = 0 + t = 0 + } + + pool.Put(&msg) + } + + // time.Sleep(10 * time.Millisecond) + } + +} + +var chunkSize = 32 * 1024 * 1024 func Shmrecv(ctx context.Context, server bool, ipc string, count int) { blocks := 2 @@ -83,35 +150,6 @@ } else { recvers(ctx, ipc, count, nil) - - return - - chWaiter := make(chan bool, count) - cs := recvers(ctx, ipc, count, chWaiter) - - go func() { - waitCount := 0 - for { - select { - case <-ctx.Done(): - return - case <-chWaiter: - waitCount = 0 - default: - if waitCount > 200*3 { - for _, v := range cs { - v.Close() - } - // cs = recvers(ctx, ipc, count, chWaiter) - // fmt.Println("restart recievers") - // waitCount = 0 - // continue - } - time.Sleep(time.Millisecond * 5) - waitCount++ - } - } - }() } } @@ -125,6 +163,16 @@ fmt.Println("wait for shm server start") time.Sleep(time.Second) } + + // pool := &sync.Pool{ + // New: func() interface{} { + // b := make([]byte, chunkSize) + // return &b + // }, + // } + + // pool := slab.NewAtomPool(chunkSize, chunkSize, 2, chunkSize*8) + var cs []deliver.Deliver for i := 0; i < count; i++ { c, err := deliver.NewClientWithError(deliver.Shm, url) @@ -138,6 +186,7 @@ } cs = append(cs, c) go shmrecver(ctx, c, i, ch) + // go shmrecver2(ctx, c, i, ch, pool) } return cs } -- Gitblit v1.8.0