package nsqclient import ( "context" "kingdee-dbapi/config" "kingdee-dbapi/logger" ) var producerCli Producer func InitNsqProducer() error { var err error producerCli, err = NewProducer(config.Options.NsqServer) if err != nil { logger.Error("创建nsq生产客户端失败, %s", err.Error()) } return err } func Produce(topic string, msg []byte) bool { if producerCli == nil { err := InitNsqProducer() if err != nil { logger.Error("创建nsq生产客户端失败, %s", err.Error()) return false } } err := producerCli.Publish(topic, msg) if err != nil { logger.Error("nsp发布消息失败,主题:%s, %s", topic, err.Error()) } return err == nil } func InitNsqConsumer(topic, channel string, handle func(data []byte) error) { if c, err := NewNsqConsumer(context.Background(), topic, channel); err != nil { logger.Error("创建nsq消费客户端失败, %s", err.Error()) return } else { c.AddHandler(handle) if err := c.Run(config.Options.NsqServer, 1); err != nil { logger.Error("运行nsq消费客户端失败, %s", err.Error()) } } }