package nsq

import (
	"context"
	"errors"
	"fmt"
	"sync"

	"apsClient/conf"
	"apsClient/constvar"
	"apsClient/pkg/logx"
	"apsClient/pkg/nsqclient"
	"apsClient/pkg/safe"
)

// consumerManager keeps track of the NSQ consumers started by this package,
// keyed by topic name, so that they can all be shut down together.
type consumerManager struct {
	// ctx is not read or written anywhere in this file.
	ctx context.Context
	// clients maps a topic name (string) to the consumer created for it.
	clients sync.Map
}

// defaultConsumerManager is the package-wide manager used by Init and Stop.
var defaultConsumerManager *consumerManager

func init() {
	defaultConsumerManager = new(consumerManager)
}

// init validates the NSQ configuration, initializes the producer and starts
// one consumer per topic this node subscribes to.
//
// It returns an error when no NodeId is configured or when initProducer
// fails. Failures of individual consumers are not reported here: they happen
// asynchronously inside AddConsumer and are only logged.
func (c *consumerManager) init() error {
	if len(conf.Conf.NsqConf.NodeId) == 0 {
		return errors.New("no NodeId")
	}
	if err := initProducer(); err != nil {
		return err
	}

	// Each entry is used as a format template that is expanded with the
	// configured NodeId to produce the concrete topic name.
	topics := []string{
		constvar.NsqTopicScheduleTask,
		constvar.NsqTopicSendPlcAddress,
		constvar.NsqTopicProcessParamsResponse,
		constvar.NsqTopicApsProcessParams,
		constvar.NsqTopicDeviceUpdate,
		constvar.NsqTopicPullDataResponse,
	}
	for _, t := range topics {
		topic := fmt.Sprintf(t, conf.Conf.NsqConf.NodeId)
		c.AddConsumer(topic)
	}
	return nil
}

// AddConsumer starts a consumer for topic in a background goroutine (via
// safe.Go) and registers it in c.clients. The call returns immediately;
// creation and run errors are logged rather than returned.
//
// The consumer connects through nsqlookupd when NsqlookupdAddr is configured
// and directly to NsqdAddr otherwise.
func (c *consumerManager) AddConsumer(topic string) {
	safe.Go(func() {
		// NOTE(review): the second argument is presumably the NSQ channel
		// name — confirm against NewConsumer.
		client, err := NewConsumer(topic, conf.Conf.System.DeviceId)
		if err != nil {
			logx.Errorf("start nsq consume err: %v", err)
			// Bail out: the client is not usable after a failed creation, so
			// it must neither be registered nor run.
			return
		}
		// Register before running so stop() can find the consumer even if
		// the run call below does not return promptly.
		c.clients.Store(topic, client)

		if lookupdAddr := conf.Conf.NsqConf.NsqlookupdAddr; len(lookupdAddr) > 0 {
			if err = client.RunLookupd(lookupdAddr, 1); err != nil {
				logx.Errorf("RunLookupd err:%v", err)
				return
			}
		} else {
			if err = client.Run(conf.Conf.NsqConf.NsqdAddr, 1); err != nil {
				logx.Errorf("Run err:%v", err)
				return
			}
		}
		logx.Infof("add consumer success, topic:%v", topic)
	})
}

// stop walks every registered consumer and destroys it. Entries that are not
// *nsqclient.NsqConsumer values are skipped. Entries are left in c.clients.
func (c *consumerManager) stop() {
	c.clients.Range(func(key, value any) bool {
		if consumer, ok := value.(*nsqclient.NsqConsumer); ok {
			nsqclient.DestroyNsqConsumer(consumer)
			logx.Infof("try stop consumer, topic : %v", key)
		}
		// Always continue so that every consumer is visited.
		return true
	})
}

// Init initializes the default consumer manager: it checks the configuration,
// sets up the producer and starts the consumers for all known topics.
func Init() error {
	return defaultConsumerManager.init()
}

// Stop destroys every consumer registered with the default consumer manager.
func Stop() {
	defaultConsumerManager.stop()
}