fix
zhangqian
2023-10-30 10c65da3d2af7056f48d9301e83f53f102f76e18
nsq/nsq.go
@@ -10,11 +10,13 @@
   "errors"
   "fmt"
   "sync"
   "sync/atomic"
)
type consumerManager struct {
   ctx     context.Context
   clients sync.Map
   ctx      context.Context
   clients  sync.Map
   initFlag int32
}
var defaultConsumerManager *consumerManager
@@ -27,12 +29,14 @@
   if len(conf.Conf.NsqConf.NodeId) <= 0 {
      return errors.New("no NodeId")
   }
   if !atomic.CompareAndSwapInt32(&c.initFlag, 0, 1) {
      return nil
   }
   if err := initProducer(); err != nil {
      return err
   }
   var topics = []string{
      constvar.NsqTopicScheduleTask,
      constvar.NsqTopicSendPlcAddress,
      constvar.NsqTopicApsProcessParams,
      constvar.NsqTopicDeviceUpdate,
      constvar.NsqTopicPullDataResponse,
@@ -67,6 +71,9 @@
}
func (c *consumerManager) stop() {
   if !atomic.CompareAndSwapInt32(&c.initFlag, 1, 0) {
      return
   }
   c.clients.Range(func(key, value any) bool {
      if consumer, ok := value.(*nsqclient.NsqConsumer); ok {
         nsqclient.DestroyNsqConsumer(consumer)