fix
zhangqian
2023-12-01 8324f872ef3a4d0c978a9b1d062800c6a1701c12
pkg/nsqclient/consumer.go
@@ -1,6 +1,7 @@
package nsqclient
import (
   "apsClient/pkg/logx"
   "context"
   "fmt"
   "time"
@@ -18,7 +19,7 @@
   channel   string
}
func NewNsqConsumer(ctx context.Context, topic, channel string, options ...func(*nsq.Config)) (*NsqConsumer, error) {
func NewNsqConsumer(topic, channel string, options ...func(*nsq.Config)) (*NsqConsumer, error) {
   conf := nsq.NewConfig()
   conf.MaxAttempts = 0
   conf.MsgTimeout = 10 * time.Minute         // 默认一个消息最多能处理十分钟,否则就会重新丢入队列
@@ -33,7 +34,6 @@
   }
   return &NsqConsumer{
      consumer: consumer,
      ctx:      ctx,
      topic:    topic,
      channel:  channel,
   }, nil
@@ -90,9 +90,19 @@
   for {
      select {
      case <-n.ctx.Done():
         fmt.Println("[%s] %s,%s", "stop consumer", n.topic, n.channel)
         logx.Infof("[%s] stop consumer...", n.topic)
         for _, addr := range qAddr {
            err = n.consumer.DisconnectFromNSQD(addr)
            if err != nil {
               logx.Errorf("disconnect from nsq server failed, err: %v, addr: %v, topic: %v", err, addr, n.topic)
            } else {
               logx.Infof("disconnect from nsq server success, addr: %v, topic: %v", addr, n.topic)
            }
         }
         n.consumer.Stop()
         fmt.Println("[%s] %s,%s", "stop consumer success", n.topic, n.channel)
         <-n.consumer.StopChan
         logx.Infof("[%s] stop consumer success", n.topic)
         return nil
      }
   }