// Package test is a manual throughput check for the nsqclient package: it
// publishes a large batch of messages and measures how long a consumer takes
// to receive (almost) all of them.
package test

import (
	"context"
	"fmt"
	"log"
	"time"

	"basic.com/aps/nsqclient.git"
)

// produce publishes a fixed payload to topic "test" one million times. When
// two is true, every iteration also publishes a second payload to topic
// "test2". Any producer or publish error terminates the process via log.Fatal.
func produce(two bool) {
	const total = 1000000

	p, err := nsqclient.NewProducer("192.168.20.108:4150")
	if err != nil {
		// Without a producer nothing below can work; fail loudly here instead
		// of on the first Publish call.
		log.Fatal("NewProducer error: ", err)
	}

	// Pad the payload to at least 32 bytes by repeating the base text.
	var str string
	for len(str) < 32 {
		str += "cnsqclient dynamic library"
	}
	msgx := []byte(str + "--x")
	msgy := []byte(str + "--y")

	for i := 0; i < total; i++ {
		if e := p.Publish("test", msgx); e != nil {
			log.Fatal("Publish error:" + e.Error())
		}
		if !two {
			continue
		}
		if e := p.Publish("test2", msgy); e != nil {
			log.Fatal("Publish error:" + e.Error())
		}
	}
}

// consume subscribes to topic/channel, prints every received message together
// with a running count, and blocks until more than 999000 messages have been
// seen. It then prints the elapsed time in whole seconds and cancels the
// consumer's context. If the consumer cannot be created, the failure is
// printed and consume returns immediately.
func consume(topic, channel string) {
	const threshold = 999000

	ctx, cancel := context.WithCancel(context.Background())
	// Deferred so the context is released on the error path as well.
	defer cancel()

	c, err := nsqclient.NewNsqConsumer(ctx, topic, channel)
	if err != nil {
		fmt.Println("NewNsqConsumer failed", err)
		return
	}

	// Buffered with a non-blocking send: only the first message past the
	// threshold needs to signal; later ones must not stall the handler.
	done := make(chan struct{}, 1)

	// NOTE(review): count is not synchronized. If the client invokes the
	// handler from several goroutines (the 2 passed to RunLookupd may be a
	// concurrency level — confirm), this needs an atomic counter.
	count := 0
	c.AddHandler(func(data []byte) error {
		count++
		fmt.Println("recv msg ", string(data), " size", count)
		if count > threshold {
			select {
			case done <- struct{}{}:
			default:
			}
		}
		return nil
	})

	// Alternative entry point kept for reference:
	//   go c.Run("192.168.20.108:4150", 2)
	go c.RunLookupd("192.168.20.108:4161", 2)

	start := time.Now()
	<-done
	// time.Since uses the monotonic clock, so the result is not skewed by
	// wall-clock adjustments or by truncating both endpoints to seconds.
	fmt.Println("======>> use time ", int64(time.Since(start)/time.Second))
}

// Test starts the producer in the background and runs a consumer on topic
// "test" (channel "sensor01") in the calling goroutine. The two flag is
// hard-coded to false; when set to true, a second consumer on "test2" is also
// started and the producer publishes to both topics.
func Test() {
	two := false

	go produce(two)
	if two {
		go consume("test2", "sensor01")
	}
	consume("test", "sensor01")
}