From 942f3416b333304bde50f0dca5581595f397eafa Mon Sep 17 00:00:00 2001
From: gigibox <gigibox@163.com>
Date: 星期一, 19 六月 2023 16:27:26 +0800
Subject: [PATCH] 完善功能,添加日志,添加nsq tcp上报

---
 nsqclient/client.go |   38 +++++++++++++++++++++++++++-----------
 1 files changed, 27 insertions(+), 11 deletions(-)

diff --git a/nsqclient/client.go b/nsqclient/client.go
index 1b27d18..46f5911 100644
--- a/nsqclient/client.go
+++ b/nsqclient/client.go
@@ -1,17 +1,18 @@
 package nsqclient
 
 import (
+	"context"
 	"fmt"
+
 	"kingdee-dbapi/config"
 )
 
-var nsqClient Producer
+var producerCli Producer
+var consumeCli NsqConsumer
 
-const plcTopic = "plcTopic"
-
-func InitNsqClient() error {
+func InitNsqProducer() error {
 	var err error
-	nsqClient, err = NewProducer(config.Options.NsqServer)
+	producerCli, err = NewProducer(config.Options.NsqServer)
 	if err != nil {
 		fmt.Println(err.Error())
 	}
@@ -19,17 +20,32 @@
 	return err
 }
 
-func Produce(msg []byte) (err error) {
-	if nsqClient == nil {
-		err = InitNsqClient()
+func Produce(topic string, msg []byte) bool {
+	if producerCli == nil {
+		err := InitNsqProducer()
 		if err != nil {
-			return err
+			fmt.Println("Init Nsq Client error:" + err.Error())
+			return false
 		}
 	}
 
-	if err = nsqClient.Publish(plcTopic, msg); err != nil {
+	err := producerCli.Publish(topic, msg)
+	if err != nil {
 		fmt.Println("Publish error:" + err.Error())
 	}
 
-	return
+	return err == nil
+}
+
+func InitNsqConsumer(topic, channel string, handle func(data []byte) error) {
+	if c, err := NewNsqConsumer(context.Background(), topic, channel); err != nil {
+		fmt.Println("NewNsqConsumer failed", err)
+		return
+	} else {
+		c.AddHandler(handle)
+		if err := c.Run(config.Options.NsqServer, 1); err != nil {
+			fmt.Println("run consumer failed", err)
+		}
+
+	}
 }

--
Gitblit v1.8.0