package nsq import ( "apsClient/conf" "apsClient/constvar" "apsClient/model/common" "apsClient/pkg/logx" "apsClient/pkg/safe" "basic.com/aps/nsqclient.git" "context" "errors" "fmt" "sync" "time" ) type consumerManager struct { ctx context.Context clients sync.Map } var defaultConsumerManager *consumerManager func init() { defaultConsumerManager = new(consumerManager) } func (c *consumerManager) init() error { if len(conf.Conf.NsqConf.NodeId) <= 0 { return errors.New("no NodeId") } if err := initProducer(); err != nil { return err } safe.Go(func() { caller := NewCaller(fmt.Sprintf(constvar.NsqTopicGetPlcAddress, conf.Conf.NsqConf.NodeId), fmt.Sprintf(constvar.NsqTopicSendPlcAddress, conf.Conf.NsqConf.NodeId)) var addressResult common.ResponsePlcAddress err := caller.Call(common.RequestPlcAddress{DeviceId: conf.Conf.System.DeviceId}, &addressResult, time.Second*3) if err != nil { logx.Infof("get plc address err: %v", err.Error()) } }) var topics = []string{ constvar.NsqTopicScheduleTask, constvar.NsqTopicSendPlcAddress, constvar.NsqTopicProcessParamsResponse, constvar.NsqTopicApsProcessParams, constvar.NsqTopicDeviceUpdate, } for _, t := range topics { topic := fmt.Sprintf(t, conf.Conf.NsqConf.NodeId) c.AddConsumer(topic) } return nil } func (c *consumerManager) AddConsumer(topic string) { client, err := NewConsumer(topic, conf.Conf.System.DeviceId) if err != nil { logx.Errorf("start nsq consume err: %v", err) } c.clients.Store(topic, client) safe.Go(func() { if len(conf.Conf.NsqConf.NsqlookupdAddr) > 0 { if err = client.RunLookupd(conf.Conf.NsqConf.NsqlookupdAddr, 1); err != nil { logx.Errorf("RunLookupd err:%v", err) return } } else { if err = client.Run(conf.Conf.NsqConf.NsqdAddr, 1); err != nil { logx.Errorf("Run err:%v", err) return } } }) } func (c *consumerManager) stop() { c.clients.Range(func(key, value any) bool { if consumer, ok := value.(*nsqclient.NsqConsumer); ok { nsqclient.DestroyNsqConsumer(consumer) } return true }) } func Init() error { return defaultConsumerManager.init() } func Stop() { defaultConsumerManager.stop() }