From 4244acad88afc44ae8a34a8fbd9e296780a0cc64 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期二, 16 五月 2023 16:00:36 +0800
Subject: [PATCH] remove github.com nsq depend

---
 nsqclient/consumer.go |   17 +++++++++++++----
 TST/test/test.go      |   12 +++++-------
 main.go               |   16 +++++++---------
 TST/ctest/ctest.cpp   |    6 ++----
 4 files changed, 27 insertions(+), 24 deletions(-)

diff --git a/TST/ctest/ctest.cpp b/TST/ctest/ctest.cpp
index 69316ee..73be301 100644
--- a/TST/ctest/ctest.cpp
+++ b/TST/ctest/ctest.cpp
@@ -82,12 +82,10 @@
 int main(int argc, char const *argv[])
 {
     thread([]{
-        produce(false);
+        produce(true);
     }).detach();
 
-    // thread([]{
-    //     consume("test2", "sensor01");
-    // }).detach();
+    thread([]{ consume("test2", "sensor01"); }).detach();
 
     consume("test", "sensor01");
 
diff --git a/TST/test/test.go b/TST/test/test.go
index 62d1fae..81e9305 100644
--- a/TST/test/test.go
+++ b/TST/test/test.go
@@ -6,8 +6,6 @@
 	"log"
 	"nsqCli/nsqclient"
 	"time"
-
-	"github.com/nsqio/go-nsq"
 )
 
 func produce(two bool) {
@@ -49,14 +47,14 @@
 		ch := make(chan struct{})
 
 		count := 0
-		c.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error {
+		c.AddHandler(func(data []byte) error {
 			count++
-			fmt.Println("recv msg ", string(msg.Body), " size", count)
+			fmt.Println("recv msg ", string(data), " size", count)
 			if count > 999000 {
 				ch <- struct{}{}
 			}
 			return nil
-		}))
+		})
 		// go c.Run("192.168.20.108:4150", 2)
 		go c.RunLookupd("192.168.20.108:4161", 2)
 
@@ -69,8 +67,8 @@
 }
 
 func Test() {
-	go produce(false)
+	go produce(true)
 
-	// go consume("test2", "sensor01")
+	go consume("test2", "sensor01")
 	consume("test", "sensor01")
 }
diff --git a/main.go b/main.go
index ededf6a..24250f3 100644
--- a/main.go
+++ b/main.go
@@ -10,8 +10,6 @@
 	"sync"
 	"time"
 	"unsafe"
-
-	"github.com/nsqio/go-nsq"
 )
 
 //export createProducer
@@ -62,7 +60,7 @@
 type consumer struct {
 	nsqcon *nsqclient.NsqConsumer
 	lck    sync.Mutex
-	msgs   []*nsq.Message
+	msgs   [][]byte
 }
 
 //export createConsumer
@@ -89,12 +87,12 @@
 //export Run
 func Run(ch unsafe.Pointer, addr string) {
 	c := ccvt(ch)
-	c.nsqcon.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error {
+	c.nsqcon.AddHandler(func(msg []byte) error {
 		c.lck.Lock()
 		defer c.lck.Unlock()
 		c.msgs = append(c.msgs, msg)
 		return nil
-	}))
+	})
 
 	c.nsqcon.Run(addr, 1)
 }
@@ -102,12 +100,12 @@
 //export RunLookupd
 func RunLookupd(ch unsafe.Pointer, lookAddr string) {
 	c := ccvt(ch)
-	c.nsqcon.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error {
+	c.nsqcon.AddHandler(func(msg []byte) error {
 		c.lck.Lock()
 		defer c.lck.Unlock()
 		c.msgs = append(c.msgs, msg)
 		return nil
-	}))
+	})
 
 	c.nsqcon.RunLookupd(lookAddr, 1)
 }
@@ -124,9 +122,9 @@
 	msg := c.msgs[0]
 	c.msgs = c.msgs[1:]
 
-	*size = C.size_t(len(msg.Body))
+	*size = C.size_t(len(msg))
 	ptr := C.malloc(*size)
-	C.memcpy(ptr, unsafe.Pointer(&msg.Body[0]), *size)
+	C.memcpy(ptr, unsafe.Pointer(&msg[0]), *size)
 	*data = ptr
 
 	return true
diff --git a/nsqclient/consumer.go b/nsqclient/consumer.go
index 152989d..a0df0b0 100644
--- a/nsqclient/consumer.go
+++ b/nsqclient/consumer.go
@@ -9,8 +9,9 @@
 )
 
 type NsqConsumer struct {
-	consumer  *nsq.Consumer
-	handler   nsq.Handler
+	consumer *nsq.Consumer
+	// handler   nsq.Handler
+	handler   func([]byte) error
 	ctx       context.Context
 	ctxCancel context.CancelFunc
 	topic     string
@@ -46,7 +47,11 @@
 	}
 }
 
-func (n *NsqConsumer) AddHandler(handler nsq.Handler) {
+// func (n *NsqConsumer) AddHandler(handler nsq.Handler) {
+// 	n.handler = handler
+// }
+
+func (n *NsqConsumer) AddHandler(handler func([]byte) error) {
 	n.handler = handler
 }
 
@@ -60,7 +65,11 @@
 
 func (n *NsqConsumer) RunDistributed(qAddr, lAddr []string, concurrency int) error {
 	n.consumer.ChangeMaxInFlight(concurrency)
-	n.consumer.AddConcurrentHandlers(n.handler, concurrency)
+	// n.consumer.AddConcurrentHandlers(n.handler, concurrency)
+	n.consumer.AddConcurrentHandlers(nsq.HandlerFunc(func(msg *nsq.Message) error {
+		return n.handler(msg.Body)
+		// return nil
+	}), concurrency)
 
 	var err error
 	if len(qAddr) > 0 {

--
Gitblit v1.8.0