| | |
| | | package nsqclient |
| | | |
| | | import ( |
| | | "apsClient/pkg/logx" |
| | | "context" |
| | | "fmt" |
| | | "time" |
| | |
| | | channel string |
| | | } |
| | | |
| | | func NewNsqConsumer(ctx context.Context, topic, channel string, options ...func(*nsq.Config)) (*NsqConsumer, error) { |
| | | func NewNsqConsumer(topic, channel string, options ...func(*nsq.Config)) (*NsqConsumer, error) { |
| | | conf := nsq.NewConfig() |
| | | conf.MaxAttempts = 0 |
| | | conf.MsgTimeout = 10 * time.Minute // 默认一个消息最多能处理十分钟,否则就会重新丢入队列 |
| | |
| | | } |
| | | return &NsqConsumer{ |
| | | consumer: consumer, |
| | | ctx: ctx, |
| | | topic: topic, |
| | | channel: channel, |
| | | }, nil |
| | |
| | | for { |
| | | select { |
| | | case <-n.ctx.Done(): |
| | | fmt.Println("[%s] %s,%s", "stop consumer", n.topic, n.channel) |
| | | logx.Infof("[%s]%s stop consumer...", n.topic, n.channel) |
| | | n.consumer.Stop() |
| | | fmt.Println("[%s] %s,%s", "stop consumer success", n.topic, n.channel) |
| | | logx.Infof("[%s]%s stop consumer success", n.topic, n.channel) |
| | | return nil |
| | | } |
| | | } |