| | |
| | | |
| | | import ( |
| | | "context" |
| | | "fmt" |
| | | "kingdee-dbapi/logger" |
| | | |
| | | "kingdee-dbapi/config" |
| | | ) |
| | |
| | | var err error |
| | | producerCli, err = NewProducer(config.Options.NsqServer) |
| | | if err != nil { |
| | | fmt.Println(err.Error()) |
| | | logger.Error("创建nsq生产客户端失败, %s", err.Error()) |
| | | } |
| | | |
| | | return err |
| | |
| | | if producerCli == nil { |
| | | err := InitNsqProducer() |
| | | if err != nil { |
| | | fmt.Println("Init Nsq Client error:" + err.Error()) |
| | | logger.Error("创建nsq生产客户端失败, %s", err.Error()) |
| | | return false |
| | | } |
| | | } |
| | | |
| | | err := producerCli.Publish(topic, msg) |
| | | if err != nil { |
| | | fmt.Println("Publish error:" + err.Error()) |
| | | logger.Error("nsp发布消息失败,主题:%s, %s", topic, err.Error()) |
| | | } |
| | | |
| | | return err == nil |
| | |
| | | |
| | | func InitNsqConsumer(topic, channel string, handle func(data []byte) error) { |
| | | if c, err := NewNsqConsumer(context.Background(), topic, channel); err != nil { |
| | | fmt.Println("NewNsqConsumer failed", err) |
| | | logger.Error("创建nsq消费客户端失败, %s", err.Error()) |
| | | return |
| | | } else { |
| | | c.AddHandler(handle) |
| | | if err := c.Run(config.Options.NsqServer, 1); err != nil { |
| | | fmt.Println("run consumer failed", err) |
| | | logger.Error("运行nsq消费客户端失败, %s", err.Error()) |
| | | } |
| | | |
| | | } |