From 4244acad88afc44ae8a34a8fbd9e296780a0cc64 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期二, 16 五月 2023 16:00:36 +0800 Subject: [PATCH] remove github.com nsq depend --- nsqclient/consumer.go | 17 +++++++++++++---- 1 files changed, 13 insertions(+), 4 deletions(-) diff --git a/nsqclient/consumer.go b/nsqclient/consumer.go index 152989d..a0df0b0 100644 --- a/nsqclient/consumer.go +++ b/nsqclient/consumer.go @@ -9,8 +9,9 @@ ) type NsqConsumer struct { - consumer *nsq.Consumer - handler nsq.Handler + consumer *nsq.Consumer + // handler nsq.Handler + handler func([]byte) error ctx context.Context ctxCancel context.CancelFunc topic string @@ -46,7 +47,11 @@ } } -func (n *NsqConsumer) AddHandler(handler nsq.Handler) { +// func (n *NsqConsumer) AddHandler(handler nsq.Handler) { +// n.handler = handler +// } + +func (n *NsqConsumer) AddHandler(handler func([]byte) error) { n.handler = handler } @@ -60,7 +65,11 @@ func (n *NsqConsumer) RunDistributed(qAddr, lAddr []string, concurrency int) error { n.consumer.ChangeMaxInFlight(concurrency) - n.consumer.AddConcurrentHandlers(n.handler, concurrency) + // n.consumer.AddConcurrentHandlers(n.handler, concurrency) + n.consumer.AddConcurrentHandlers(nsq.HandlerFunc(func(msg *nsq.Message) error { + return n.handler(msg.Body) + // return nil + }), concurrency) var err error if len(qAddr) > 0 { -- Gitblit v1.8.0