| | |
| | | ) |
| | | |
| | | type NsqConsumer struct { |
| | | consumer *nsq.Consumer |
| | | handler nsq.Handler |
| | | consumer *nsq.Consumer |
| | | // handler nsq.Handler |
| | | handler func([]byte) error |
| | | ctx context.Context |
| | | ctxCancel context.CancelFunc |
| | | topic string |
| | |
| | | } |
| | | } |
| | | |
| | | func (n *NsqConsumer) AddHandler(handler nsq.Handler) { |
| | | // func (n *NsqConsumer) AddHandler(handler nsq.Handler) { |
| | | // n.handler = handler |
| | | // } |
| | | |
| | | func (n *NsqConsumer) AddHandler(handler func([]byte) error) { |
| | | n.handler = handler |
| | | } |
| | | |
| | |
| | | |
| | | func (n *NsqConsumer) RunDistributed(qAddr, lAddr []string, concurrency int) error { |
| | | n.consumer.ChangeMaxInFlight(concurrency) |
| | | n.consumer.AddConcurrentHandlers(n.handler, concurrency) |
| | | // n.consumer.AddConcurrentHandlers(n.handler, concurrency) |
| | | n.consumer.AddConcurrentHandlers(nsq.HandlerFunc(func(msg *nsq.Message) error { |
| | | return n.handler(msg.Body) |
| | | // return nil |
| | | }), concurrency) |
| | | |
| | | var err error |
| | | if len(qAddr) > 0 { |