From 59e6096769bc175516c1bcbbc12e4711d1ff294c Mon Sep 17 00:00:00 2001
From: zhangzengfei <zhangzengfei@smartai.com>
Date: 星期二, 15 八月 2023 18:08:17 +0800
Subject: [PATCH] 调整代码结构
---
nsqclient/client.go | 43 +++++++++++++++++++++++++++++--------------
1 files changed, 29 insertions(+), 14 deletions(-)
diff --git a/nsqclient/client.go b/nsqclient/client.go
index 1b27d18..4bb7944 100644
--- a/nsqclient/client.go
+++ b/nsqclient/client.go
@@ -1,35 +1,50 @@
package nsqclient
import (
- "fmt"
+ "context"
+
"kingdee-dbapi/config"
+ "kingdee-dbapi/logger"
)
-var nsqClient Producer
+var producerCli Producer
-const plcTopic = "plcTopic"
-
-func InitNsqClient() error {
+func InitNsqProducer() error {
var err error
- nsqClient, err = NewProducer(config.Options.NsqServer)
+ producerCli, err = NewProducer(config.Options.NsqServer)
if err != nil {
- fmt.Println(err.Error())
+ logger.Error("鍒涘缓nsq鐢熶骇瀹㈡埛绔け璐�, %s", err.Error())
}
return err
}
-func Produce(msg []byte) (err error) {
- if nsqClient == nil {
- err = InitNsqClient()
+func Produce(topic string, msg []byte) bool {
+ if producerCli == nil {
+ err := InitNsqProducer()
if err != nil {
- return err
+ logger.Error("鍒涘缓nsq鐢熶骇瀹㈡埛绔け璐�, %s", err.Error())
+ return false
}
}
- if err = nsqClient.Publish(plcTopic, msg); err != nil {
- fmt.Println("Publish error:" + err.Error())
+ err := producerCli.Publish(topic, msg)
+ if err != nil {
+ logger.Error("nsp鍙戝竷娑堟伅澶辫触,涓婚:%s, %s", topic, err.Error())
}
- return
+ return err == nil
+}
+
+func InitNsqConsumer(topic, channel string, handle func(data []byte) error) {
+ if c, err := NewNsqConsumer(context.Background(), topic, channel); err != nil {
+ logger.Error("鍒涘缓nsq娑堣垂瀹㈡埛绔け璐�, %s", err.Error())
+ return
+ } else {
+ c.AddHandler(handle)
+ if err := c.Run(config.Options.NsqServer, 1); err != nil {
+ logger.Error("杩愯nsq娑堣垂瀹㈡埛绔け璐�, %s", err.Error())
+ }
+
+ }
}
--
Gitblit v1.8.0