package nsqclient import ( "apsClient/pkg/logx" "context" "fmt" "time" nsq "github.com/nsqio/go-nsq" ) type NsqConsumer struct { consumer *nsq.Consumer // handler nsq.Handler handler func([]byte) error ctx context.Context ctxCancel context.CancelFunc topic string channel string } func NewNsqConsumer(topic, channel string, options ...func(*nsq.Config)) (*NsqConsumer, error) { conf := nsq.NewConfig() conf.MaxAttempts = 0 conf.MsgTimeout = 10 * time.Minute // 默认一个消息最多能处理十分钟,否则就会重新丢入队列 conf.LookupdPollInterval = 3 * time.Second // 调整consumer的重连间隔时间为3秒 for _, option := range options { option(conf) } consumer, err := nsq.NewConsumer(topic, channel, conf) if err != nil { return nil, err } return &NsqConsumer{ consumer: consumer, topic: topic, channel: channel, }, nil } func DestroyNsqConsumer(c *NsqConsumer) { if c != nil { if c.ctxCancel != nil { c.ctxCancel() } } } // func (n *NsqConsumer) AddHandler(handler nsq.Handler) { // n.handler = handler // } func (n *NsqConsumer) AddHandler(handler func([]byte) error) { n.handler = handler } func (n *NsqConsumer) Run(qaddr string, concurrency int) error { return n.RunDistributed([]string{qaddr}, nil, concurrency) } func (n *NsqConsumer) RunLookupd(lookupAddr string, concurrency int) error { return n.RunDistributed(nil, []string{lookupAddr}, concurrency) } func (n *NsqConsumer) RunDistributed(qAddr, lAddr []string, concurrency int) error { n.consumer.ChangeMaxInFlight(concurrency) // n.consumer.AddConcurrentHandlers(n.handler, concurrency) n.consumer.AddConcurrentHandlers(nsq.HandlerFunc(func(msg *nsq.Message) error { return n.handler(msg.Body) // return nil }), concurrency) var err error if len(qAddr) > 0 { err = n.consumer.ConnectToNSQDs(qAddr) } else if len(lAddr) > 0 { err = n.consumer.ConnectToNSQLookupds(lAddr) } else { err = fmt.Errorf("Addr Must NOT Empty") } if err != nil { return err } if n.ctx == nil { n.ctx, n.ctxCancel = context.WithCancel(context.Background()) } for { select { case <-n.ctx.Done(): logx.Infof("[%s] stop consumer...", n.topic) for _, addr := range qAddr { err = n.consumer.DisconnectFromNSQD(addr) if err != nil { logx.Errorf("disconnect from nsq server failed, err: %v, addr: %v, topic: %v", err, addr, n.topic) } else { logx.Infof("disconnect from nsq server success, addr: %v, topic: %v", addr, n.topic) } } n.consumer.Stop() <-n.consumer.StopChan logx.Infof("[%s] stop consumer success", n.topic) return nil } } }