From e9f6bf26943f3cc974d02a083cdac22af57e8cb4 Mon Sep 17 00:00:00 2001
From: zhangqian <zhangqian@123.com>
Date: 星期五, 03 十一月 2023 15:46:57 +0800
Subject: [PATCH] 去掉换行
---
nsq/producer.go | 82 +++++++++++++++++++++++++---------------
1 files changed, 51 insertions(+), 31 deletions(-)
diff --git a/nsq/producer.go b/nsq/producer.go
index d7b80b2..ade2e2f 100644
--- a/nsq/producer.go
+++ b/nsq/producer.go
@@ -4,57 +4,77 @@
"apsClient/conf"
"apsClient/pkg/logx"
"apsClient/pkg/nsqclient"
+ "sync/atomic"
)
-var producer nsqclient.Producer
+var (
+ producer nsqclient.Producer
+ initFlag int32
+)
func GetProducer() nsqclient.Producer {
return producer
}
+func StopProducer() {
+ if !atomic.CompareAndSwapInt32(&initFlag, 1, 0) {
+ return
+ }
+ nsqclient.DestroyProducerPool()
+}
+
func initProducer() (err error) {
+ if !atomic.CompareAndSwapInt32(&initFlag, 0, 1) {
+ return nil
+ }
+
producer, err = nsqclient.NewProducer(conf.Conf.NsqConf.NsqdAddr)
if err != nil {
logx.Errorf("NewProducer err:%v", err)
return err
}
- // 娴嬭瘯鍙戝竷鏁版嵁
- //go func() {
- // for {
- // time.Sleep(time.Second)
- // _ = producer.Publish("test", []byte("123"))
- // }
- //}()
-
+ ////娴嬭瘯鍙戝竷鏁版嵁
//go func() {
// for {
// time.Sleep(time.Second * 2)
- // err := producer.Publish("aps.wangpengfei.erp.cstReply", []byte("456"))
- // logx.Infof("=====err:%v", err)
+ // _ = producer.Publish("test", []byte("123"))
// }
//}()
-
+ //
//go func() {
- // for {
- // time.Sleep(time.Second * 5)
- // applyMaterial := ApplyOrderMaterial{
- // FBillNo: "123",
- // FNumber: "456",
- // UseAmount: 1,
- // }
- //
- // applyBytes, err := json.Marshal([]*ApplyOrderMaterial{&applyMaterial})
- // if err != nil {
- // return
- // }
- //
- // producer := GetProducer()
- // err = producer.Publish(fmt.Sprintf("aps.%v.erp.cstApply", conf.WebConf.NodeId), applyBytes)
- // logx.Infof("===============ApplyMaterialByProduct topic:%v, applyBytes:%v, err:%v", fmt.Sprintf("aps.%v.erp.cstApply", conf.WebConf.NodeId), string(applyBytes), err)
- // if err != nil {
- // return
- // }
+ // testH := &testHand{}
+ // c, err := nsqclient.NewNsqConsumer(context.Background(), "test", "channel1")
+ // c.AddHandler(testH.HandleMessage)
+ // c.Run(conf.Conf.NsqConf.NsqdAddr, 1)
+ // if err != nil {
+ // return
+ // }
+ //}()
+ //go func() {
+ // testH := &testHand2{}
+ // c, err := nsqclient.NewNsqConsumer(context.Background(), "test", "channel2")
+ // c.AddHandler(testH.HandleMessage)
+ // c.Run(conf.Conf.NsqConf.NsqdAddr, 1)
+ // if err != nil {
+ // return
// }
//}()
return nil
}
+
+//// 娴嬭瘯娑堣垂鏁版嵁
+//type testHand struct {
+//}
+//
+//func (slf *testHand) HandleMessage(data []byte) (err error) {
+// fmt.Println("testHand1", string(data))
+// return err
+//}
+//
+//type testHand2 struct {
+//}
+//
+//func (slf *testHand2) HandleMessage(data []byte) (err error) {
+// fmt.Println("testHand2", string(data))
+// return err
+//}
--
Gitblit v1.8.0