zhangmeng
2023-05-16 1b4fc8c66026c77abf734126b4e7662e386ec0f5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package nsqcli
 
import (
    "context"
    "fmt"
    "time"
 
    nsq "github.com/nsqio/go-nsq"
)
 
type NsqConsumer struct {
    consumer *nsq.Consumer
    // handler   nsq.Handler
    handler   func([]byte) error
    ctx       context.Context
    ctxCancel context.CancelFunc
    topic     string
    channel   string
}
 
func NewNsqConsumer(ctx context.Context, topic, channel string, options ...func(*nsq.Config)) (*NsqConsumer, error) {
    conf := nsq.NewConfig()
    conf.MaxAttempts = 0
    conf.MsgTimeout = 10 * time.Minute         // 默认一个消息最多能处理十分钟,否则就会重新丢入队列
    conf.LookupdPollInterval = 3 * time.Second // 调整consumer的重连间隔时间为3秒
    for _, option := range options {
        option(conf)
    }
 
    consumer, err := nsq.NewConsumer(topic, channel, conf)
    if err != nil {
        return nil, err
    }
    return &NsqConsumer{
        consumer: consumer,
        ctx:      ctx,
        topic:    topic,
        channel:  channel,
    }, nil
}
 
func DestroyNsqConsumer(c *NsqConsumer) {
    if c != nil {
        if c.ctxCancel != nil {
            c.ctxCancel()
        }
    }
}
 
// func (n *NsqConsumer) AddHandler(handler nsq.Handler) {
//     n.handler = handler
// }
 
func (n *NsqConsumer) AddHandler(handler func([]byte) error) {
    n.handler = handler
}
 
func (n *NsqConsumer) Run(qaddr string, concurrency int) error {
    return n.RunDistributed([]string{qaddr}, nil, concurrency)
}
 
func (n *NsqConsumer) RunLookupd(lookupAddr string, concurrency int) error {
    return n.RunDistributed(nil, []string{lookupAddr}, concurrency)
}
 
func (n *NsqConsumer) RunDistributed(qAddr, lAddr []string, concurrency int) error {
    n.consumer.ChangeMaxInFlight(concurrency)
    // n.consumer.AddConcurrentHandlers(n.handler, concurrency)
    n.consumer.AddConcurrentHandlers(nsq.HandlerFunc(func(msg *nsq.Message) error {
        return n.handler(msg.Body)
        // return nil
    }), concurrency)
 
    var err error
    if len(qAddr) > 0 {
        err = n.consumer.ConnectToNSQDs(qAddr)
    } else if len(lAddr) > 0 {
        err = n.consumer.ConnectToNSQLookupds(lAddr)
    } else {
        err = fmt.Errorf("Addr Must NOT Empty")
    }
    if err != nil {
        return err
    }
 
    if n.ctx == nil {
        n.ctx, n.ctxCancel = context.WithCancel(context.Background())
    }
 
    for {
        select {
        case <-n.ctx.Done():
            fmt.Println("[%s] %s,%s", "stop consumer", n.topic, n.channel)
            n.consumer.Stop()
            fmt.Println("[%s] %s,%s", "stop consumer success", n.topic, n.channel)
            return nil
        }
    }
}