zhangqian
2023-10-20 7276ab65576ec73b439a40d7f1a3035a534b968c
停止消费时关闭tcp连接
2个文件已修改
14 ■■■■ 已修改文件
nsq/nsq.go 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
pkg/nsqclient/consumer.go 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsq/nsq.go
@@ -73,8 +73,8 @@
            nsqclient.DestroyNsqConsumer(consumer)
            logx.Infof("try stop consumer, topic : %v", key)
            consumer = nil
            c.clients.Delete(key)
        }
        return true
    })
}
pkg/nsqclient/consumer.go
@@ -90,10 +90,18 @@
    for {
        select {
        case <-n.ctx.Done():
            logx.Infof("[%s]%s stop consumer...", n.topic, n.channel)
            logx.Infof("[%s] stop consumer...", n.topic)
            n.consumer.Stop()
            <-n.consumer.StopChan
            logx.Infof("[%s]%s stop consumer success", n.topic, n.channel)
            logx.Infof("[%s] stop consumer success", n.topic)
            for _, addr := range qAddr {
                err = n.consumer.DisconnectFromNSQD(addr)
                if err != nil {
                    logx.Errorf("disconnect from nsq server failed, err: %v, addr: %v, topic: %v", err, addr, n.topic)
                } else {
                    logx.Infof("disconnect from nsq server success, addr: %v, topic: %v", addr, n.topic)
                }
            }
            return nil
        }
    }