fix
zhangqian
2023-12-01 8324f872ef3a4d0c978a9b1d062800c6a1701c12
nsq/producer.go
@@ -4,57 +4,77 @@
   "apsClient/conf"
   "apsClient/pkg/logx"
   "apsClient/pkg/nsqclient"
   "sync/atomic"
)
var producer nsqclient.Producer
var (
   producer nsqclient.Producer
   initFlag int32
)
func GetProducer() nsqclient.Producer {
   return producer
}
func StopProducer() {
   if !atomic.CompareAndSwapInt32(&initFlag, 1, 0) {
      return
   }
   nsqclient.DestroyProducerPool()
}
func initProducer() (err error) {
   if !atomic.CompareAndSwapInt32(&initFlag, 0, 1) {
      return nil
   }
   producer, err = nsqclient.NewProducer(conf.Conf.NsqConf.NsqdAddr)
   if err != nil {
      logx.Errorf("NewProducer err:%v", err)
      return err
   }
   // 测试发布数据
   //go func() {
   //   for {
   //      time.Sleep(time.Second)
   //      _ = producer.Publish("test", []byte("123"))
   //   }
   //}()
   ////测试发布数据
   //go func() {
   //   for {
   //      time.Sleep(time.Second * 2)
   //      err := producer.Publish("aps.wangpengfei.erp.cstReply", []byte("456"))
   //      logx.Infof("=====err:%v", err)
   //      _ = producer.Publish("test", []byte("123"))
   //   }
   //}()
   //
   //go func() {
   //   for {
   //      time.Sleep(time.Second * 5)
   //      applyMaterial := ApplyOrderMaterial{
   //         FBillNo:   "123",
   //         FNumber:   "456",
   //         UseAmount: 1,
   //      }
   //
   //      applyBytes, err := json.Marshal([]*ApplyOrderMaterial{&applyMaterial})
   //      if err != nil {
   //         return
   //      }
   //
   //      producer := GetProducer()
   //      err = producer.Publish(fmt.Sprintf("aps.%v.erp.cstApply", conf.WebConf.NodeId), applyBytes)
   //      logx.Infof("===============ApplyMaterialByProduct topic:%v, applyBytes:%v, err:%v", fmt.Sprintf("aps.%v.erp.cstApply", conf.WebConf.NodeId), string(applyBytes), err)
   //      if err != nil {
   //         return
   //      }
   //   testH := &testHand{}
   //   c, err := nsqclient.NewNsqConsumer(context.Background(), "test", "channel1")
   //   c.AddHandler(testH.HandleMessage)
   //   c.Run(conf.Conf.NsqConf.NsqdAddr, 1)
   //   if err != nil {
   //      return
   //   }
   //}()
   //go func() {
   //   testH := &testHand2{}
   //   c, err := nsqclient.NewNsqConsumer(context.Background(), "test", "channel2")
   //   c.AddHandler(testH.HandleMessage)
   //   c.Run(conf.Conf.NsqConf.NsqdAddr, 1)
   //   if err != nil {
   //      return
   //   }
   //}()
   return nil
}
//// 测试消费数据
//type testHand struct {
//}
//
//func (slf *testHand) HandleMessage(data []byte) (err error) {
//   fmt.Println("testHand1", string(data))
//   return err
//}
//
//type testHand2 struct {
//}
//
//func (slf *testHand2) HandleMessage(data []byte) (err error) {
//   fmt.Println("testHand2", string(data))
//   return err
//}