From 33bdd93939ee58655939b67faea7356c26d952b6 Mon Sep 17 00:00:00 2001
From: zhangqian <zhangqian@123.com>
Date: 星期二, 22 八月 2023 09:28:06 +0800
Subject: [PATCH] 消费消息时channel换成设备id
---
nsq/producer.go | 65 +++++++++++++++++---------------
nsq/nsq.go | 2
2 files changed, 36 insertions(+), 31 deletions(-)
diff --git a/nsq/nsq.go b/nsq/nsq.go
index c4bdcf2..542f31f 100644
--- a/nsq/nsq.go
+++ b/nsq/nsq.go
@@ -17,7 +17,7 @@
}
safe.Go(func() {
- _ = Consume(fmt.Sprintf("aps.%v.scheduleTask", conf.Conf.NsqConf.NodeId), "sensor01")
+ _ = Consume(fmt.Sprintf("aps.%v.scheduleTask", conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId)
})
return nil
diff --git a/nsq/producer.go b/nsq/producer.go
index d7b80b2..516ed17 100644
--- a/nsq/producer.go
+++ b/nsq/producer.go
@@ -18,43 +18,48 @@
logx.Errorf("NewProducer err:%v", err)
return err
}
- // 娴嬭瘯鍙戝竷鏁版嵁
- //go func() {
- // for {
- // time.Sleep(time.Second)
- // _ = producer.Publish("test", []byte("123"))
- // }
- //}()
-
+ ////娴嬭瘯鍙戝竷鏁版嵁
//go func() {
// for {
// time.Sleep(time.Second * 2)
- // err := producer.Publish("aps.wangpengfei.erp.cstReply", []byte("456"))
- // logx.Infof("=====err:%v", err)
+ // _ = producer.Publish("test", []byte("123"))
// }
//}()
-
+ //
//go func() {
- // for {
- // time.Sleep(time.Second * 5)
- // applyMaterial := ApplyOrderMaterial{
- // FBillNo: "123",
- // FNumber: "456",
- // UseAmount: 1,
- // }
- //
- // applyBytes, err := json.Marshal([]*ApplyOrderMaterial{&applyMaterial})
- // if err != nil {
- // return
- // }
- //
- // producer := GetProducer()
- // err = producer.Publish(fmt.Sprintf("aps.%v.erp.cstApply", conf.WebConf.NodeId), applyBytes)
- // logx.Infof("===============ApplyMaterialByProduct topic:%v, applyBytes:%v, err:%v", fmt.Sprintf("aps.%v.erp.cstApply", conf.WebConf.NodeId), string(applyBytes), err)
- // if err != nil {
- // return
- // }
+ // testH := &testHand{}
+ // c, err := nsqclient.NewNsqConsumer(context.Background(), "test", "channel1")
+ // c.AddHandler(testH.HandleMessage)
+ // c.Run(conf.Conf.NsqConf.NsqdAddr, 1)
+ // if err != nil {
+ // return
+ // }
+ //}()
+ //go func() {
+ // testH := &testHand2{}
+ // c, err := nsqclient.NewNsqConsumer(context.Background(), "test", "channel2")
+ // c.AddHandler(testH.HandleMessage)
+ // c.Run(conf.Conf.NsqConf.NsqdAddr, 1)
+ // if err != nil {
+ // return
// }
//}()
return nil
}
+
+//// 娴嬭瘯娑堣垂鏁版嵁
+//type testHand struct {
+//}
+//
+//func (slf *testHand) HandleMessage(data []byte) (err error) {
+// fmt.Println("testHand1", string(data))
+// return err
+//}
+//
+//type testHand2 struct {
+//}
+//
+//func (slf *testHand2) HandleMessage(data []byte) (err error) {
+// fmt.Println("testHand2", string(data))
+// return err
+//}
--
Gitblit v1.8.0