zhangqian
2023-10-20 7276ab65576ec73b439a40d7f1a3035a534b968c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package nsqclient
 
import (
    "apsClient/pkg/logx"
    "context"
    "fmt"
    "time"
 
    nsq "github.com/nsqio/go-nsq"
)
 
type NsqConsumer struct {
    consumer *nsq.Consumer
    // handler   nsq.Handler
    handler   func([]byte) error
    ctx       context.Context
    ctxCancel context.CancelFunc
    topic     string
    channel   string
}
 
func NewNsqConsumer(topic, channel string, options ...func(*nsq.Config)) (*NsqConsumer, error) {
    conf := nsq.NewConfig()
    conf.MaxAttempts = 0
    conf.MsgTimeout = 10 * time.Minute         // 默认一个消息最多能处理十分钟,否则就会重新丢入队列
    conf.LookupdPollInterval = 3 * time.Second // 调整consumer的重连间隔时间为3秒
    for _, option := range options {
        option(conf)
    }
 
    consumer, err := nsq.NewConsumer(topic, channel, conf)
    if err != nil {
        return nil, err
    }
    return &NsqConsumer{
        consumer: consumer,
        topic:    topic,
        channel:  channel,
    }, nil
}
 
func DestroyNsqConsumer(c *NsqConsumer) {
    if c != nil {
        if c.ctxCancel != nil {
            c.ctxCancel()
        }
    }
}
 
// func (n *NsqConsumer) AddHandler(handler nsq.Handler) {
//     n.handler = handler
// }
 
func (n *NsqConsumer) AddHandler(handler func([]byte) error) {
    n.handler = handler
}
 
func (n *NsqConsumer) Run(qaddr string, concurrency int) error {
    return n.RunDistributed([]string{qaddr}, nil, concurrency)
}
 
func (n *NsqConsumer) RunLookupd(lookupAddr string, concurrency int) error {
    return n.RunDistributed(nil, []string{lookupAddr}, concurrency)
}
 
func (n *NsqConsumer) RunDistributed(qAddr, lAddr []string, concurrency int) error {
    n.consumer.ChangeMaxInFlight(concurrency)
    // n.consumer.AddConcurrentHandlers(n.handler, concurrency)
    n.consumer.AddConcurrentHandlers(nsq.HandlerFunc(func(msg *nsq.Message) error {
        return n.handler(msg.Body)
        // return nil
    }), concurrency)
 
    var err error
    if len(qAddr) > 0 {
        err = n.consumer.ConnectToNSQDs(qAddr)
    } else if len(lAddr) > 0 {
        err = n.consumer.ConnectToNSQLookupds(lAddr)
    } else {
        err = fmt.Errorf("Addr Must NOT Empty")
    }
    if err != nil {
        return err
    }
 
    if n.ctx == nil {
        n.ctx, n.ctxCancel = context.WithCancel(context.Background())
    }
 
    for {
        select {
        case <-n.ctx.Done():
            logx.Infof("[%s] stop consumer...", n.topic)
            n.consumer.Stop()
            <-n.consumer.StopChan
            logx.Infof("[%s] stop consumer success", n.topic)
            for _, addr := range qAddr {
                err = n.consumer.DisconnectFromNSQD(addr)
                if err != nil {
                    logx.Errorf("disconnect from nsq server failed, err: %v, addr: %v, topic: %v", err, addr, n.topic)
                } else {
                    logx.Infof("disconnect from nsq server success, addr: %v, topic: %v", addr, n.topic)
                }
            }
            return nil
        }
    }
}