zhangqian
2023-10-21 93913145516ca22c3c139532457be9cb0b510be5
nsq启停幂等性操作
2个文件已修改
16 ■■■■■ 已修改文件
nsq/nsq.go 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsq/producer.go 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsq/nsq.go
@@ -72,6 +72,9 @@
}
func (c *consumerManager) stop() {
    if !atomic.CompareAndSwapInt32(&c.initFlag, 1, 0) {
        return
    }
    c.clients.Range(func(key, value any) bool {
        if consumer, ok := value.(*nsqclient.NsqConsumer); ok {
            nsqclient.DestroyNsqConsumer(consumer)
nsq/producer.go
@@ -4,19 +4,30 @@
    "apsClient/conf"
    "apsClient/pkg/logx"
    "apsClient/pkg/nsqclient"
    "sync/atomic"
)
var producer nsqclient.Producer
// Package-level state for the shared NSQ producer.
var (
    // producer is the shared producer instance; initProducer assigns it
    // and GetProducer returns it.
    producer nsqclient.Producer
    // initFlag tracks the producer lifecycle: initProducer flips it
    // 0 -> 1 and StopProducer flips it 1 -> 0, both via atomic
    // compare-and-swap, so a repeated call to either returns early.
    initFlag int32
)
// GetProducer returns the current value of the package-level producer
// variable. It performs no initialisation check of its own.
func GetProducer() nsqclient.Producer {
    return producer
}
func StopProducer() {
    if !atomic.CompareAndSwapInt32(&initFlag, 1, 0) {
        return
    }
    nsqclient.DestroyProducerPool()
}
func initProducer() (err error) {
    if !atomic.CompareAndSwapInt32(&initFlag, 0, 1) {
        return nil
    }
    producer, err = nsqclient.NewProducer(conf.Conf.NsqConf.NsqdAddr)
    if err != nil {
        logx.Errorf("NewProducer err:%v", err)