zhangqian
2023-10-09 41a9bf370cff977ff8afe122a7610e07fa6c3b80
nsq/nsq.go
@@ -6,20 +6,32 @@
   "apsClient/model/common"
   "apsClient/pkg/logx"
   "apsClient/pkg/safe"
   "basic.com/aps/nsqclient.git"
   "context"
   "errors"
   "fmt"
   "sync"
   "time"
)
func Init() error {
type consumerManager struct {
   ctx     context.Context
   clients sync.Map
}
var defaultConsumerManager *consumerManager
func init() {
   defaultConsumerManager = new(consumerManager)
}
func (c *consumerManager) init() error {
   if len(conf.Conf.NsqConf.NodeId) <= 0 {
      return errors.New("no NodeId")
   }
   if err := initProducer(); err != nil {
      return err
   }
   safe.Go(func() {
      caller := NewCaller(fmt.Sprintf(constvar.NsqTopicGetPlcAddress, conf.Conf.NsqConf.NodeId), fmt.Sprintf(constvar.NsqTopicSendPlcAddress, conf.Conf.NsqConf.NodeId))
      var addressResult common.ResponsePlcAddress
@@ -29,40 +41,54 @@
      }
   })
   safe.Go(func() {
      err := Consume(fmt.Sprintf(constvar.NsqTopicScheduleTask, conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId)
      if err != nil {
         logx.Errorf("start nsq consume err: %v", err)
      }
   })
   safe.Go(func() {
      err := Consume(fmt.Sprintf(constvar.NsqTopicSendPlcAddress, conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId)
      if err != nil {
         logx.Errorf("start nsq consume err: %v", err)
      }
   })
   safe.Go(func() {
      err := Consume(fmt.Sprintf(constvar.NsqTopicProcessParamsResponse, conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId)
      if err != nil {
         logx.Errorf("start nsq consume err: %v", err)
      }
   })
   safe.Go(func() {
      err := Consume(fmt.Sprintf(constvar.NsqTopicApsProcessParams, conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId)
      if err != nil {
         logx.Errorf("start nsq consume err: %v", err)
      }
   })
   safe.Go(func() {
      err := Consume(fmt.Sprintf(constvar.NsqTopicDeviceUpdate, conf.Conf.NsqConf.NodeId), conf.Conf.System.DeviceId)
      if err != nil {
         logx.Errorf("start nsq consume err: %v", err)
      }
   })
   var topics = []string{
      constvar.NsqTopicScheduleTask,
      constvar.NsqTopicSendPlcAddress,
      constvar.NsqTopicProcessParamsResponse,
      constvar.NsqTopicApsProcessParams,
      constvar.NsqTopicDeviceUpdate,
   }
   for _, t := range topics {
      topic := fmt.Sprintf(t, conf.Conf.NsqConf.NodeId)
      c.AddConsumer(topic)
   }
   return nil
}
func (c *consumerManager) AddConsumer(topic string) {
   client, err := NewConsumer(topic, conf.Conf.System.DeviceId)
   if err != nil {
      logx.Errorf("start nsq consume err: %v", err)
   }
   c.clients.Store(topic, client)
   safe.Go(func() {
      if len(conf.Conf.NsqConf.NsqlookupdAddr) > 0 {
         if err = client.RunLookupd(conf.Conf.NsqConf.NsqlookupdAddr, 1); err != nil {
            logx.Errorf("RunLookupd err:%v", err)
            return
         }
      } else {
         if err = client.Run(conf.Conf.NsqConf.NsqdAddr, 1); err != nil {
            logx.Errorf("Run err:%v", err)
            return
         }
      }
   })
}
func (c *consumerManager) stop() {
   c.clients.Range(func(key, value any) bool {
      if consumer, ok := value.(*nsqclient.NsqConsumer); ok {
         nsqclient.DestroyNsqConsumer(consumer)
      }
      return true
   })
}
func Init() error {
   return defaultConsumerManager.init()
}
func Stop() {
   defaultConsumerManager.stop()
}