package nsqclient import ( "context" "fmt" "time" nsq "github.com/nsqio/go-nsq" ) type NsqConsumer struct { consumer *nsq.Consumer // handler nsq.Handler handler func([]byte) error ctx context.Context ctxCancel context.CancelFunc topic string channel string } func NewNsqConsumer(ctx context.Context, topic, channel string, options ...func(*nsq.Config)) (*NsqConsumer, error) { conf := nsq.NewConfig() conf.MaxAttempts = 0 conf.MsgTimeout = 10 * time.Minute // 默认一个消息最多能处理十分钟,否则就会重新丢入队列 conf.LookupdPollInterval = 3 * time.Second // 调整consumer的重连间隔时间为3秒 for _, option := range options { option(conf) } consumer, err := nsq.NewConsumer(topic, channel, conf) if err != nil { return nil, err } return &NsqConsumer{ consumer: consumer, ctx: ctx, topic: topic, channel: channel, }, nil } func DestroyNsqConsumer(c *NsqConsumer) { if c != nil { if c.ctxCancel != nil { c.ctxCancel() } } } // func (n *NsqConsumer) AddHandler(handler nsq.Handler) { // n.handler = handler // } func (n *NsqConsumer) AddHandler(handler func([]byte) error) { n.handler = handler } func (n *NsqConsumer) Run(qaddr string, concurrency int) error { return n.RunDistributed([]string{qaddr}, nil, concurrency) } func (n *NsqConsumer) RunLookupd(lookupAddr string, concurrency int) error { return n.RunDistributed(nil, []string{lookupAddr}, concurrency) } func (n *NsqConsumer) RunDistributed(qAddr, lAddr []string, concurrency int) error { n.consumer.ChangeMaxInFlight(concurrency) // n.consumer.AddConcurrentHandlers(n.handler, concurrency) n.consumer.AddConcurrentHandlers(nsq.HandlerFunc(func(msg *nsq.Message) error { return n.handler(msg.Body) // return nil }), concurrency) var err error if len(qAddr) > 0 { err = n.consumer.ConnectToNSQDs(qAddr) } else if len(lAddr) > 0 { err = n.consumer.ConnectToNSQLookupds(lAddr) } else { err = fmt.Errorf("Addr Must NOT Empty") } if err != nil { return err } if n.ctx == nil { n.ctx, n.ctxCancel = context.WithCancel(context.Background()) } for { select { case <-n.ctx.Done(): fmt.Println("[%s] %s,%s", "stop consumer", n.topic, n.channel) n.consumer.Stop() fmt.Println("[%s] %s,%s", "stop consumer success", n.topic, n.channel) return nil } } }