fix
zhangqian
2023-12-01 8324f872ef3a4d0c978a9b1d062800c6a1701c12
nsq/nsq.go
@@ -9,12 +9,15 @@
   "context"
   "errors"
   "fmt"
   "github.com/shirou/gopsutil/net"
   "sync"
   "sync/atomic"
)
type consumerManager struct {
   ctx     context.Context
   clients sync.Map
   ctx      context.Context
   clients  sync.Map
   initFlag int32
}
var defaultConsumerManager *consumerManager
@@ -27,14 +30,14 @@
   if len(conf.Conf.NsqConf.NodeId) <= 0 {
      return errors.New("no NodeId")
   }
   if !atomic.CompareAndSwapInt32(&c.initFlag, 0, 1) {
      return nil
   }
   if err := initProducer(); err != nil {
      return err
   }
   var topics = []string{
      constvar.NsqTopicScheduleTask,
      constvar.NsqTopicSendPlcAddress,
      constvar.NsqTopicProcessParamsResponse,
      constvar.NsqTopicApsProcessParams,
      constvar.NsqTopicDeviceUpdate,
      constvar.NsqTopicPullDataResponse,
@@ -53,7 +56,7 @@
         logx.Errorf("start nsq consume err: %v", err)
      }
      c.clients.Store(topic, client)
      logx.Infof("add consumer success, topic:%v", topic)
      if len(conf.Conf.NsqConf.NsqlookupdAddr) > 0 {
         if err = client.RunLookupd(conf.Conf.NsqConf.NsqlookupdAddr, 1); err != nil {
            logx.Errorf("RunLookupd err:%v", err)
@@ -65,24 +68,49 @@
            return
         }
      }
      logx.Infof("add consumer success, topic:%v", topic)
   })
}
func (c *consumerManager) stop() {
   if !atomic.CompareAndSwapInt32(&c.initFlag, 1, 0) {
      return
   }
   c.clients.Range(func(key, value any) bool {
      if consumer, ok := value.(*nsqclient.NsqConsumer); ok {
         nsqclient.DestroyNsqConsumer(consumer)
         logx.Infof("try stop consumer, topic : %v", key)
         consumer = nil
         c.clients.Delete(key)
      }
      return true
   })
}
func (c *consumerManager) ping() bool {
   connections, err := net.Connections("inet")
   if err != nil {
      fmt.Println("Error:", err)
      return false
   }
   for _, conn := range connections {
      ipPort := fmt.Sprintf("%s:%d", conn.Raddr.IP, conn.Raddr.Port)
      fmt.Println("net.Connections ipPort", ipPort)
      if ipPort == conf.Conf.NsqConf.NsqdAddr && conn.Status == "ESTABLISHED" {
         return true
      }
   }
   return false
}
func Init() error {
   return defaultConsumerManager.init()
}
func Ping() bool {
   return defaultConsumerManager.ping()
}
func Stop() {
   defaultConsumerManager.stop()
   StopProducer()
}