From 4244acad88afc44ae8a34a8fbd9e296780a0cc64 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期二, 16 五月 2023 16:00:36 +0800 Subject: [PATCH] remove github.com nsq depend --- nsqclient/consumer.go | 17 +++++++++++++---- TST/test/test.go | 12 +++++------- main.go | 16 +++++++--------- TST/ctest/ctest.cpp | 6 ++---- 4 files changed, 27 insertions(+), 24 deletions(-) diff --git a/TST/ctest/ctest.cpp b/TST/ctest/ctest.cpp index 69316ee..73be301 100644 --- a/TST/ctest/ctest.cpp +++ b/TST/ctest/ctest.cpp @@ -82,12 +82,10 @@ int main(int argc, char const *argv[]) { thread([]{ - produce(false); + produce(true); }).detach(); - // thread([]{ - // consume("test2", "sensor01"); - // }).detach(); + thread([]{ consume("test2", "sensor01"); }).detach(); consume("test", "sensor01"); diff --git a/TST/test/test.go b/TST/test/test.go index 62d1fae..81e9305 100644 --- a/TST/test/test.go +++ b/TST/test/test.go @@ -6,8 +6,6 @@ "log" "nsqCli/nsqclient" "time" - - "github.com/nsqio/go-nsq" ) func produce(two bool) { @@ -49,14 +47,14 @@ ch := make(chan struct{}) count := 0 - c.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error { + c.AddHandler(func(data []byte) error { count++ - fmt.Println("recv msg ", string(msg.Body), " size", count) + fmt.Println("recv msg ", string(data), " size", count) if count > 999000 { ch <- struct{}{} } return nil - })) + }) // go c.Run("192.168.20.108:4150", 2) go c.RunLookupd("192.168.20.108:4161", 2) @@ -69,8 +67,8 @@ } func Test() { - go produce(false) + go produce(true) - // go consume("test2", "sensor01") + go consume("test2", "sensor01") consume("test", "sensor01") } diff --git a/main.go b/main.go index ededf6a..24250f3 100644 --- a/main.go +++ b/main.go @@ -10,8 +10,6 @@ "sync" "time" "unsafe" - - "github.com/nsqio/go-nsq" ) //export createProducer @@ -62,7 +60,7 @@ type consumer struct { nsqcon *nsqclient.NsqConsumer lck sync.Mutex - msgs []*nsq.Message + msgs [][]byte } //export createConsumer @@ -89,12 +87,12 @@ //export Run func Run(ch unsafe.Pointer, addr string) { c := ccvt(ch) - c.nsqcon.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error { + c.nsqcon.AddHandler(func(msg []byte) error { c.lck.Lock() defer c.lck.Unlock() c.msgs = append(c.msgs, msg) return nil - })) + }) c.nsqcon.Run(addr, 1) } @@ -102,12 +100,12 @@ //export RunLookupd func RunLookupd(ch unsafe.Pointer, lookAddr string) { c := ccvt(ch) - c.nsqcon.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error { + c.nsqcon.AddHandler(func(msg []byte) error { c.lck.Lock() defer c.lck.Unlock() c.msgs = append(c.msgs, msg) return nil - })) + }) c.nsqcon.RunLookupd(lookAddr, 1) } @@ -124,9 +122,9 @@ msg := c.msgs[0] c.msgs = c.msgs[1:] - *size = C.size_t(len(msg.Body)) + *size = C.size_t(len(msg)) ptr := C.malloc(*size) - C.memcpy(ptr, unsafe.Pointer(&msg.Body[0]), *size) + C.memcpy(ptr, unsafe.Pointer(&msg[0]), *size) *data = ptr return true diff --git a/nsqclient/consumer.go b/nsqclient/consumer.go index 152989d..a0df0b0 100644 --- a/nsqclient/consumer.go +++ b/nsqclient/consumer.go @@ -9,8 +9,9 @@ ) type NsqConsumer struct { - consumer *nsq.Consumer - handler nsq.Handler + consumer *nsq.Consumer + // handler nsq.Handler + handler func([]byte) error ctx context.Context ctxCancel context.CancelFunc topic string @@ -46,7 +47,11 @@ } } -func (n *NsqConsumer) AddHandler(handler nsq.Handler) { +// func (n *NsqConsumer) AddHandler(handler nsq.Handler) { +// n.handler = handler +// } + +func (n *NsqConsumer) AddHandler(handler func([]byte) error) { n.handler = handler } @@ -60,7 +65,11 @@ func (n *NsqConsumer) RunDistributed(qAddr, lAddr []string, concurrency int) error { n.consumer.ChangeMaxInFlight(concurrency) - n.consumer.AddConcurrentHandlers(n.handler, concurrency) + // n.consumer.AddConcurrentHandlers(n.handler, concurrency) + n.consumer.AddConcurrentHandlers(nsq.HandlerFunc(func(msg *nsq.Message) error { + return n.handler(msg.Body) + // return nil + }), concurrency) var err error if len(qAddr) > 0 { -- Gitblit v1.8.0