package main
|
|
import (
|
"demo/deliver"
|
"fmt"
|
"os"
|
"strconv"
|
"time"
|
)
|
|
const dLen = 12 * 1024 * 1024
|
|
var mode = deliver.PushPull
|
|
func sender(url string) {
|
|
var err error
|
|
s := deliver.NewProducer(deliver.Mode(mode), url)
|
|
buf := make([]byte, dLen)
|
|
for {
|
|
if err = s.Send(buf); err != nil {
|
|
fmt.Printf("can't send message on push socket: %s\n", err.Error())
|
} else {
|
|
fmt.Printf("send msg length %d\n", len(buf))
|
}
|
|
// time.Sleep(10 * time.Millisecond)
|
}
|
|
}
|
|
func recvImpl(url string, index int) {
|
c := deliver.NewConsumer(deliver.Mode(mode), url)
|
|
var msg []byte
|
var err error
|
|
var t int64
|
var elapse int64
|
count := 0
|
|
for {
|
msg, err = c.Recv()
|
if err != nil {
|
fmt.Println("recv error : ", err)
|
}
|
if t == 0 {
|
t = time.Now().UnixNano()
|
}
|
elapse = time.Now().UnixNano() - t
|
|
count++
|
|
if elapse > 1e9 {
|
fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
|
index, count, len(msg), elapse)
|
elapse = 0
|
count = 0
|
t = 0
|
}
|
|
// time.Sleep(10 * time.Millisecond)
|
}
|
}
|
|
func reciever(url string, strCount string) {
|
count, _ := strconv.Atoi(strCount)
|
|
for i := 0; i < count; i++ {
|
go recvImpl(url, i)
|
}
|
|
for {
|
time.Sleep(2 * time.Second)
|
}
|
|
}
|
|
func main() {
|
if len(os.Args) > 3 && os.Args[1] == "producer" {
|
if os.Args[2] == "pushpull" {
|
mode = deliver.PushPull
|
} else if os.Args[2] == "pubsub" {
|
mode = deliver.PubSub
|
} else if os.Args[2] == "pair" {
|
mode = deliver.Pair
|
}
|
sender(os.Args[3])
|
os.Exit(0)
|
}
|
if len(os.Args) > 3 && os.Args[1] == "consumer" {
|
if os.Args[2] == "pushpull" {
|
mode = deliver.PushPull
|
} else if os.Args[2] == "pubsub" {
|
mode = deliver.PubSub
|
} else if os.Args[2] == "pair" {
|
mode = deliver.Pair
|
}
|
|
reciever(os.Args[3], os.Args[4])
|
os.Exit(0)
|
}
|
fmt.Fprintf(os.Stderr,
|
"Usage: pushpull push|pull <URL> <ARG> ...\n")
|
os.Exit(1)
|
|
}
|