From 93913145516ca22c3c139532457be9cb0b510be5 Mon Sep 17 00:00:00 2001
From: zhangqian <zhangqian@123.com>
Date: 星期六, 21 十月 2023 14:50:45 +0800
Subject: [PATCH] nsqq启停幂等性操作

---
 nsq/producer.go |   13 ++++++++++++-
 nsq/nsq.go      |    3 +++
 2 files changed, 15 insertions(+), 1 deletions(-)

diff --git a/nsq/nsq.go b/nsq/nsq.go
index 847da9a..c8f7650 100644
--- a/nsq/nsq.go
+++ b/nsq/nsq.go
@@ -72,6 +72,9 @@
 }
 
 func (c *consumerManager) stop() {
+	if !atomic.CompareAndSwapInt32(&c.initFlag, 1, 0) {
+		return
+	}
 	c.clients.Range(func(key, value any) bool {
 		if consumer, ok := value.(*nsqclient.NsqConsumer); ok {
 			nsqclient.DestroyNsqConsumer(consumer)
diff --git a/nsq/producer.go b/nsq/producer.go
index 2f1e947..ade2e2f 100644
--- a/nsq/producer.go
+++ b/nsq/producer.go
@@ -4,19 +4,30 @@
 	"apsClient/conf"
 	"apsClient/pkg/logx"
 	"apsClient/pkg/nsqclient"
+	"sync/atomic"
 )
 
-var producer nsqclient.Producer
+var (
+	producer nsqclient.Producer
+	initFlag int32
+)
 
 func GetProducer() nsqclient.Producer {
 	return producer
 }
 
 func StopProducer() {
+	if !atomic.CompareAndSwapInt32(&initFlag, 1, 0) {
+		return
+	}
 	nsqclient.DestroyProducerPool()
 }
 
 func initProducer() (err error) {
+	if !atomic.CompareAndSwapInt32(&initFlag, 0, 1) {
+		return nil
+	}
+
 	producer, err = nsqclient.NewProducer(conf.Conf.NsqConf.NsqdAddr)
 	if err != nil {
 		logx.Errorf("NewProducer err:%v", err)

--
Gitblit v1.8.0