package main import ( "demo/deliver" "fmt" "os" "os/signal" "strconv" "time" "golang.org/x/sys/unix" ) const dLen = 12 * 1024 * 1024 var mode = deliver.PushPull func senderImpl(s deliver.Deliver) { var err error buf := make([]byte, dLen) for { if err = s.Send(buf); err != nil { fmt.Printf("can't send message on push socket: %s\n", err.Error()) } else { fmt.Printf("send msg length %d\n", len(buf)) } // time.Sleep(10 * time.Millisecond) } } func sender(url string, args ...interface{}) { s := deliver.NewListener(deliver.Mode(mode), url, args...) go senderImpl(s) c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM) <-c s.Close() } func recvImpl(url string, index int) { c := deliver.NewDialer(deliver.Mode(mode), url) var msg []byte var err error var t int64 var elapse int64 count := 0 for { msg, err = c.Recv() if err != nil { fmt.Println("recv error : ", err) } if t == 0 { t = time.Now().UnixNano() } elapse = time.Now().UnixNano() - t count++ if elapse > 1e9 { fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n", index, count, len(msg), elapse) elapse = 0 count = 0 t = 0 } // time.Sleep(10 * time.Millisecond) } } func reciever(url string, strCount string) { count, _ := strconv.Atoi(strCount) for i := 0; i < count; i++ { go recvImpl(url, i) } for { time.Sleep(2 * time.Second) } } func modeType(t string) { if t == "pushpull" { mode = deliver.PushPull } else if t == "pubsub" { mode = deliver.PubSub } else if t == "pair" { mode = deliver.Pair } } func main() { if len(os.Args) > 3 && os.Args[1] == "producer" { modeType(os.Args[2]) sender(os.Args[3]) os.Exit(0) } if len(os.Args) > 3 && os.Args[1] == "consumer" { modeType(os.Args[2]) reciever(os.Args[3], os.Args[4]) os.Exit(0) } fmt.Fprintf(os.Stderr, "Usage: pushpull push|pull ...\n") os.Exit(1) }