zhangzengfei
2023-06-30 aed40b946f0e5de5aa82d4be47595905065770b0
nsqclient/client.go
@@ -1,35 +1,50 @@
package nsqclient
import (
   "fmt"
   "context"
   "kingdee-dbapi/config"
   "kingdee-dbapi/logger"
)
var nsqClient Producer
var producerCli Producer
const plcTopic = "plcTopic"
func InitNsqClient() error {
func InitNsqProducer() error {
   var err error
   nsqClient, err = NewProducer(config.Options.NsqServer)
   producerCli, err = NewProducer(config.Options.NsqServer)
   if err != nil {
      fmt.Println(err.Error())
      logger.Error("创建nsq生产客户端失败, %s", err.Error())
   }
   return err
}
func Produce(msg []byte) (err error) {
   if nsqClient == nil {
      err = InitNsqClient()
func Produce(topic string, msg []byte) bool {
   if producerCli == nil {
      err := InitNsqProducer()
      if err != nil {
         return err
         logger.Error("创建nsq生产客户端失败, %s", err.Error())
         return false
      }
   }
   if err = nsqClient.Publish(plcTopic, msg); err != nil {
      fmt.Println("Publish error:" + err.Error())
   err := producerCli.Publish(topic, msg)
   if err != nil {
      logger.Error("nsp发布消息失败,主题:%s, %s", topic, err.Error())
   }
   return
   return err == nil
}
func InitNsqConsumer(topic, channel string, handle func(data []byte) error) {
   if c, err := NewNsqConsumer(context.Background(), topic, channel); err != nil {
      logger.Error("创建nsq消费客户端失败, %s", err.Error())
      return
   } else {
      c.AddHandler(handle)
      if err := c.Run(config.Options.NsqServer, 1); err != nil {
         logger.Error("运行nsq消费客户端失败, %s", err.Error())
      }
   }
}