package test
|
|
import (
|
"basic.com/aps/nsqclient.git"
|
"context"
|
"fmt"
|
"log"
|
"time"
|
)
|
|
func produce(two bool) {
|
p, _ := nsqclient.NewProducer("192.168.20.108:4150")
|
|
var str string
|
for len(str) < 32 {
|
str += "cnsqclient dynamic library"
|
}
|
msgx := []byte(str + "--x")
|
msgy := []byte(str + "--y")
|
|
// count := 0
|
for i := 0; i < 1000000; i++ {
|
// if e := p.Publish("test", []byte("x")); e != nil {
|
if e := p.Publish("test", msgx); e != nil {
|
log.Fatal("Publish error:" + e.Error())
|
}
|
|
if two {
|
// if e := p.Publish("test", []byte("y")); e != nil {
|
if e := p.Publish("test2", msgy); e != nil {
|
log.Fatal("Publish error:" + e.Error())
|
}
|
}
|
|
// log.Println("send time ", count)
|
// count++
|
}
|
}
|
|
func consume(topic, channel string) {
|
ctx, cancel := context.WithCancel(context.Background())
|
|
if c, e := nsqclient.NewNsqConsumer(ctx, topic, channel); e != nil {
|
fmt.Println("NewNsqConsumer failed", e)
|
return
|
} else {
|
ch := make(chan struct{})
|
|
count := 0
|
c.AddHandler(func(data []byte) error {
|
count++
|
fmt.Println("recv msg ", string(data), " size", count)
|
if count > 999000 {
|
ch <- struct{}{}
|
}
|
return nil
|
})
|
// go c.Run("192.168.20.108:4150", 2)
|
go c.RunLookupd("192.168.20.108:4161", 2)
|
|
t := time.Now()
|
<-ch
|
// fmt.Println("======>> use time ", time.Since(t))
|
fmt.Println("======>> use time ", time.Now().Unix()-t.Unix())
|
cancel()
|
}
|
}
|
|
func Test() {
|
go produce(true)
|
|
go consume("test2", "sensor01")
|
consume("test", "sensor01")
|
}
|