zhangqian
2023-08-22 33bdd93939ee58655939b67faea7356c26d952b6
消费消息时channel换成设备id
2个文件已修改
67 ■■■■ 已修改文件
nsq/nsq.go 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsq/producer.go 65 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsq/nsq.go
@@ -17,7 +17,7 @@
    }
    safe.Go(func() {
        _ = Consume(fmt.Sprintf("aps.%v.scheduleTask", conf.Conf.NsqConf.NodeId), "sensor01")
        _ = Consume(fmt.Sprintf("aps.%v.scheduleTask", conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId)
    })
    return nil
nsq/producer.go
@@ -18,43 +18,48 @@
        logx.Errorf("NewProducer err:%v", err)
        return err
    }
    // 测试发布数据
    //go func() {
    //    for {
    //        time.Sleep(time.Second)
    //        _ = producer.Publish("test", []byte("123"))
    //    }
    //}()
    ////测试发布数据
    //go func() {
    //    for {
    //        time.Sleep(time.Second * 2)
    //        err := producer.Publish("aps.wangpengfei.erp.cstReply", []byte("456"))
    //        logx.Infof("=====err:%v", err)
    //        _ = producer.Publish("test", []byte("123"))
    //    }
    //}()
    //
    //go func() {
    //    for {
    //        time.Sleep(time.Second * 5)
    //        applyMaterial := ApplyOrderMaterial{
    //            FBillNo:   "123",
    //            FNumber:   "456",
    //            UseAmount: 1,
    //        }
    //
    //        applyBytes, err := json.Marshal([]*ApplyOrderMaterial{&applyMaterial})
    //        if err != nil {
    //            return
    //        }
    //
    //        producer := GetProducer()
    //        err = producer.Publish(fmt.Sprintf("aps.%v.erp.cstApply", conf.WebConf.NodeId), applyBytes)
    //        logx.Infof("===============ApplyMaterialByProduct topic:%v, applyBytes:%v, err:%v", fmt.Sprintf("aps.%v.erp.cstApply", conf.WebConf.NodeId), string(applyBytes), err)
    //        if err != nil {
    //            return
    //        }
    //    testH := &testHand{}
    //    c, err := nsqclient.NewNsqConsumer(context.Background(), "test", "channel1")
    //    c.AddHandler(testH.HandleMessage)
    //    c.Run(conf.Conf.NsqConf.NsqdAddr, 1)
    //    if err != nil {
    //        return
    //    }
    //}()
    //go func() {
    //    testH := &testHand2{}
    //    c, err := nsqclient.NewNsqConsumer(context.Background(), "test", "channel2")
    //    c.AddHandler(testH.HandleMessage)
    //    c.Run(conf.Conf.NsqConf.NsqdAddr, 1)
    //    if err != nil {
    //        return
    //    }
    //}()
    return nil
}
//// 测试消费数据
//type testHand struct {
//}
//
//func (slf *testHand) HandleMessage(data []byte) (err error) {
//    fmt.Println("testHand1", string(data))
//    return err
//}
//
//type testHand2 struct {
//}
//
//func (slf *testHand2) HandleMessage(data []byte) (err error) {
//    fmt.Println("testHand2", string(data))
//    return err
//}