| | |
| | | nsqclient.DestroyNsqConsumer(consumer) |
| | | logx.Infof("try stop consumer, topic : %v", key) |
| | | consumer = nil |
| | | c.clients.Delete(key) |
| | | } |
| | | |
| | | return true |
| | | }) |
| | | } |
| | |
| | | for { |
| | | select { |
| | | case <-n.ctx.Done(): |
| | | logx.Infof("[%s]%s stop consumer...", n.topic, n.channel) |
| | | logx.Infof("[%s] stop consumer...", n.topic) |
| | | n.consumer.Stop() |
| | | <-n.consumer.StopChan |
| | | logx.Infof("[%s]%s stop consumer success", n.topic, n.channel) |
| | | logx.Infof("[%s] stop consumer success", n.topic) |
| | | for _, addr := range qAddr { |
| | | err = n.consumer.DisconnectFromNSQD(addr) |
| | | if err != nil { |
| | | logx.Errorf("disconnect from nsq server failed, err: %v, addr: %v, topic: %v", err, addr, n.topic) |
| | | } else { |
| | | logx.Infof("disconnect from nsq server success, addr: %v, topic: %v", addr, n.topic) |
| | | } |
| | | } |
| | | return nil |
| | | } |
| | | } |