package profile import ( "context" "demo/deliver" "fmt" "os" "sync" "time" ) func shmrecver(ctx context.Context, c deliver.Deliver, index int, ch chan<- bool) { var msg []byte var err error var t int64 var elapse int64 count := 0 for { select { case <-ctx.Done(): return default: msg, err = c.Recv() if err != nil { c.Close() url := "hello" i, err := deliver.NewClientWithError(deliver.Shm, url) for { if err == nil { break } time.Sleep(1 * time.Second) i, err = deliver.NewClientWithError(deliver.Shm, url) fmt.Println("client create failed : ", err) } c = i fmt.Println("recv error : ", err) continue } if ch != nil { ch <- true } if t == 0 { t = time.Now().UnixNano() } elapse = time.Now().UnixNano() - t count++ if elapse > 1e9 { fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n", index, count, len(msg), elapse) elapse = 0 count = 0 t = 0 } } // time.Sleep(10 * time.Millisecond) } } func shmrecver2(ctx context.Context, c deliver.Deliver, index int, ch chan<- bool, pool *sync.Pool) { // var msg []byte // var err error var t int64 var elapse int64 count := 0 for { select { case <-ctx.Done(): return default: msg := *pool.Get().(*[]byte) _, err := c.Recv2(msg) if err != nil { c.Close() url := "hello" i, err := deliver.NewClientWithError(deliver.Shm, url) for { if err == nil { break } time.Sleep(1 * time.Second) i, err = deliver.NewClientWithError(deliver.Shm, url) fmt.Println("client create failed : ", err) } c = i pool.Put(&msg) fmt.Println("recv error : ", err) continue } if ch != nil { ch <- true } if t == 0 { t = time.Now().UnixNano() } elapse = time.Now().UnixNano() - t count++ if elapse > 1e9 { fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n", index, count, len(msg), elapse) elapse = 0 count = 0 t = 0 } pool.Put(&msg) } // time.Sleep(10 * time.Millisecond) } } var chunkSize = 32 * 1024 * 1024 func Shmrecv(ctx context.Context, server bool, ipc string, count int) { blocks := 2 if server { s, err := deliver.NewServerWithError(deliver.Shm, ipc, blocks, dLen) for { if err == nil { break } time.Sleep(1 * time.Second) s, err = deliver.NewServerWithError(deliver.Shm, ipc, blocks, dLen) } go shmrecver(ctx, s, 0, nil) } else { recvers(ctx, ipc, count, nil) } } func recvers(ctx context.Context, url string, count int, ch chan<- bool) []deliver.Deliver { for { file := "/dev/shm/" + url exist, _ := pathExists(file) if exist { break } fmt.Println("wait for shm server start") time.Sleep(time.Second) } // pool := &sync.Pool{ // New: func() interface{} { // b := make([]byte, chunkSize) // return &b // }, // } // pool := slab.NewAtomPool(chunkSize, chunkSize, 2, chunkSize*8) var cs []deliver.Deliver for i := 0; i < count; i++ { c, err := deliver.NewClientWithError(deliver.Shm, url) for { if err == nil { break } time.Sleep(1 * time.Second) c, err = deliver.NewClientWithError(deliver.Shm, url) fmt.Println(i, " client create failed : ", err) } cs = append(cs, c) go shmrecver(ctx, c, i, ch) // go shmrecver2(ctx, c, i, ch, pool) } return cs } func pathExists(path string) (bool, error) { _, err := os.Stat(path) if err == nil { return true, nil } if os.IsNotExist(err) { return false, nil } return false, err }