package nsq import ( "apsClient/conf" "apsClient/constvar" "apsClient/pkg/logx" "apsClient/pkg/nsqclient" "apsClient/pkg/safe" "context" "errors" "fmt" "github.com/shirou/gopsutil/net" "sync" "sync/atomic" ) type consumerManager struct { ctx context.Context clients sync.Map initFlag int32 } var defaultConsumerManager *consumerManager func init() { defaultConsumerManager = new(consumerManager) } func (c *consumerManager) init() error { if len(conf.Conf.NsqConf.NodeId) <= 0 { return errors.New("no NodeId") } if !atomic.CompareAndSwapInt32(&c.initFlag, 0, 1) { return nil } if err := initProducer(); err != nil { return err } var topics = []string{ constvar.NsqTopicScheduleTask, constvar.NsqTopicApsProcessParams, constvar.NsqTopicDeviceUpdate, constvar.NsqTopicPullDataResponse, } for _, t := range topics { topic := fmt.Sprintf(t, conf.Conf.NsqConf.NodeId) c.AddConsumer(topic) } return nil } func (c *consumerManager) AddConsumer(topic string) { safe.Go(func() { client, err := NewConsumer(topic, conf.Conf.System.DeviceId) if err != nil { logx.Errorf("start nsq consume err: %v", err) } c.clients.Store(topic, client) logx.Infof("add consumer success, topic:%v", topic) if len(conf.Conf.NsqConf.NsqlookupdAddr) > 0 { if err = client.RunLookupd(conf.Conf.NsqConf.NsqlookupdAddr, 1); err != nil { logx.Errorf("RunLookupd err:%v", err) return } } else { if err = client.Run(conf.Conf.NsqConf.NsqdAddr, 1); err != nil { logx.Errorf("Run err:%v", err) return } } }) } func (c *consumerManager) stop() { if !atomic.CompareAndSwapInt32(&c.initFlag, 1, 0) { return } c.clients.Range(func(key, value any) bool { if consumer, ok := value.(*nsqclient.NsqConsumer); ok { nsqclient.DestroyNsqConsumer(consumer) logx.Infof("try stop consumer, topic : %v", key) consumer = nil c.clients.Delete(key) } return true }) } func (c *consumerManager) ping() bool { connections, err := net.Connections("inet") if err != nil { fmt.Println("Error:", err) return false } for _, conn := range connections { ipPort := fmt.Sprintf("%s:%d", conn.Raddr.IP, conn.Raddr.Port) fmt.Println("net.Connections ipPort", ipPort) if ipPort == conf.Conf.NsqConf.NsqdAddr && conn.Status == "ESTABLISHED" { return true } } return false } func Init() error { return defaultConsumerManager.init() } func Ping() bool { return defaultConsumerManager.ping() } func Stop() { defaultConsumerManager.stop() StopProducer() }