From 33bdd93939ee58655939b67faea7356c26d952b6 Mon Sep 17 00:00:00 2001 From: zhangqian <zhangqian@123.com> Date: 星期二, 22 八月 2023 09:28:06 +0800 Subject: [PATCH] 消费消息时channel换成设备id --- nsq/producer.go | 65 +++++++++++++++++--------------- nsq/nsq.go | 2 2 files changed, 36 insertions(+), 31 deletions(-) diff --git a/nsq/nsq.go b/nsq/nsq.go index c4bdcf2..542f31f 100644 --- a/nsq/nsq.go +++ b/nsq/nsq.go @@ -17,7 +17,7 @@ } safe.Go(func() { - _ = Consume(fmt.Sprintf("aps.%v.scheduleTask", conf.Conf.NsqConf.NodeId), "sensor01") + _ = Consume(fmt.Sprintf("aps.%v.scheduleTask", conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId) }) return nil diff --git a/nsq/producer.go b/nsq/producer.go index d7b80b2..516ed17 100644 --- a/nsq/producer.go +++ b/nsq/producer.go @@ -18,43 +18,48 @@ logx.Errorf("NewProducer err:%v", err) return err } - // 娴嬭瘯鍙戝竷鏁版嵁 - //go func() { - // for { - // time.Sleep(time.Second) - // _ = producer.Publish("test", []byte("123")) - // } - //}() - + ////娴嬭瘯鍙戝竷鏁版嵁 //go func() { // for { // time.Sleep(time.Second * 2) - // err := producer.Publish("aps.wangpengfei.erp.cstReply", []byte("456")) - // logx.Infof("=====err:%v", err) + // _ = producer.Publish("test", []byte("123")) // } //}() - + // //go func() { - // for { - // time.Sleep(time.Second * 5) - // applyMaterial := ApplyOrderMaterial{ - // FBillNo: "123", - // FNumber: "456", - // UseAmount: 1, - // } - // - // applyBytes, err := json.Marshal([]*ApplyOrderMaterial{&applyMaterial}) - // if err != nil { - // return - // } - // - // producer := GetProducer() - // err = producer.Publish(fmt.Sprintf("aps.%v.erp.cstApply", conf.WebConf.NodeId), applyBytes) - // logx.Infof("===============ApplyMaterialByProduct topic:%v, applyBytes:%v, err:%v", fmt.Sprintf("aps.%v.erp.cstApply", conf.WebConf.NodeId), string(applyBytes), err) - // if err != nil { - // return - // } + // testH := &testHand{} + // c, err := nsqclient.NewNsqConsumer(context.Background(), "test", "channel1") + // c.AddHandler(testH.HandleMessage) + // c.Run(conf.Conf.NsqConf.NsqdAddr, 1) + // if err != nil { + // return + // } + //}() + //go func() { + // testH := &testHand2{} + // c, err := nsqclient.NewNsqConsumer(context.Background(), "test", "channel2") + // c.AddHandler(testH.HandleMessage) + // c.Run(conf.Conf.NsqConf.NsqdAddr, 1) + // if err != nil { + // return // } //}() return nil } + +//// 娴嬭瘯娑堣垂鏁版嵁 +//type testHand struct { +//} +// +//func (slf *testHand) HandleMessage(data []byte) (err error) { +// fmt.Println("testHand1", string(data)) +// return err +//} +// +//type testHand2 struct { +//} +// +//func (slf *testHand2) HandleMessage(data []byte) (err error) { +// fmt.Println("testHand2", string(data)) +// return err +//} -- Gitblit v1.8.0