package main

import (
	"fmt"
	"os"
	"os/signal"
	"sync"
	"time"

	"demo/deliver"

	"golang.org/x/sys/unix"
)

// shmSenderImpl repeatedly sends a fixed payload through s until the
// package-level ctx (declared elsewhere in this package) is cancelled.
//
// The payload is a dLen-byte buffer (dLen is declared elsewhere in this
// package) whose leading bytes hold a greeting; copy truncates the greeting
// if dLen is shorter than it. Send failures are logged and the loop keeps
// going.
func shmSenderImpl(s deliver.Deliver) {
	buf := make([]byte, dLen)
	copy(buf, "hello, give you this")

	for {
		// Non-blocking cancellation check before each send.
		select {
		case <-ctx.Done():
			return
		default:
		}

		if err := s.Send(buf); err != nil {
			fmt.Printf("can't send message on push socket: %s\n", err.Error())
			continue
		}
		fmt.Printf("send msg length %d\n", len(buf))
		// Optional throttle, left disabled as in the original:
		// time.Sleep(10 * time.Millisecond)
	}
}

// shmSender creates a shared-memory server for url, retrying once per second
// until creation succeeds, starts a sending goroutine, and then blocks until
// SIGINT or SIGTERM arrives. On a signal it cancels the package-level context
// and closes the server.
//
// args is forwarded unchanged to deliver.NewServerWithError on every attempt.
func shmSender(url string, args ...interface{}) {
	s, err := deliver.NewServerWithError(deliver.Shm, url, args...)
	for err != nil {
		fmt.Println("create shm error : ", err)
		time.Sleep(1 * time.Second)
		// args must be expanded with "..." here as well; passing the slice
		// itself would hand the constructor one []interface{} argument
		// instead of the caller's original arguments.
		s, err = deliver.NewServerWithError(deliver.Shm, url, args...)
	}

	go shmSenderImpl(s)

	// os.Kill (SIGKILL) cannot be caught, so it is not registered.
	// os.Interrupt is SIGINT; SIGTERM is included to match shmReciever.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, os.Interrupt, unix.SIGTERM)
	<-sig

	cancel()
	// NOTE(review): the sender goroutine may still be inside Send when Close
	// runs; whether that is safe depends on the deliver implementation.
	s.Close()
}

// shmRecvImpl receives messages from c until the package-level ctx is
// cancelled, and roughly once per second prints how many messages arrived in
// the window together with the most recent message.
//
// wg.Done is called exactly once when the function returns. index identifies
// this receiver in the output. url is unused and kept for signature
// compatibility.
func shmRecvImpl(wg *sync.WaitGroup, c deliver.Deliver, url string, index int) {
	defer wg.Done()

	var (
		windowStart time.Time // zero value means "no window open yet"
		received    int
	)

	for {
		// Non-blocking cancellation check before each receive.
		select {
		case <-ctx.Done():
			return
		default:
		}

		msg, err := c.Recv()
		if err != nil {
			// A failed receive must not be counted or reported as a message.
			fmt.Println("recv error : ", err)
			continue
		}

		if windowStart.IsZero() {
			windowStart = time.Now()
		}
		received++

		if elapsed := time.Since(windowStart); elapsed > time.Second {
			fmt.Printf("NODE-%d: RECEIVED \"%d\" data %s len %d, use \"%d\" ns\n",
				index, received, string(msg), len(msg), elapsed.Nanoseconds())
			// Start a fresh measurement window on the next message.
			received = 0
			windowStart = time.Time{}
		}
		// Optional throttle, left disabled as in the original:
		// time.Sleep(10 * time.Millisecond)
	}
}

// shmReciever starts count shared-memory clients for url, each served by its
// own shmRecvImpl goroutine, and then blocks until SIGINT or SIGTERM arrives.
// On a signal it cancels the package-level context, waits for all receiver
// goroutines to return, and closes every client.
//
// Client creation is retried once per second until it succeeds. A count of
// zero or less starts no receivers.
func shmReciever(url string, count int) {
	var (
		wg      sync.WaitGroup
		clients []deliver.Deliver
	)

	for i := 0; i < count; i++ {
		cli, err := deliver.NewClientWithError(deliver.Shm, url)
		for err != nil {
			// Log the failure that actually happened, before retrying.
			fmt.Println(i, " client create failed : ", err)
			time.Sleep(1 * time.Second)
			cli, err = deliver.NewClientWithError(deliver.Shm, url)
		}
		clients = append(clients, cli)

		wg.Add(1)
		go shmRecvImpl(&wg, cli, url, i)
	}

	// os.Kill (SIGKILL) cannot be caught, so it is not registered.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, os.Interrupt, unix.SIGTERM)
	<-sig

	cancel()
	wg.Wait()
	for _, cli := range clients {
		cli.Close()
	}
}