| | |
| | | "context" |
| | | "errors" |
| | | "fmt" |
| | | "github.com/shirou/gopsutil/net" |
| | | "sync" |
| | | "sync/atomic" |
| | | ) |
| | | |
// consumerManager holds the state shared by the consumer lifecycle methods
// (init, stop, ping) in this file.
type consumerManager struct {
	// ctx is not read by any method visible in this file.
	// NOTE(review): presumably the lifetime context handed to consumers — confirm.
	ctx context.Context
	// clients stores the registered consumers. stop ranges over it, treats
	// each value as a *nsqclient.NsqConsumer and logs the key as the topic.
	clients sync.Map
	// initFlag is 0 while the manager is stopped and 1 once init has claimed
	// it; it is only changed through atomic compare-and-swap.
	initFlag int32
}
| | | |
| | | var defaultConsumerManager *consumerManager |
| | |
| | | if len(conf.Conf.NsqConf.NodeId) <= 0 { |
| | | return errors.New("no NodeId") |
| | | } |
| | | if !atomic.CompareAndSwapInt32(&c.initFlag, 0, 1) { |
| | | return nil |
| | | } |
| | | if err := initProducer(); err != nil { |
| | | return err |
| | | } |
| | | |
| | | var topics = []string{ |
| | | constvar.NsqTopicScheduleTask, |
| | | constvar.NsqTopicSendPlcAddress, |
| | | constvar.NsqTopicApsProcessParams, |
| | | constvar.NsqTopicDeviceUpdate, |
| | | constvar.NsqTopicPullDataResponse, |
| | |
| | | } |
| | | |
| | | func (c *consumerManager) stop() { |
| | | if !atomic.CompareAndSwapInt32(&c.initFlag, 1, 0) { |
| | | return |
| | | } |
| | | c.clients.Range(func(key, value any) bool { |
| | | if consumer, ok := value.(*nsqclient.NsqConsumer); ok { |
| | | nsqclient.DestroyNsqConsumer(consumer) |
| | | logx.Infof("try stop consumer, topic : %v", key) |
| | | consumer = nil |
| | | c.clients.Delete(key) |
| | | } |
| | | |
| | | return true |
| | | }) |
| | | } |
| | | |
| | | func (c *consumerManager) ping() bool { |
| | | connections, err := net.Connections("inet") |
| | | if err != nil { |
| | | fmt.Println("Error:", err) |
| | | return false |
| | | } |
| | | for _, conn := range connections { |
| | | ipPort := fmt.Sprintf("%s:%d", conn.Raddr.IP, conn.Raddr.Port) |
| | | fmt.Println("net.Connections ipPort", ipPort) |
| | | if ipPort == conf.Conf.NsqConf.NsqdAddr && conn.Status == "ESTABLISHED" { |
| | | return true |
| | | } |
| | | } |
| | | return false |
| | | } |
| | | |
| | | func Init() error { |
| | | return defaultConsumerManager.init() |
| | | } |
| | | |
| | | func Ping() bool { |
| | | return defaultConsumerManager.ping() |
| | | } |
| | | |
| | | func Stop() { |
| | | defaultConsumerManager.stop() |
| | | StopProducer() |
| | | } |