package profile
|
|
import (
|
"context"
|
"demo/deliver"
|
"fmt"
|
"os"
|
"sync"
|
"time"
|
)
|
|
func shmrecver(ctx context.Context, c deliver.Deliver, index int, ch chan<- bool) {
|
|
var msg []byte
|
var err error
|
|
var t int64
|
var elapse int64
|
count := 0
|
|
for {
|
select {
|
case <-ctx.Done():
|
return
|
default:
|
msg, err = c.Recv()
|
if err != nil {
|
c.Close()
|
url := "hello"
|
i, err := deliver.NewClientWithError(deliver.Shm, url)
|
for {
|
if err == nil {
|
break
|
}
|
time.Sleep(1 * time.Second)
|
i, err = deliver.NewClientWithError(deliver.Shm, url)
|
|
fmt.Println("client create failed : ", err)
|
|
}
|
c = i
|
|
fmt.Println("recv error : ", err)
|
continue
|
}
|
if ch != nil {
|
ch <- true
|
}
|
|
if t == 0 {
|
t = time.Now().UnixNano()
|
}
|
elapse = time.Now().UnixNano() - t
|
|
count++
|
|
if elapse > 1e9 {
|
fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
|
index, count, len(msg), elapse)
|
elapse = 0
|
count = 0
|
t = 0
|
}
|
}
|
|
// time.Sleep(10 * time.Millisecond)
|
}
|
|
}
|
|
func shmrecver2(ctx context.Context, c deliver.Deliver, index int, ch chan<- bool, pool *sync.Pool) {
|
|
// var msg []byte
|
// var err error
|
|
var t int64
|
var elapse int64
|
count := 0
|
|
for {
|
select {
|
case <-ctx.Done():
|
return
|
default:
|
msg := *pool.Get().(*[]byte)
|
_, err := c.Recv2(msg)
|
if err != nil {
|
c.Close()
|
url := "hello"
|
i, err := deliver.NewClientWithError(deliver.Shm, url)
|
for {
|
if err == nil {
|
break
|
}
|
time.Sleep(1 * time.Second)
|
i, err = deliver.NewClientWithError(deliver.Shm, url)
|
|
fmt.Println("client create failed : ", err)
|
|
}
|
c = i
|
|
pool.Put(&msg)
|
|
fmt.Println("recv error : ", err)
|
continue
|
}
|
if ch != nil {
|
ch <- true
|
}
|
|
if t == 0 {
|
t = time.Now().UnixNano()
|
}
|
elapse = time.Now().UnixNano() - t
|
|
count++
|
|
if elapse > 1e9 {
|
fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
|
index, count, len(msg), elapse)
|
elapse = 0
|
count = 0
|
t = 0
|
}
|
|
pool.Put(&msg)
|
}
|
|
// time.Sleep(10 * time.Millisecond)
|
}
|
|
}
|
|
// chunkSize is a buffer size in bytes: 32 MiB (33554432). In this part of the
// file it is referenced only by the commented-out buffer-pool setup in recvers.
var chunkSize = 32 << 20
|
|
func Shmrecv(ctx context.Context, server bool, ipc string, count int) {
|
blocks := 2
|
|
if server {
|
s, err := deliver.NewServerWithError(deliver.Shm, ipc, blocks, dLen)
|
for {
|
if err == nil {
|
break
|
}
|
time.Sleep(1 * time.Second)
|
s, err = deliver.NewServerWithError(deliver.Shm, ipc, blocks, dLen)
|
}
|
go shmrecver(ctx, s, 0, nil)
|
|
} else {
|
recvers(ctx, ipc, count, nil)
|
}
|
}
|
|
func recvers(ctx context.Context, url string, count int, ch chan<- bool) []deliver.Deliver {
|
for {
|
file := "/dev/shm/" + url
|
exist, _ := pathExists(file)
|
if exist {
|
break
|
}
|
fmt.Println("wait for shm server start")
|
time.Sleep(time.Second)
|
}
|
|
// pool := &sync.Pool{
|
// New: func() interface{} {
|
// b := make([]byte, chunkSize)
|
// return &b
|
// },
|
// }
|
|
// pool := slab.NewAtomPool(chunkSize, chunkSize, 2, chunkSize*8)
|
|
var cs []deliver.Deliver
|
for i := 0; i < count; i++ {
|
c, err := deliver.NewClientWithError(deliver.Shm, url)
|
for {
|
if err == nil {
|
break
|
}
|
time.Sleep(1 * time.Second)
|
c, err = deliver.NewClientWithError(deliver.Shm, url)
|
fmt.Println(i, " client create failed : ", err)
|
}
|
cs = append(cs, c)
|
go shmrecver(ctx, c, i, ch)
|
// go shmrecver2(ctx, c, i, ch, pool)
|
}
|
return cs
|
}
|
|
// pathExists reports whether path can be stat'ed.
//
// It returns (true, nil) when os.Stat succeeds, (false, nil) when os.Stat
// fails with an error that os.IsNotExist recognizes, and (false, err) for
// any other os.Stat error.
func pathExists(path string) (bool, error) {
	switch _, statErr := os.Stat(path); {
	case statErr == nil:
		return true, nil
	case os.IsNotExist(statErr):
		return false, nil
	default:
		return false, statErr
	}
}
|