From 4244acad88afc44ae8a34a8fbd9e296780a0cc64 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期二, 16 五月 2023 16:00:36 +0800
Subject: [PATCH] remove github.com nsq depend
---
nsqclient/consumer.go | 17 +++++++++++++----
TST/test/test.go | 12 +++++-------
main.go | 16 +++++++---------
TST/ctest/ctest.cpp | 6 ++----
4 files changed, 27 insertions(+), 24 deletions(-)
diff --git a/TST/ctest/ctest.cpp b/TST/ctest/ctest.cpp
index 69316ee..73be301 100644
--- a/TST/ctest/ctest.cpp
+++ b/TST/ctest/ctest.cpp
@@ -82,12 +82,10 @@
int main(int argc, char const *argv[])
{
thread([]{
- produce(false);
+ produce(true);
}).detach();
- // thread([]{
- // consume("test2", "sensor01");
- // }).detach();
+ thread([]{ consume("test2", "sensor01"); }).detach();
consume("test", "sensor01");
diff --git a/TST/test/test.go b/TST/test/test.go
index 62d1fae..81e9305 100644
--- a/TST/test/test.go
+++ b/TST/test/test.go
@@ -6,8 +6,6 @@
"log"
"nsqCli/nsqclient"
"time"
-
- "github.com/nsqio/go-nsq"
)
func produce(two bool) {
@@ -49,14 +47,14 @@
ch := make(chan struct{})
count := 0
- c.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error {
+ c.AddHandler(func(data []byte) error {
count++
- fmt.Println("recv msg ", string(msg.Body), " size", count)
+ fmt.Println("recv msg ", string(data), " size", count)
if count > 999000 {
ch <- struct{}{}
}
return nil
- }))
+ })
// go c.Run("192.168.20.108:4150", 2)
go c.RunLookupd("192.168.20.108:4161", 2)
@@ -69,8 +67,8 @@
}
func Test() {
- go produce(false)
+ go produce(true)
- // go consume("test2", "sensor01")
+ go consume("test2", "sensor01")
consume("test", "sensor01")
}
diff --git a/main.go b/main.go
index ededf6a..24250f3 100644
--- a/main.go
+++ b/main.go
@@ -10,8 +10,6 @@
"sync"
"time"
"unsafe"
-
- "github.com/nsqio/go-nsq"
)
//export createProducer
@@ -62,7 +60,7 @@
type consumer struct {
nsqcon *nsqclient.NsqConsumer
lck sync.Mutex
- msgs []*nsq.Message
+ msgs [][]byte
}
//export createConsumer
@@ -89,12 +87,12 @@
//export Run
func Run(ch unsafe.Pointer, addr string) {
c := ccvt(ch)
- c.nsqcon.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error {
+ c.nsqcon.AddHandler(func(msg []byte) error {
c.lck.Lock()
defer c.lck.Unlock()
c.msgs = append(c.msgs, msg)
return nil
- }))
+ })
c.nsqcon.Run(addr, 1)
}
@@ -102,12 +100,12 @@
//export RunLookupd
func RunLookupd(ch unsafe.Pointer, lookAddr string) {
c := ccvt(ch)
- c.nsqcon.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error {
+ c.nsqcon.AddHandler(func(msg []byte) error {
c.lck.Lock()
defer c.lck.Unlock()
c.msgs = append(c.msgs, msg)
return nil
- }))
+ })
c.nsqcon.RunLookupd(lookAddr, 1)
}
@@ -124,9 +122,9 @@
msg := c.msgs[0]
c.msgs = c.msgs[1:]
- *size = C.size_t(len(msg.Body))
+ *size = C.size_t(len(msg))
ptr := C.malloc(*size)
- C.memcpy(ptr, unsafe.Pointer(&msg.Body[0]), *size)
+ C.memcpy(ptr, unsafe.Pointer(&msg[0]), *size)
*data = ptr
return true
diff --git a/nsqclient/consumer.go b/nsqclient/consumer.go
index 152989d..a0df0b0 100644
--- a/nsqclient/consumer.go
+++ b/nsqclient/consumer.go
@@ -9,8 +9,9 @@
)
type NsqConsumer struct {
- consumer *nsq.Consumer
- handler nsq.Handler
+ consumer *nsq.Consumer
+ // handler nsq.Handler
+ handler func([]byte) error
ctx context.Context
ctxCancel context.CancelFunc
topic string
@@ -46,7 +47,11 @@
}
}
-func (n *NsqConsumer) AddHandler(handler nsq.Handler) {
+// func (n *NsqConsumer) AddHandler(handler nsq.Handler) {
+// n.handler = handler
+// }
+
+func (n *NsqConsumer) AddHandler(handler func([]byte) error) {
n.handler = handler
}
@@ -60,7 +65,11 @@
func (n *NsqConsumer) RunDistributed(qAddr, lAddr []string, concurrency int) error {
n.consumer.ChangeMaxInFlight(concurrency)
- n.consumer.AddConcurrentHandlers(n.handler, concurrency)
+ // n.consumer.AddConcurrentHandlers(n.handler, concurrency)
+ n.consumer.AddConcurrentHandlers(nsq.HandlerFunc(func(msg *nsq.Message) error {
+ return n.handler(msg.Body)
+ // return nil
+ }), concurrency)
var err error
if len(qAddr) > 0 {
--
Gitblit v1.8.0