| | |
| | | package nsqclient |
| | | |
| | | import ( |
| | | "context" |
| | | "fmt" |
| | | |
| | | "kingdee-dbapi/config" |
| | | ) |
| | | |
| | | var nsqClient Producer |
| | | var producerCli Producer |
| | | var consumeCli NsqConsumer |
| | | |
| | | const plcTopic = "plcTopic" |
| | | |
| | | func InitNsqClient() error { |
| | | func InitNsqProducer() error { |
| | | var err error |
| | | nsqClient, err = NewProducer(config.Options.NsqServer) |
| | | producerCli, err = NewProducer(config.Options.NsqServer) |
| | | if err != nil { |
| | | fmt.Println(err.Error()) |
| | | } |
| | |
| | | return err |
| | | } |
| | | |
| | | func Produce(msg []byte) (err error) { |
| | | if nsqClient == nil { |
| | | err = InitNsqClient() |
| | | func Produce(topic string, msg []byte) bool { |
| | | if producerCli == nil { |
| | | err := InitNsqProducer() |
| | | if err != nil { |
| | | return err |
| | | fmt.Println("Init Nsq Client error:" + err.Error()) |
| | | return false |
| | | } |
| | | } |
| | | |
| | | if err = nsqClient.Publish(plcTopic, msg); err != nil { |
| | | err := producerCli.Publish(topic, msg) |
| | | if err != nil { |
| | | fmt.Println("Publish error:" + err.Error()) |
| | | } |
| | | |
| | | return |
| | | return err == nil |
| | | } |
| | | |
| | | func InitNsqConsumer(topic, channel string, handle func(data []byte) error) { |
| | | if c, err := NewNsqConsumer(context.Background(), topic, channel); err != nil { |
| | | fmt.Println("NewNsqConsumer failed", err) |
| | | return |
| | | } else { |
| | | c.AddHandler(handle) |
| | | if err := c.Run(config.Options.NsqServer, 1); err != nil { |
| | | fmt.Println("run consumer failed", err) |
| | | } |
| | | |
| | | } |
| | | } |