From e9f6bf26943f3cc974d02a083cdac22af57e8cb4 Mon Sep 17 00:00:00 2001
From: zhangqian <zhangqian@123.com>
Date: 星期五, 03 十一月 2023 15:46:57 +0800
Subject: [PATCH] 去掉换行

---
 nsq/producer.go |   82 +++++++++++++++++++++++++---------------
 1 files changed, 51 insertions(+), 31 deletions(-)

diff --git a/nsq/producer.go b/nsq/producer.go
index d7b80b2..ade2e2f 100644
--- a/nsq/producer.go
+++ b/nsq/producer.go
@@ -4,57 +4,77 @@
 	"apsClient/conf"
 	"apsClient/pkg/logx"
 	"apsClient/pkg/nsqclient"
+	"sync/atomic"
 )
 
-var producer nsqclient.Producer
+var (
+	producer nsqclient.Producer
+	initFlag int32
+)
 
 func GetProducer() nsqclient.Producer {
 	return producer
 }
 
+func StopProducer() {
+	if !atomic.CompareAndSwapInt32(&initFlag, 1, 0) {
+		return
+	}
+	nsqclient.DestroyProducerPool()
+}
+
 func initProducer() (err error) {
+	if !atomic.CompareAndSwapInt32(&initFlag, 0, 1) {
+		return nil
+	}
+
 	producer, err = nsqclient.NewProducer(conf.Conf.NsqConf.NsqdAddr)
 	if err != nil {
 		logx.Errorf("NewProducer err:%v", err)
 		return err
 	}
-	// 娴嬭瘯鍙戝竷鏁版嵁
-	//go func() {
-	//	for {
-	//		time.Sleep(time.Second)
-	//		_ = producer.Publish("test", []byte("123"))
-	//	}
-	//}()
-
+	////娴嬭瘯鍙戝竷鏁版嵁
 	//go func() {
 	//	for {
 	//		time.Sleep(time.Second * 2)
-	//		err := producer.Publish("aps.wangpengfei.erp.cstReply", []byte("456"))
-	//		logx.Infof("=====err:%v", err)
+	//		_ = producer.Publish("test", []byte("123"))
 	//	}
 	//}()
-
+	//
 	//go func() {
-	//	for {
-	//		time.Sleep(time.Second * 5)
-	//		applyMaterial := ApplyOrderMaterial{
-	//			FBillNo:   "123",
-	//			FNumber:   "456",
-	//			UseAmount: 1,
-	//		}
-	//
-	//		applyBytes, err := json.Marshal([]*ApplyOrderMaterial{&applyMaterial})
-	//		if err != nil {
-	//			return
-	//		}
-	//
-	//		producer := GetProducer()
-	//		err = producer.Publish(fmt.Sprintf("aps.%v.erp.cstApply", conf.WebConf.NodeId), applyBytes)
-	//		logx.Infof("===============ApplyMaterialByProduct topic:%v, applyBytes:%v, err:%v", fmt.Sprintf("aps.%v.erp.cstApply", conf.WebConf.NodeId), string(applyBytes), err)
-	//		if err != nil {
-	//			return
-	//		}
+	//	testH := &testHand{}
+	//	c, err := nsqclient.NewNsqConsumer(context.Background(), "test", "channel1")
+	//	c.AddHandler(testH.HandleMessage)
+	//	c.Run(conf.Conf.NsqConf.NsqdAddr, 1)
+	//	if err != nil {
+	//		return
+	//	}
+	//}()
+	//go func() {
+	//	testH := &testHand2{}
+	//	c, err := nsqclient.NewNsqConsumer(context.Background(), "test", "channel2")
+	//	c.AddHandler(testH.HandleMessage)
+	//	c.Run(conf.Conf.NsqConf.NsqdAddr, 1)
+	//	if err != nil {
+	//		return
 	//	}
 	//}()
 	return nil
 }
+
+//// 娴嬭瘯娑堣垂鏁版嵁
+//type testHand struct {
+//}
+//
+//func (slf *testHand) HandleMessage(data []byte) (err error) {
+//	fmt.Println("testHand1", string(data))
+//	return err
+//}
+//
+//type testHand2 struct {
+//}
+//
+//func (slf *testHand2) HandleMessage(data []byte) (err error) {
+//	fmt.Println("testHand2", string(data))
+//	return err
+//}

--
Gitblit v1.8.0