From 942f3416b333304bde50f0dca5581595f397eafa Mon Sep 17 00:00:00 2001 From: gigibox <gigibox@163.com> Date: 星期一, 19 六月 2023 16:27:26 +0800 Subject: [PATCH] 完善功能,添加日志,添加nsq tcp上报 --- nsqclient/client.go | 38 +++++++++++++++++++++++++++----------- 1 files changed, 27 insertions(+), 11 deletions(-) diff --git a/nsqclient/client.go b/nsqclient/client.go index 1b27d18..46f5911 100644 --- a/nsqclient/client.go +++ b/nsqclient/client.go @@ -1,17 +1,18 @@ package nsqclient import ( + "context" "fmt" + "kingdee-dbapi/config" ) -var nsqClient Producer +var producerCli Producer +var consumeCli NsqConsumer -const plcTopic = "plcTopic" - -func InitNsqClient() error { +func InitNsqProducer() error { var err error - nsqClient, err = NewProducer(config.Options.NsqServer) + producerCli, err = NewProducer(config.Options.NsqServer) if err != nil { fmt.Println(err.Error()) } @@ -19,17 +20,32 @@ return err } -func Produce(msg []byte) (err error) { - if nsqClient == nil { - err = InitNsqClient() +func Produce(topic string, msg []byte) bool { + if producerCli == nil { + err := InitNsqProducer() if err != nil { - return err + fmt.Println("Init Nsq Client error:" + err.Error()) + return false } } - if err = nsqClient.Publish(plcTopic, msg); err != nil { + err := producerCli.Publish(topic, msg) + if err != nil { fmt.Println("Publish error:" + err.Error()) } - return + return err == nil +} + +func InitNsqConsumer(topic, channel string, handle func(data []byte) error) { + if c, err := NewNsqConsumer(context.Background(), topic, channel); err != nil { + fmt.Println("NewNsqConsumer failed", err) + return + } else { + c.AddHandler(handle) + if err := c.Run(config.Options.NsqServer, 1); err != nil { + fmt.Println("run consumer failed", err) + } + + } } -- Gitblit v1.8.0