From aed40b946f0e5de5aa82d4be47595905065770b0 Mon Sep 17 00:00:00 2001 From: zhangzengfei <zhangzengfei@smartai.com> Date: 星期五, 30 六月 2023 15:52:29 +0800 Subject: [PATCH] 调整代码 --- nsqclient/client.go | 43 +++++++++++++++++++++++++++++-------------- 1 files changed, 29 insertions(+), 14 deletions(-) diff --git a/nsqclient/client.go b/nsqclient/client.go index 1b27d18..4bb7944 100644 --- a/nsqclient/client.go +++ b/nsqclient/client.go @@ -1,35 +1,50 @@ package nsqclient import ( - "fmt" + "context" + "kingdee-dbapi/config" + "kingdee-dbapi/logger" ) -var nsqClient Producer +var producerCli Producer -const plcTopic = "plcTopic" - -func InitNsqClient() error { +func InitNsqProducer() error { var err error - nsqClient, err = NewProducer(config.Options.NsqServer) + producerCli, err = NewProducer(config.Options.NsqServer) if err != nil { - fmt.Println(err.Error()) + logger.Error("鍒涘缓nsq鐢熶骇瀹㈡埛绔け璐�, %s", err.Error()) } return err } -func Produce(msg []byte) (err error) { - if nsqClient == nil { - err = InitNsqClient() +func Produce(topic string, msg []byte) bool { + if producerCli == nil { + err := InitNsqProducer() if err != nil { - return err + logger.Error("鍒涘缓nsq鐢熶骇瀹㈡埛绔け璐�, %s", err.Error()) + return false } } - if err = nsqClient.Publish(plcTopic, msg); err != nil { - fmt.Println("Publish error:" + err.Error()) + err := producerCli.Publish(topic, msg) + if err != nil { + logger.Error("nsp鍙戝竷娑堟伅澶辫触,涓婚:%s, %s", topic, err.Error()) } - return + return err == nil +} + +func InitNsqConsumer(topic, channel string, handle func(data []byte) error) { + if c, err := NewNsqConsumer(context.Background(), topic, channel); err != nil { + logger.Error("鍒涘缓nsq娑堣垂瀹㈡埛绔け璐�, %s", err.Error()) + return + } else { + c.AddHandler(handle) + if err := c.Run(config.Options.NsqServer, 1); err != nil { + logger.Error("杩愯nsq娑堣垂瀹㈡埛绔け璐�, %s", err.Error()) + } + + } } -- Gitblit v1.8.0