gigibox
2023-06-19 942f3416b333304bde50f0dca5581595f397eafa
nsqclient/client.go
@@ -1,17 +1,18 @@
package nsqclient
import (
   "context"
   "fmt"
   "kingdee-dbapi/config"
)
var nsqClient Producer
var producerCli Producer
var consumeCli NsqConsumer
const plcTopic = "plcTopic"
func InitNsqClient() error {
func InitNsqProducer() error {
   var err error
   nsqClient, err = NewProducer(config.Options.NsqServer)
   producerCli, err = NewProducer(config.Options.NsqServer)
   if err != nil {
      fmt.Println(err.Error())
   }
@@ -19,17 +20,32 @@
   return err
}
func Produce(msg []byte) (err error) {
   if nsqClient == nil {
      err = InitNsqClient()
func Produce(topic string, msg []byte) bool {
   if producerCli == nil {
      err := InitNsqProducer()
      if err != nil {
         return err
         fmt.Println("Init Nsq Client error:" + err.Error())
         return false
      }
   }
   if err = nsqClient.Publish(plcTopic, msg); err != nil {
   err := producerCli.Publish(topic, msg)
   if err != nil {
      fmt.Println("Publish error:" + err.Error())
   }
   return
   return err == nil
}
func InitNsqConsumer(topic, channel string, handle func(data []byte) error) {
   if c, err := NewNsqConsumer(context.Background(), topic, channel); err != nil {
      fmt.Println("NewNsqConsumer failed", err)
      return
   } else {
      c.AddHandler(handle)
      if err := c.Run(config.Options.NsqServer, 1); err != nil {
         fmt.Println("run consumer failed", err)
      }
   }
}