package nsqcli
|
|
import (
|
"context"
|
"fmt"
|
"time"
|
|
nsq "github.com/nsqio/go-nsq"
|
)
|
|
type NsqConsumer struct {
|
consumer *nsq.Consumer
|
// handler nsq.Handler
|
handler func([]byte) error
|
ctx context.Context
|
ctxCancel context.CancelFunc
|
topic string
|
channel string
|
}
|
|
func NewNsqConsumer(ctx context.Context, topic, channel string, options ...func(*nsq.Config)) (*NsqConsumer, error) {
|
conf := nsq.NewConfig()
|
conf.MaxAttempts = 0
|
conf.MsgTimeout = 10 * time.Minute // 默认一个消息最多能处理十分钟,否则就会重新丢入队列
|
conf.LookupdPollInterval = 3 * time.Second // 调整consumer的重连间隔时间为3秒
|
for _, option := range options {
|
option(conf)
|
}
|
|
consumer, err := nsq.NewConsumer(topic, channel, conf)
|
if err != nil {
|
return nil, err
|
}
|
return &NsqConsumer{
|
consumer: consumer,
|
ctx: ctx,
|
topic: topic,
|
channel: channel,
|
}, nil
|
}
|
|
func DestroyNsqConsumer(c *NsqConsumer) {
|
if c != nil {
|
if c.ctxCancel != nil {
|
c.ctxCancel()
|
}
|
}
|
}
|
|
// func (n *NsqConsumer) AddHandler(handler nsq.Handler) {
|
// n.handler = handler
|
// }
|
|
func (n *NsqConsumer) AddHandler(handler func([]byte) error) {
|
n.handler = handler
|
}
|
|
func (n *NsqConsumer) Run(qaddr string, concurrency int) error {
|
return n.RunDistributed([]string{qaddr}, nil, concurrency)
|
}
|
|
func (n *NsqConsumer) RunLookupd(lookupAddr string, concurrency int) error {
|
return n.RunDistributed(nil, []string{lookupAddr}, concurrency)
|
}
|
|
func (n *NsqConsumer) RunDistributed(qAddr, lAddr []string, concurrency int) error {
|
n.consumer.ChangeMaxInFlight(concurrency)
|
// n.consumer.AddConcurrentHandlers(n.handler, concurrency)
|
n.consumer.AddConcurrentHandlers(nsq.HandlerFunc(func(msg *nsq.Message) error {
|
return n.handler(msg.Body)
|
// return nil
|
}), concurrency)
|
|
var err error
|
if len(qAddr) > 0 {
|
err = n.consumer.ConnectToNSQDs(qAddr)
|
} else if len(lAddr) > 0 {
|
err = n.consumer.ConnectToNSQLookupds(lAddr)
|
} else {
|
err = fmt.Errorf("Addr Must NOT Empty")
|
}
|
if err != nil {
|
return err
|
}
|
|
if n.ctx == nil {
|
n.ctx, n.ctxCancel = context.WithCancel(context.Background())
|
}
|
|
for {
|
select {
|
case <-n.ctx.Done():
|
fmt.Println("[%s] %s,%s", "stop consumer", n.topic, n.channel)
|
n.consumer.Stop()
|
fmt.Println("[%s] %s,%s", "stop consumer success", n.topic, n.channel)
|
return nil
|
}
|
}
|
}
|