From aed40b946f0e5de5aa82d4be47595905065770b0 Mon Sep 17 00:00:00 2001
From: zhangzengfei <zhangzengfei@smartai.com>
Date: 星期五, 30 六月 2023 15:52:29 +0800
Subject: [PATCH] 调整代码

---
 nsqclient/client.go |   43 +++++++++++++++++++++++++++++--------------
 1 files changed, 29 insertions(+), 14 deletions(-)

diff --git a/nsqclient/client.go b/nsqclient/client.go
index 1b27d18..4bb7944 100644
--- a/nsqclient/client.go
+++ b/nsqclient/client.go
@@ -1,35 +1,50 @@
 package nsqclient
 
 import (
-	"fmt"
+	"context"
+
 	"kingdee-dbapi/config"
+	"kingdee-dbapi/logger"
 )
 
-var nsqClient Producer
+var producerCli Producer
 
-const plcTopic = "plcTopic"
-
-func InitNsqClient() error {
+func InitNsqProducer() error {
 	var err error
-	nsqClient, err = NewProducer(config.Options.NsqServer)
+	producerCli, err = NewProducer(config.Options.NsqServer)
 	if err != nil {
-		fmt.Println(err.Error())
+		logger.Error("鍒涘缓nsq鐢熶骇瀹㈡埛绔け璐�, %s", err.Error())
 	}
 
 	return err
 }
 
-func Produce(msg []byte) (err error) {
-	if nsqClient == nil {
-		err = InitNsqClient()
+func Produce(topic string, msg []byte) bool {
+	if producerCli == nil {
+		err := InitNsqProducer()
 		if err != nil {
-			return err
+			logger.Error("鍒涘缓nsq鐢熶骇瀹㈡埛绔け璐�, %s", err.Error())
+			return false
 		}
 	}
 
-	if err = nsqClient.Publish(plcTopic, msg); err != nil {
-		fmt.Println("Publish error:" + err.Error())
+	err := producerCli.Publish(topic, msg)
+	if err != nil {
+		logger.Error("nsp鍙戝竷娑堟伅澶辫触,涓婚:%s, %s", topic, err.Error())
 	}
 
-	return
+	return err == nil
+}
+
+func InitNsqConsumer(topic, channel string, handle func(data []byte) error) {
+	if c, err := NewNsqConsumer(context.Background(), topic, channel); err != nil {
+		logger.Error("鍒涘缓nsq娑堣垂瀹㈡埛绔け璐�, %s", err.Error())
+		return
+	} else {
+		c.AddHandler(handle)
+		if err := c.Run(config.Options.NsqServer, 1); err != nil {
+			logger.Error("杩愯nsq娑堣垂瀹㈡埛绔け璐�, %s", err.Error())
+		}
+
+	}
 }

--
Gitblit v1.8.0