From fbafdc0bb9ace05f477a747e6c6744309008c027 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期二, 27 八月 2019 13:14:01 +0800 Subject: [PATCH] update deliver --- go.sum | 4 + profile/shmrecv.go | 111 ++++++++++++++++++++++++++---------- deliver | 2 go.mod | 2 main.go | 2 5 files changed, 88 insertions(+), 33 deletions(-) diff --git a/deliver b/deliver index 20a4c4b..020e17c 160000 --- a/deliver +++ b/deliver @@ -1 +1 @@ -Subproject commit 20a4c4bfb5b9ea427f9117408ff0e4513ebef9eb +Subproject commit 020e17cc5311d091d713eb4fabae2a3d50944916 diff --git a/go.mod b/go.mod index 471f6c4..b2403b6 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,8 @@ require ( basic.com/valib/shm.git v0.0.0-20190826090635-7db9aba5ca93 + github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478 + github.com/funny/utest v0.0.0-20161029064919-43870a374500 // indirect github.com/gorilla/websocket v1.4.1 // indirect golang.org/x/sys v0.0.0-20190825160603-fb81701db80f nanomsg.org/go-mangos v1.4.0 diff --git a/go.sum b/go.sum index 65a01a4..cdf5450 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,9 @@ basic.com/valib/shm.git v0.0.0-20190826090635-7db9aba5ca93 h1:KawyUKwFGIQIv043VGV1QvXOiU8aq7DEyB3QhAq+Syc= basic.com/valib/shm.git v0.0.0-20190826090635-7db9aba5ca93/go.mod h1:yYRM7bM9y0KKd4IfNt3myjsvkFVFIIWNjsvK14tNbq4= +github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478 h1:Db9StoJ6RZN3YttC0Pm0I4Y5izITRYch3RMbT59BYN0= +github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478/go.mod h1:0j1+svBH8ABEIPdUP0AIg4qedsybnXGJBakCEw8cfoo= +github.com/funny/utest v0.0.0-20161029064919-43870a374500 h1:Z0r1CZnoIWFB/Uiwh1BU5FYmuFe6L5NPi6XWQEmsTRg= +github.com/funny/utest v0.0.0-20161029064919-43870a374500/go.mod h1:mUn39tBov9jKnTWV1RlOYoNzxdBFHiSzXWdY1FoNGGg= github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM= github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= golang.org/x/sys v0.0.0-20190825160603-fb81701db80f h1:LCxigP8q3fPRGNVYndYsyHnF0zRrvcoVwZMfb8iQZe4= diff --git a/main.go b/main.go index 24ed860..cc9c4dc 100644 --- a/main.go +++ b/main.go @@ -64,7 +64,7 @@ func main() { flag.Parse() - if server { + if proto == recv { go func() { http.ListenAndServe("0.0.0.0:6061", nil) }() diff --git a/profile/shmrecv.go b/profile/shmrecv.go index 08ca30c..b3f90f4 100644 --- a/profile/shmrecv.go +++ b/profile/shmrecv.go @@ -5,6 +5,7 @@ "demo/deliver" "fmt" "os" + "sync" "time" ) @@ -54,8 +55,8 @@ count++ if elapse > 1e9 { - fmt.Printf("NODE-%d: RECEIVED \"%d\" data %s len %d, use \"%d\" ns\n", - index, count, string(msg), len(msg), elapse) + fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n", + index, count, len(msg), elapse) elapse = 0 count = 0 t = 0 @@ -66,6 +67,72 @@ } } + +func shmrecver2(ctx context.Context, c deliver.Deliver, index int, ch chan<- bool, pool *sync.Pool) { + + // var msg []byte + // var err error + + var t int64 + var elapse int64 + count := 0 + + for { + select { + case <-ctx.Done(): + return + default: + msg := *pool.Get().(*[]byte) + err := c.Recv2(msg) + if err != nil { + c.Close() + url := "hello" + i, err := deliver.NewClientWithError(deliver.Shm, url) + for { + if err == nil { + break + } + time.Sleep(1 * time.Second) + i, err = deliver.NewClientWithError(deliver.Shm, url) + + fmt.Println("client create failed : ", err) + + } + c = i + + pool.Put(&msg) + + fmt.Println("recv error : ", err) + continue + } + if ch != nil { + ch <- true + } + + if t == 0 { + t = time.Now().UnixNano() + } + elapse = time.Now().UnixNano() - t + + count++ + + if elapse > 1e9 { + fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n", + index, count, len(msg), elapse) + elapse = 0 + count = 0 + t = 0 + } + + pool.Put(&msg) + } + + // time.Sleep(10 * time.Millisecond) + } + +} + +var chunkSize = 32 * 1024 * 1024 func Shmrecv(ctx context.Context, server bool, ipc string, count int) { blocks := 2 @@ -83,35 +150,6 @@ } else { recvers(ctx, ipc, count, nil) - - return - - chWaiter := make(chan bool, count) - cs := recvers(ctx, ipc, count, chWaiter) - - go func() { - waitCount := 0 - for { - select { - case <-ctx.Done(): - return - case <-chWaiter: - waitCount = 0 - default: - if waitCount > 200*3 { - for _, v := range cs { - v.Close() - } - // cs = recvers(ctx, ipc, count, chWaiter) - // fmt.Println("restart recievers") - // waitCount = 0 - // continue - } - time.Sleep(time.Millisecond * 5) - waitCount++ - } - } - }() } } @@ -125,6 +163,16 @@ fmt.Println("wait for shm server start") time.Sleep(time.Second) } + + // pool := &sync.Pool{ + // New: func() interface{} { + // b := make([]byte, chunkSize) + // return &b + // }, + // } + + // pool := slab.NewAtomPool(chunkSize, chunkSize, 2, chunkSize*8) + var cs []deliver.Deliver for i := 0; i < count; i++ { c, err := deliver.NewClientWithError(deliver.Shm, url) @@ -138,6 +186,7 @@ } cs = append(cs, c) go shmrecver(ctx, c, i, ch) + // go shmrecver2(ctx, c, i, ch, pool) } return cs } -- Gitblit v1.8.0