zhangmeng
2023-05-16 4244acad88afc44ae8a34a8fbd9e296780a0cc64
nsqclient/consumer.go
@@ -9,8 +9,9 @@
)
type NsqConsumer struct {
   consumer  *nsq.Consumer
   handler   nsq.Handler
   consumer *nsq.Consumer
   // handler   nsq.Handler
   handler   func([]byte) error
   ctx       context.Context
   ctxCancel context.CancelFunc
   topic     string
@@ -46,7 +47,11 @@
   }
}
func (n *NsqConsumer) AddHandler(handler nsq.Handler) {
// func (n *NsqConsumer) AddHandler(handler nsq.Handler) {
//    n.handler = handler
// }
func (n *NsqConsumer) AddHandler(handler func([]byte) error) {
   n.handler = handler
}
@@ -60,7 +65,11 @@
func (n *NsqConsumer) RunDistributed(qAddr, lAddr []string, concurrency int) error {
   n.consumer.ChangeMaxInFlight(concurrency)
   n.consumer.AddConcurrentHandlers(n.handler, concurrency)
   // n.consumer.AddConcurrentHandlers(n.handler, concurrency)
   n.consumer.AddConcurrentHandlers(nsq.HandlerFunc(func(msg *nsq.Message) error {
      return n.handler(msg.Body)
      // return nil
   }), concurrency)
   var err error
   if len(qAddr) > 0 {