| | |
| | | } |
| | | |
| | | func (c *consumerManager) stop() { |
| | | if !atomic.CompareAndSwapInt32(&c.initFlag, 1, 0) { |
| | | return |
| | | } |
| | | c.clients.Range(func(key, value any) bool { |
| | | if consumer, ok := value.(*nsqclient.NsqConsumer); ok { |
| | | nsqclient.DestroyNsqConsumer(consumer) |
| | |
| | | "apsClient/conf" |
| | | "apsClient/pkg/logx" |
| | | "apsClient/pkg/nsqclient" |
| | | "sync/atomic" |
| | | ) |
| | | |
| | | var producer nsqclient.Producer |
| | | var ( |
| | | producer nsqclient.Producer |
| | | initFlag int32 |
| | | ) |
| | | |
| | | func GetProducer() nsqclient.Producer { |
| | | return producer |
| | | } |
| | | |
| | | func StopProducer() { |
| | | if !atomic.CompareAndSwapInt32(&initFlag, 1, 0) { |
| | | return |
| | | } |
| | | nsqclient.DestroyProducerPool() |
| | | } |
| | | |
| | | func initProducer() (err error) { |
| | | if !atomic.CompareAndSwapInt32(&initFlag, 0, 1) { |
| | | return nil |
| | | } |
| | | |
| | | producer, err = nsqclient.NewProducer(conf.Conf.NsqConf.NsqdAddr) |
| | | if err != nil { |
| | | logx.Errorf("NewProducer err:%v", err) |