gigibox
2023-06-20 4983adc4d20798a980025be4cd45c77558c74265
nsqclient/client.go
@@ -2,7 +2,7 @@
import (
   "context"
   "fmt"
   "kingdee-dbapi/logger"
   "kingdee-dbapi/config"
)
@@ -14,7 +14,7 @@
   var err error
   producerCli, err = NewProducer(config.Options.NsqServer)
   if err != nil {
      fmt.Println(err.Error())
      logger.Error("创建nsq生产客户端失败, %s", err.Error())
   }
   return err
@@ -24,14 +24,14 @@
   if producerCli == nil {
      err := InitNsqProducer()
      if err != nil {
         fmt.Println("Init Nsq Client error:" + err.Error())
         logger.Error("创建nsq生产客户端失败, %s", err.Error())
         return false
      }
   }
   err := producerCli.Publish(topic, msg)
   if err != nil {
      fmt.Println("Publish error:" + err.Error())
      logger.Error("nsp发布消息失败,主题:%s, %s", topic, err.Error())
   }
   return err == nil
@@ -39,12 +39,12 @@
func InitNsqConsumer(topic, channel string, handle func(data []byte) error) {
   if c, err := NewNsqConsumer(context.Background(), topic, channel); err != nil {
      fmt.Println("NewNsqConsumer failed", err)
      logger.Error("创建nsq消费客户端失败, %s", err.Error())
      return
   } else {
      c.AddHandler(handle)
      if err := c.Run(config.Options.NsqServer, 1); err != nil {
         fmt.Println("run consumer failed", err)
         logger.Error("运行nsq消费客户端失败, %s", err.Error())
      }
   }