zhangmeng
2019-08-27 fbafdc0bb9ace05f477a747e6c6744309008c027
profile/shmrecv.go
@@ -5,6 +5,7 @@
   "demo/deliver"
   "fmt"
   "os"
   "sync"
   "time"
)
@@ -54,8 +55,8 @@
         count++
         if elapse > 1e9 {
            fmt.Printf("NODE-%d: RECEIVED \"%d\" data %s len %d, use \"%d\" ns\n",
               index, count, string(msg), len(msg), elapse)
            fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
               index, count, len(msg), elapse)
            elapse = 0
            count = 0
            t = 0
@@ -66,6 +67,72 @@
   }
}
func shmrecver2(ctx context.Context, c deliver.Deliver, index int, ch chan<- bool, pool *sync.Pool) {
   // var msg []byte
   // var err error
   var t int64
   var elapse int64
   count := 0
   for {
      select {
      case <-ctx.Done():
         return
      default:
         msg := *pool.Get().(*[]byte)
         err := c.Recv2(msg)
         if err != nil {
            c.Close()
            url := "hello"
            i, err := deliver.NewClientWithError(deliver.Shm, url)
            for {
               if err == nil {
                  break
               }
               time.Sleep(1 * time.Second)
               i, err = deliver.NewClientWithError(deliver.Shm, url)
               fmt.Println("client create failed : ", err)
            }
            c = i
            pool.Put(&msg)
            fmt.Println("recv error : ", err)
            continue
         }
         if ch != nil {
            ch <- true
         }
         if t == 0 {
            t = time.Now().UnixNano()
         }
         elapse = time.Now().UnixNano() - t
         count++
         if elapse > 1e9 {
            fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
               index, count, len(msg), elapse)
            elapse = 0
            count = 0
            t = 0
         }
         pool.Put(&msg)
      }
      // time.Sleep(10 * time.Millisecond)
   }
}
var chunkSize = 32 * 1024 * 1024
func Shmrecv(ctx context.Context, server bool, ipc string, count int) {
   blocks := 2
@@ -83,35 +150,6 @@
   } else {
      recvers(ctx, ipc, count, nil)
      return
      chWaiter := make(chan bool, count)
      cs := recvers(ctx, ipc, count, chWaiter)
      go func() {
         waitCount := 0
         for {
            select {
            case <-ctx.Done():
               return
            case <-chWaiter:
               waitCount = 0
            default:
               if waitCount > 200*3 {
                  for _, v := range cs {
                     v.Close()
                  }
                  // cs = recvers(ctx, ipc, count, chWaiter)
                  // fmt.Println("restart recievers")
                  // waitCount = 0
                  // continue
               }
               time.Sleep(time.Millisecond * 5)
               waitCount++
            }
         }
      }()
   }
}
@@ -125,6 +163,16 @@
      fmt.Println("wait for shm server start")
      time.Sleep(time.Second)
   }
   // pool := &sync.Pool{
   //    New: func() interface{} {
   //       b := make([]byte, chunkSize)
   //       return &b
   //    },
   // }
   // pool := slab.NewAtomPool(chunkSize, chunkSize, 2, chunkSize*8)
   var cs []deliver.Deliver
   for i := 0; i < count; i++ {
      c, err := deliver.NewClientWithError(deliver.Shm, url)
@@ -138,6 +186,7 @@
      }
      cs = append(cs, c)
      go shmrecver(ctx, c, i, ch)
      // go shmrecver2(ctx, c, i, ch, pool)
   }
   return cs
}