zhangmeng
2023-05-16 7ed5a9445c63b4fef30b338a7552b14f21a790c0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package test
 
import (
    "context"
    "fmt"
    "log"
    "nsqclient/nsqclient"
    "time"
)
 
func produce(two bool) {
    p, _ := nsqclient.NewProducer("192.168.20.108:4150")
 
    var str string
    for len(str) < 32 {
        str += "cnsqclient dynamic library"
    }
    msgx := []byte(str + "--x")
    msgy := []byte(str + "--y")
 
    // count := 0
    for i := 0; i < 1000000; i++ {
        // if e := p.Publish("test", []byte("x")); e != nil {
        if e := p.Publish("test", msgx); e != nil {
            log.Fatal("Publish error:" + e.Error())
        }
 
        if two {
            // if e := p.Publish("test", []byte("y")); e != nil {
            if e := p.Publish("test2", msgy); e != nil {
                log.Fatal("Publish error:" + e.Error())
            }
        }
 
        // log.Println("send time ", count)
        // count++
    }
}
 
func consume(topic, channel string) {
    ctx, cancel := context.WithCancel(context.Background())
 
    if c, e := nsqclient.NewNsqConsumer(ctx, topic, channel); e != nil {
        fmt.Println("NewNsqConsumer failed", e)
        return
    } else {
        ch := make(chan struct{})
 
        count := 0
        c.AddHandler(func(data []byte) error {
            count++
            fmt.Println("recv msg ", string(data), " size", count)
            if count > 999000 {
                ch <- struct{}{}
            }
            return nil
        })
        // go c.Run("192.168.20.108:4150", 2)
        go c.RunLookupd("192.168.20.108:4161", 2)
 
        t := time.Now()
        <-ch
        // fmt.Println("======>> use time ", time.Since(t))
        fmt.Println("======>> use time ", time.Now().Unix()-t.Unix())
        cancel()
    }
}
 
func Test() {
    go produce(true)
 
    go consume("test2", "sensor01")
    consume("test", "sensor01")
}