zhangqian
2023-10-19 eba4eb850f0ecfb5839395aa125955ceaa2a454f
pkg/nsqclient/consumer.go
@@ -1,6 +1,7 @@
package nsqclient
import (
   "apsClient/pkg/logx"
   "context"
   "fmt"
   "time"
@@ -18,7 +19,7 @@
   channel   string
}
func NewNsqConsumer(ctx context.Context, topic, channel string, options ...func(*nsq.Config)) (*NsqConsumer, error) {
func NewNsqConsumer(topic, channel string, options ...func(*nsq.Config)) (*NsqConsumer, error) {
   conf := nsq.NewConfig()
   conf.MaxAttempts = 0
   conf.MsgTimeout = 10 * time.Minute         // 默认一个消息最多能处理十分钟,否则就会重新丢入队列
@@ -33,7 +34,6 @@
   }
   return &NsqConsumer{
      consumer: consumer,
      ctx:      ctx,
      topic:    topic,
      channel:  channel,
   }, nil
@@ -90,9 +90,9 @@
   for {
      select {
      case <-n.ctx.Done():
         fmt.Println("[%s] %s,%s", "stop consumer", n.topic, n.channel)
         logx.Infof("[%s]%s stop consumer...", n.topic, n.channel)
         n.consumer.Stop()
         fmt.Println("[%s] %s,%s", "stop consumer success", n.topic, n.channel)
         logx.Infof("[%s]%s stop consumer success", n.topic, n.channel)
         return nil
      }
   }