package nsqclient
|
|
import (
|
"context"
|
"fmt"
|
|
"kingdee-dbapi/config"
|
)
|
|
var producerCli Producer
|
var consumeCli NsqConsumer
|
|
func InitNsqProducer() error {
|
var err error
|
producerCli, err = NewProducer(config.Options.NsqServer)
|
if err != nil {
|
fmt.Println(err.Error())
|
}
|
|
return err
|
}
|
|
func Produce(topic string, msg []byte) bool {
|
if producerCli == nil {
|
err := InitNsqProducer()
|
if err != nil {
|
fmt.Println("Init Nsq Client error:" + err.Error())
|
return false
|
}
|
}
|
|
err := producerCli.Publish(topic, msg)
|
if err != nil {
|
fmt.Println("Publish error:" + err.Error())
|
}
|
|
return err == nil
|
}
|
|
func InitNsqConsumer(topic, channel string, handle func(data []byte) error) {
|
if c, err := NewNsqConsumer(context.Background(), topic, channel); err != nil {
|
fmt.Println("NewNsqConsumer failed", err)
|
return
|
} else {
|
c.AddHandler(handle)
|
if err := c.Run(config.Options.NsqServer, 1); err != nil {
|
fmt.Println("run consumer failed", err)
|
}
|
|
}
|
}
|