package nsq

import (
	"apsClient/conf"
	"apsClient/pkg/logx"
	"apsClient/pkg/nsqclient"
	"sync/atomic"
)

var (
	// producer is the package-wide NSQ producer assigned by initProducer.
	producer nsqclient.Producer
	// initFlag is 1 while the producer is (being) initialised and 0 otherwise.
	// It is only accessed through sync/atomic operations.
	initFlag int32
)

// GetProducer returns the package-level producer. Until initProducer has
// completed successfully this is whatever value the variable currently holds
// (its zero value if initProducer was never called).
func GetProducer() nsqclient.Producer {
	return producer
}

// StopProducer destroys the nsqclient producer pool if the producer is
// currently marked as initialised. Calling it when the flag is not set, or
// calling it a second time, is a no-op.
func StopProducer() {
	// Only the caller that flips the flag from 1 back to 0 performs the teardown.
	if !atomic.CompareAndSwapInt32(&initFlag, 1, 0) {
		return
	}
	nsqclient.DestroyProducerPool()
}

// initProducer creates the package-level producer using the nsqd address from
// conf.Conf.NsqConf.NsqdAddr.
//
// If the init flag is already set, the call returns nil without doing
// anything. If nsqclient.NewProducer fails, the error is logged and returned,
// and the flag is cleared again so that a later call can retry and so that
// StopProducer does not tear down a pool for a producer that was never
// created.
func initProducer() (err error) {
	// Claim the initialisation; any other caller seeing the flag set backs off.
	if !atomic.CompareAndSwapInt32(&initFlag, 0, 1) {
		return nil
	}

	producer, err = nsqclient.NewProducer(conf.Conf.NsqConf.NsqdAddr)
	if err != nil {
		// Roll the flag back: without this, one failed attempt would make every
		// subsequent initProducer call report success with no usable producer.
		atomic.StoreInt32(&initFlag, 0)
		logx.Errorf("NewProducer err:%v", err)
		return err
	}

	//// Test: publish data
	//go func() {
	//	for {
	//		time.Sleep(time.Second * 2)
	//		_ = producer.Publish("test", []byte("123"))
	//	}
	//}()
	//
	//go func() {
	//	testH := &testHand{}
	//	c, err := nsqclient.NewNsqConsumer(context.Background(), "test", "channel1")
	//	c.AddHandler(testH.HandleMessage)
	//	c.Run(conf.Conf.NsqConf.NsqdAddr, 1)
	//	if err != nil {
	//		return
	//	}
	//}()
	//go func() {
	//	testH := &testHand2{}
	//	c, err := nsqclient.NewNsqConsumer(context.Background(), "test", "channel2")
	//	c.AddHandler(testH.HandleMessage)
	//	c.Run(conf.Conf.NsqConf.NsqdAddr, 1)
	//	if err != nil {
	//		return
	//	}
	//}()

	return nil
}

//// Test: consume data
//type testHand struct {
//}
//
//func (slf *testHand) HandleMessage(data []byte) (err error) {
//	fmt.Println("testHand1", string(data))
//	return err
//}
//
//type testHand2 struct {
//}
//
//func (slf *testHand2) HandleMessage(data []byte) (err error) {
//	fmt.Println("testHand2", string(data))
//	return err
//}