| | |
| | | "demo/deliver" |
| | | "fmt" |
| | | "os" |
| | | "sync" |
| | | "time" |
| | | ) |
| | | |
| | |
| | | count++ |
| | | |
| | | if elapse > 1e9 { |
| | | fmt.Printf("NODE-%d: RECEIVED \"%d\" data %s len %d, use \"%d\" ns\n", |
| | | index, count, string(msg), len(msg), elapse) |
| | | fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n", |
| | | index, count, len(msg), elapse) |
| | | elapse = 0 |
| | | count = 0 |
| | | t = 0 |
| | |
| | | } |
| | | |
| | | } |
| | | |
| | | func shmrecver2(ctx context.Context, c deliver.Deliver, index int, ch chan<- bool, pool *sync.Pool) { |
| | | |
| | | // var msg []byte |
| | | // var err error |
| | | |
| | | var t int64 |
| | | var elapse int64 |
| | | count := 0 |
| | | |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | return |
| | | default: |
| | | msg := *pool.Get().(*[]byte) |
| | | err := c.Recv2(msg) |
| | | if err != nil { |
| | | c.Close() |
| | | url := "hello" |
| | | i, err := deliver.NewClientWithError(deliver.Shm, url) |
| | | for { |
| | | if err == nil { |
| | | break |
| | | } |
| | | time.Sleep(1 * time.Second) |
| | | i, err = deliver.NewClientWithError(deliver.Shm, url) |
| | | |
| | | fmt.Println("client create failed : ", err) |
| | | |
| | | } |
| | | c = i |
| | | |
| | | pool.Put(&msg) |
| | | |
| | | fmt.Println("recv error : ", err) |
| | | continue |
| | | } |
| | | if ch != nil { |
| | | ch <- true |
| | | } |
| | | |
| | | if t == 0 { |
| | | t = time.Now().UnixNano() |
| | | } |
| | | elapse = time.Now().UnixNano() - t |
| | | |
| | | count++ |
| | | |
| | | if elapse > 1e9 { |
| | | fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n", |
| | | index, count, len(msg), elapse) |
| | | elapse = 0 |
| | | count = 0 |
| | | t = 0 |
| | | } |
| | | |
| | | pool.Put(&msg) |
| | | } |
| | | |
| | | // time.Sleep(10 * time.Millisecond) |
| | | } |
| | | |
| | | } |
| | | |
| | | var chunkSize = 32 * 1024 * 1024 |
| | | |
| | | func Shmrecv(ctx context.Context, server bool, ipc string, count int) { |
| | | blocks := 2 |
| | |
| | | |
| | | } else { |
| | | recvers(ctx, ipc, count, nil) |
| | | |
| | | return |
| | | |
| | | chWaiter := make(chan bool, count) |
| | | cs := recvers(ctx, ipc, count, chWaiter) |
| | | |
| | | go func() { |
| | | waitCount := 0 |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | return |
| | | case <-chWaiter: |
| | | waitCount = 0 |
| | | default: |
| | | if waitCount > 200*3 { |
| | | for _, v := range cs { |
| | | v.Close() |
| | | } |
| | | // cs = recvers(ctx, ipc, count, chWaiter) |
| | | // fmt.Println("restart recievers") |
| | | // waitCount = 0 |
| | | // continue |
| | | } |
| | | time.Sleep(time.Millisecond * 5) |
| | | waitCount++ |
| | | } |
| | | } |
| | | }() |
| | | } |
| | | } |
| | | |
| | |
| | | fmt.Println("wait for shm server start") |
| | | time.Sleep(time.Second) |
| | | } |
| | | |
| | | // pool := &sync.Pool{ |
| | | // New: func() interface{} { |
| | | // b := make([]byte, chunkSize) |
| | | // return &b |
| | | // }, |
| | | // } |
| | | |
| | | // pool := slab.NewAtomPool(chunkSize, chunkSize, 2, chunkSize*8) |
| | | |
| | | var cs []deliver.Deliver |
| | | for i := 0; i < count; i++ { |
| | | c, err := deliver.NewClientWithError(deliver.Shm, url) |
| | |
| | | } |
| | | cs = append(cs, c) |
| | | go shmrecver(ctx, c, i, ch) |
| | | // go shmrecver2(ctx, c, i, ch, pool) |
| | | } |
| | | return cs |
| | | } |