From ecb63331999a88c4980997aa8473ffed0dd31a3c Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期二, 16 五月 2023 15:42:56 +0800 Subject: [PATCH] nsqclient producer consumer --- nsqclient/channel_test.go | 218 ++++++++++ .gitignore | 3 go.mod | 5 make.sh | 10 nsqclient/LICENSE | 20 nsqclient/conn.go | 45 ++ TST/ctest/ctest.cpp | 95 ++++ nsqclient/channel.go | 134 ++++++ nsqclient/consumer.go | 90 ++++ go.sum | 4 nsqclient/pointer.go | 57 ++ nsqclient/producer.go | 139 ++++++ nsqclient/README.md | 54 ++ nsqclient/pool.go | 25 + TST/test/test.go | 76 +++ main.go | 144 ++++++ clib/libnsqclient.h | 101 ++++ 17 files changed, 1,220 insertions(+), 0 deletions(-) diff --git a/.gitignore b/.gitignore index 8365624..c41f8dc 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,6 @@ *.exe *.test + +nsqCli +cnsqcli diff --git a/TST/ctest/ctest.cpp b/TST/ctest/ctest.cpp new file mode 100644 index 0000000..69316ee --- /dev/null +++ b/TST/ctest/ctest.cpp @@ -0,0 +1,95 @@ +#include <stdio.h> +#include <string.h> +#include "clib/libnsqclient.h" + +#include <string> +#include <thread> +#include <mutex> + +using namespace std; + +static void produce(int two){ + char ip[] = "192.168.20.108:4150"; + GoString addr = {ip, (ptrdiff_t)strlen(ip)}; + void* p = createProducer(addr); + + string msg("cnsqclient dynamic library"); + while(msg.size() < 32){ + msg += msg; + } + // printf("msg %s\n", msg.c_str()); + + for(int i = 0; i < 1000000; i++){ + GoString topic = {"test", 4}; + string amsg = msg + "-x"; + GoSlice data{(void*)amsg.data(), (GoInt)amsg.size(), (GoInt)amsg.size()}; + if (!publish(p, topic, data)){ + printf("publish msg failed topic %s\n", topic.p); + exit(0); + } + + if (two){ + topic.p = "test2"; + topic.n = 5; + amsg = msg + "-y"; + data.data = (void*)amsg.data(); + if (!publish(p, topic, data)){ + printf("publish msg failed topic %s\n", topic.p); + exit(0); + } + } + + } + destroyProducer(p); +} + + +static void consume(const char* topic, const char* channel){ + GoString t = {topic, (ptrdiff_t)strlen(topic)}; + GoString c = {channel, (ptrdiff_t)strlen(channel)}; + + void* con = createConsumer(t, c); + + char ip[] = "192.168.20.108:4150"; + GoString addr = {ip, (ptrdiff_t)strlen(ip)}; + + // thread + thread([&con,&addr]{ + Run(con, addr); + }).detach(); + + auto start = chrono::steady_clock::now(); + int count = 0; + while (true) { + void* msg = NULL; + size_t size = 0; + GoUint8 ok = getMessage(con, &msg, &size); + if (!ok){ + this_thread::sleep_for(chrono::milliseconds(100)); + continue; + } + count++; + printf("======>> recv msg %s size %d\n", (char*)msg, count); + relMessage(msg); + if (count > 999000){ + printf("======>> use time %d\n", + chrono::duration_cast<chrono::seconds>(chrono::steady_clock::now()-start).count()); + } + } + printf("======>> recv all msg size %d\n", count); +} + +int main(int argc, char const *argv[]) +{ + thread([]{ + produce(false); + }).detach(); + + // thread([]{ + // consume("test2", "sensor01"); + // }).detach(); + + consume("test", "sensor01"); + + return 0; +} \ No newline at end of file diff --git a/TST/test/test.go b/TST/test/test.go new file mode 100644 index 0000000..62d1fae --- /dev/null +++ b/TST/test/test.go @@ -0,0 +1,76 @@ +package test + +import ( + "context" + "fmt" + "log" + "nsqCli/nsqclient" + "time" + + "github.com/nsqio/go-nsq" +) + +func produce(two bool) { + p, _ := nsqclient.NewProducer("192.168.20.108:4150") + + var str string + for len(str) < 32 { + str += "cnsqclient dynamic library" + } + msgx := []byte(str + "--x") + msgy := []byte(str + "--y") + + // count := 0 + for i := 0; i < 1000000; i++ { + // if e := p.Publish("test", []byte("x")); e != nil { + if e := p.Publish("test", msgx); e != nil { + log.Fatal("Publish error:" + e.Error()) + } + + if two { + // if e := p.Publish("test", []byte("y")); e != nil { + if e := p.Publish("test2", msgy); e != nil { + log.Fatal("Publish error:" + e.Error()) + } + } + + // log.Println("send time ", count) + // count++ + } +} + +func consume(topic, channel string) { + ctx, cancel := context.WithCancel(context.Background()) + + if c, e := nsqclient.NewNsqConsumer(ctx, topic, channel); e != nil { + fmt.Println("NewNsqConsumer failed", e) + return + } else { + ch := make(chan struct{}) + + count := 0 + c.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error { + count++ + fmt.Println("recv msg ", string(msg.Body), " size", count) + if count > 999000 { + ch <- struct{}{} + } + return nil + })) + // go c.Run("192.168.20.108:4150", 2) + go c.RunLookupd("192.168.20.108:4161", 2) + + t := time.Now() + <-ch + // fmt.Println("======>> use time ", time.Since(t)) + fmt.Println("======>> use time ", time.Now().Unix()-t.Unix()) + cancel() + } +} + +func Test() { + go produce(false) + + // go consume("test2", "sensor01") + consume("test", "sensor01") +} diff --git a/clib/libnsqclient.h b/clib/libnsqclient.h new file mode 100644 index 0000000..fab0277 --- /dev/null +++ b/clib/libnsqclient.h @@ -0,0 +1,101 @@ +/* Code generated by cmd/cgo; DO NOT EDIT. */ + +/* package nsqCli */ + + +#line 1 "cgo-builtin-export-prolog" + +#include <stddef.h> /* for ptrdiff_t below */ + +#ifndef GO_CGO_EXPORT_PROLOGUE_H +#define GO_CGO_EXPORT_PROLOGUE_H + +#ifndef GO_CGO_GOSTRING_TYPEDEF +typedef struct { const char *p; ptrdiff_t n; } _GoString_; +#endif + +#endif + +/* Start of preamble from import "C" comments. */ + + +#line 3 "main.go" + #include <stdlib.h> + #include <string.h> + +#line 1 "cgo-generated-wrapper" + + +/* End of preamble from import "C" comments. */ + + +/* Start of boilerplate cgo prologue. */ +#line 1 "cgo-gcc-export-header-prolog" + +#ifndef GO_CGO_PROLOGUE_H +#define GO_CGO_PROLOGUE_H + +typedef signed char GoInt8; +typedef unsigned char GoUint8; +typedef short GoInt16; +typedef unsigned short GoUint16; +typedef int GoInt32; +typedef unsigned int GoUint32; +typedef long long GoInt64; +typedef unsigned long long GoUint64; +typedef GoInt64 GoInt; +typedef GoUint64 GoUint; +typedef __SIZE_TYPE__ GoUintptr; +typedef float GoFloat32; +typedef double GoFloat64; +typedef float _Complex GoComplex64; +typedef double _Complex GoComplex128; + +/* + static assertion to make sure the file is being used on architecture + at least with matching size of GoInt. +*/ +typedef char _check_for_64_bit_pointer_matching_GoInt[sizeof(void*)==64/8 ? 1:-1]; + +#ifndef GO_CGO_GOSTRING_TYPEDEF +typedef _GoString_ GoString; +#endif +typedef void *GoMap; +typedef void *GoChan; +typedef struct { void *t; void *v; } GoInterface; +typedef struct { void *data; GoInt len; GoInt cap; } GoSlice; + +#endif + +/* End of boilerplate cgo prologue. */ + +#ifdef __cplusplus +extern "C" { +#endif + + +extern void* createProducer(GoString p0); + +extern void destroyProducer(void* p0); + +extern GoUint8 publish(void* p0, GoString p1, GoSlice p2); + +extern GoUint8 multiPublish(void* p0, GoString p1, GoSlice p2); + +extern GoUint8 deferredPublish(void* p0, GoString p1, GoInt p2, GoSlice p3); + +extern void* createConsumer(GoString p0, GoString p1); + +extern void destroyConsumer(void* p0); + +extern void Run(void* p0, GoString p1); + +extern void RunLookupd(void* p0, GoString p1); + +extern GoUint8 getMessage(void* p0, void** p1, size_t* p2); + +extern void relMessage(void* p0); + +#ifdef __cplusplus +} +#endif diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..5a3990a --- /dev/null +++ b/go.mod @@ -0,0 +1,5 @@ +module nsqCli + +go 1.14 + +require github.com/nsqio/go-nsq v1.1.0 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..457d1d9 --- /dev/null +++ b/go.sum @@ -0,0 +1,4 @@ +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE= +github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY= diff --git a/main.go b/main.go new file mode 100644 index 0000000..ededf6a --- /dev/null +++ b/main.go @@ -0,0 +1,144 @@ +package main + +// #include <stdlib.h> +// #include <string.h> +import "C" + +import ( + "nsqCli/TST/test" + "nsqCli/nsqclient" + "sync" + "time" + "unsafe" + + "github.com/nsqio/go-nsq" +) + +//export createProducer +func createProducer(addr string) unsafe.Pointer { + n, _ := nsqclient.NewProducer(addr) + return nsqclient.Save(n) +} + +//export destroyProducer +func destroyProducer(ph unsafe.Pointer) { + nsqclient.Unref(ph) + nsqclient.DestroyProducerPool() +} + +func pcvt(ph unsafe.Pointer) nsqclient.Producer { + return nsqclient.Restore(ph).(nsqclient.Producer) +} + +//export publish +func publish(ph unsafe.Pointer, topic string, msg []byte) bool { + p := pcvt(ph) + if err := p.Publish(topic, msg); err != nil { + return false + } + return true +} + +//export multiPublish +func multiPublish(ph unsafe.Pointer, topic string, msg [][]byte) bool { + p := pcvt(ph) + if err := p.MultiPublish(topic, msg); err != nil { + return false + } + return true +} + +//export deferredPublish +func deferredPublish(ph unsafe.Pointer, topic string, ms int, msg []byte) bool { + p := pcvt(ph) + if err := p.DeferredPublish(topic, time.Duration(ms)*time.Millisecond, msg); err != nil { + return false + } + return true +} + +///////////////////////////////////////////////////////////// + +type consumer struct { + nsqcon *nsqclient.NsqConsumer + lck sync.Mutex + msgs []*nsq.Message +} + +//export createConsumer +func createConsumer(topic, channel string) unsafe.Pointer { + if c, err := nsqclient.NewNsqConsumer(nil, topic, channel); err == nil { + con := &consumer{ + nsqcon: c, + } + return nsqclient.Save(con) + } + return nil +} + +func ccvt(ch unsafe.Pointer) *consumer { + return nsqclient.Restore(ch).(*consumer) +} + +//export destroyConsumer +func destroyConsumer(ch unsafe.Pointer) { + nsqclient.DestroyNsqConsumer(ccvt(ch).nsqcon) + nsqclient.Unref(ch) +} + +//export Run +func Run(ch unsafe.Pointer, addr string) { + c := ccvt(ch) + c.nsqcon.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error { + c.lck.Lock() + defer c.lck.Unlock() + c.msgs = append(c.msgs, msg) + return nil + })) + + c.nsqcon.Run(addr, 1) +} + +//export RunLookupd +func RunLookupd(ch unsafe.Pointer, lookAddr string) { + c := ccvt(ch) + c.nsqcon.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error { + c.lck.Lock() + defer c.lck.Unlock() + c.msgs = append(c.msgs, msg) + return nil + })) + + c.nsqcon.RunLookupd(lookAddr, 1) +} + +//export getMessage +func getMessage(ch unsafe.Pointer, data *unsafe.Pointer, size *C.size_t) bool { + c := ccvt(ch) + c.lck.Lock() + defer c.lck.Unlock() + if len(c.msgs) == 0 { + return false + } + + msg := c.msgs[0] + c.msgs = c.msgs[1:] + + *size = C.size_t(len(msg.Body)) + ptr := C.malloc(*size) + C.memcpy(ptr, unsafe.Pointer(&msg.Body[0]), *size) + *data = ptr + + return true +} + +//export relMessage +func relMessage(msg unsafe.Pointer) { + if msg != nil { + C.free(msg) + } +} + +func main() { + test.Test() +} diff --git a/make.sh b/make.sh new file mode 100755 index 0000000..51d276d --- /dev/null +++ b/make.sh @@ -0,0 +1,10 @@ +#!/bin/bash + +if [ ! -d "clib" ]; then + mkdir clib +fi +go tool cgo -exportheader clib/libnsqclient.h main.go && +go build -buildmode=c-shared -o ./clib/libnsqclient.so && +rm -fr _obj && +go build && +g++ -std=c++11 -g -O0 -o cnsqcli TST/ctest/ctest.cpp -I. -Lclib/ -lnsqclient -ldl -pthread diff --git a/nsqclient/LICENSE b/nsqclient/LICENSE new file mode 100644 index 0000000..25fdaf6 --- /dev/null +++ b/nsqclient/LICENSE @@ -0,0 +1,20 @@ +The MIT License (MIT) + +Copyright (c) 2013 Fatih Arslan + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/nsqclient/README.md b/nsqclient/README.md new file mode 100644 index 0000000..432d86d --- /dev/null +++ b/nsqclient/README.md @@ -0,0 +1,54 @@ +# NSQPool + +NSQPool is a thread safe connection pool for nsq producer. It can be used to +manage and reuse nsq producer connection. + + +## Install and Usage + +Install the package with: + +```bash +github.com/qgymje/nsqpool +``` + +Import it with: + +```go +import ( + "github.com/qgymje/nsqpool" + nsq "github.com/nsqio/go-nsq" +) +``` + +and use `pool` as the package name inside the code. + +## Example + +```go +// create a factory() to be used with channel based pool +factory := func() (*nsq.Producer, error) { + config := nsq.NewConfig() + return nsq.NewProducer(":4150", config) +} + +nsqPool, err := pool.NewChannelPool(5, 30, factory) + +producer, err := nsqPool.Get() + +producer.Publish("topic", "some data") +// do something with producer and put it back to the pool by closing the connection +// (this doesn't close the underlying connection instead it's putting it back +// to the pool). +producer.Close() + +// close pool any time you want, this closes all the connections inside a pool +nsqPool.Close() + +// currently available connections in the pool +current := nsqPool.Len() +``` + +## License + +The MIT License (MIT) - see LICENSE for more details diff --git a/nsqclient/channel.go b/nsqclient/channel.go new file mode 100644 index 0000000..1594dcc --- /dev/null +++ b/nsqclient/channel.go @@ -0,0 +1,134 @@ +package nsqclient + +import ( + "errors" + "fmt" + "sync" + + nsq "github.com/nsqio/go-nsq" +) + +// channelPool implements the Pool interface based on buffered channels. +type channelPool struct { + // storage for our net.Conn connections + mu sync.Mutex + conns chan *nsq.Producer + + // net.Conn generator + factory Factory +} + +// Factory is a function to create new connections. +type Factory func() (*nsq.Producer, error) + +// NewChannelPool returns a new pool based on buffered channels with an initial +// capacity and maximum capacity. Factory is used when initial capacity is +// greater than zero to fill the pool. A zero initialCap doesn't fill the Pool +// until a new Get() is called. During a Get(), If there is no new connection +// available in the pool, a new connection will be created via the Factory() +// method. +func NewChannelPool(initialCap, maxCap int, factory Factory) (Pool, error) { + if initialCap < 0 || maxCap <= 0 || initialCap > maxCap { + return nil, errors.New("invalid capacity settings") + } + + c := &channelPool{ + conns: make(chan *nsq.Producer, maxCap), + factory: factory, + } + + // create initial connections, if something goes wrong, + // just close the pool error out. + for i := 0; i < initialCap; i++ { + conn, err := factory() + if err != nil { + c.Close() + return nil, fmt.Errorf("factory is not able to fill the pool: %s", err) + } + c.conns <- conn + } + + return c, nil +} + +func (c *channelPool) getConns() chan *nsq.Producer { + c.mu.Lock() + conns := c.conns + c.mu.Unlock() + return conns +} + +// Get implements the Pool interfaces Get() method. If there is no new +// connection available in the pool, a new connection will be created via the +// Factory() method. +func (c *channelPool) Get() (*PoolConn, error) { + conns := c.getConns() + if conns == nil { + return nil, ErrClosed + } + + // wrap our connections with out custom net.Conn implementation (wrapConn + // method) that puts the connection back to the pool if it's closed. + select { + case conn := <-conns: + if conn == nil { + return nil, ErrClosed + } + + return c.wrapConn(conn), nil + default: + conn, err := c.factory() + if err != nil { + return nil, err + } + + return c.wrapConn(conn), nil + } +} + +// put puts the connection back to the pool. If the pool is full or closed, +// conn is simply closed. A nil conn will be rejected. +func (c *channelPool) put(conn *nsq.Producer) error { + if conn == nil { + return errors.New("connection is nil. rejecting") + } + + c.mu.Lock() + defer c.mu.Unlock() + + if c.conns == nil { + // pool is closed, close passed connection + conn.Stop() + return nil + } + + // put the resource back into the pool. If the pool is full, this will + // block and the default case will be executed. + select { + case c.conns <- conn: + return nil + default: + // pool is full, close passed connection + conn.Stop() + return nil + } +} + +func (c *channelPool) Close() { + c.mu.Lock() + conns := c.conns + c.conns = nil + c.factory = nil + c.mu.Unlock() + + if conns == nil { + return + } + + close(conns) + for conn := range conns { + conn.Stop() + } +} + +func (c *channelPool) Len() int { return len(c.getConns()) } diff --git a/nsqclient/channel_test.go b/nsqclient/channel_test.go new file mode 100644 index 0000000..725db9b --- /dev/null +++ b/nsqclient/channel_test.go @@ -0,0 +1,218 @@ +package nsqclient + +import ( + "math/rand" + "sync" + "testing" + "time" + + "github.com/nsqio/go-nsq" +) + +var ( + InitialCap = 2 + MaximumCap = 10 + factory = func() (*nsq.Producer, error) { + config := nsq.NewConfig() + return nsq.NewProducer(":4160", config) + } +) + +func newChannelPool() (Pool, error) { + return NewChannelPool(InitialCap, MaximumCap, factory) +} + +func TestNew(t *testing.T) { + _, err := newChannelPool() + if err != nil { + t.Errorf("New error: %s", err) + } +} + +func TestPool_Get(t *testing.T) { + p, _ := newChannelPool() + defer p.Close() + + _, err := p.Get() + if err != nil { + t.Errorf("Get error: %s", err) + } + + // after one get, current capacity should be lowered by one. + if p.Len() != (InitialCap - 1) { + t.Errorf("Get error. Expecting %d, got %d", + (InitialCap - 1), p.Len()) + } + + // get them all + var wg sync.WaitGroup + for i := 0; i < (InitialCap - 1); i++ { + wg.Add(1) + go func() { + defer wg.Done() + _, err := p.Get() + if err != nil { + t.Errorf("Get error: %s", err) + } + }() + } + wg.Wait() + + if p.Len() != 0 { + t.Errorf("Get error. Expecting %d, got %d", + (InitialCap - 1), p.Len()) + } + + _, err = p.Get() + if err != nil { + t.Errorf("Get error: %s", err) + } +} + +func TestPool_Put(t *testing.T) { + p, err := NewChannelPool(0, 30, factory) + if err != nil { + t.Fatal(err) + } + defer p.Close() + + // get/create from the pool + conns := make([]*PoolConn, MaximumCap) + for i := 0; i < MaximumCap; i++ { + conn, _ := p.Get() + conns[i] = conn + } + + // now put them all back + for _, conn := range conns { + conn.Close() + } + + if p.Len() != MaximumCap { + t.Errorf("Put error len. Expecting %d, got %d", + 1, p.Len()) + } + + conn, _ := p.Get() + p.Close() // close pool + + conn.Close() // try to put into a full pool + if p.Len() != 0 { + t.Errorf("Put error. Closed pool shouldn't allow to put connections.") + } +} + +func TestPool_PutUnusableConn(t *testing.T) { + p, _ := newChannelPool() + defer p.Close() + + // ensure pool is not empty + conn, _ := p.Get() + conn.Close() + + poolSize := p.Len() + conn, _ = p.Get() + conn.Close() + if p.Len() != poolSize { + t.Errorf("Pool size is expected to be equal to initial size") + } + + conn, _ = p.Get() + conn.MarkUnusable() + + conn.Close() + if p.Len() != poolSize-1 { + t.Errorf("Pool size is expected to be initial_size - 1 [%d:%d]", p.Len(), poolSize-1) + } +} + +func TestPool_UsedCapacity(t *testing.T) { + p, _ := newChannelPool() + defer p.Close() + + if p.Len() != InitialCap { + t.Errorf("InitialCap error. Expecting %d, got %d", + InitialCap, p.Len()) + } +} + +func TestPool_Close(t *testing.T) { + p, _ := newChannelPool() + + // now close it and test all cases we are expecting. + p.Close() + + c := p.(*channelPool) + + if c.conns != nil { + t.Errorf("Close error, conns channel should be nil") + } + + if c.factory != nil { + t.Errorf("Close error, factory should be nil") + } + + _, err := p.Get() + if err == nil { + t.Errorf("Close error, get conn should return an error") + } + + if p.Len() != 0 { + t.Errorf("Close error used capacity. Expecting 0, got %d", p.Len()) + } +} + +func TestPoolConcurrent(t *testing.T) { + p, _ := newChannelPool() + pipe := make(chan *PoolConn, 0) + + go func() { + p.Close() + }() + + for i := 0; i < MaximumCap; i++ { + go func() { + conn, _ := p.Get() + + pipe <- conn + }() + + go func() { + conn := <-pipe + if conn == nil { + return + } + conn.Close() + }() + } +} + +func TestPoolConcurrent2(t *testing.T) { + p, _ := NewChannelPool(0, 30, factory) + + var wg sync.WaitGroup + + go func() { + for i := 0; i < 10; i++ { + wg.Add(1) + go func(i int) { + conn, _ := p.Get() + time.Sleep(time.Millisecond * time.Duration(rand.Intn(100))) + conn.Close() + wg.Done() + }(i) + } + }() + + for i := 0; i < 10; i++ { + wg.Add(1) + go func(i int) { + conn, _ := p.Get() + time.Sleep(time.Millisecond * time.Duration(rand.Intn(100))) + conn.Close() + wg.Done() + }(i) + } + + wg.Wait() +} diff --git a/nsqclient/conn.go b/nsqclient/conn.go new file mode 100644 index 0000000..5c680b5 --- /dev/null +++ b/nsqclient/conn.go @@ -0,0 +1,45 @@ +package nsqclient + +import ( + "sync" + + nsq "github.com/nsqio/go-nsq" +) + +// PoolConn is a wrapper around net.Conn to modify the the behavior of +// net.Conn's Close() method. +type PoolConn struct { + *nsq.Producer + mu sync.RWMutex + c *channelPool + unusable bool +} + +// Close puts the given connects back to the pool instead of closing it. +func (p *PoolConn) Close() error { + p.mu.RLock() + defer p.mu.RUnlock() + + if p.unusable { + if p.Producer != nil { + p.Producer.Stop() + return nil + } + return nil + } + return p.c.put(p.Producer) +} + +// MarkUnusable marks the connection not usable any more, to let the pool close it instead of returning it to pool. +func (p *PoolConn) MarkUnusable() { + p.mu.Lock() + p.unusable = true + p.mu.Unlock() +} + +// newConn wraps a standard net.Conn to a poolConn net.Conn. +func (c *channelPool) wrapConn(conn *nsq.Producer) *PoolConn { + p := &PoolConn{c: c} + p.Producer = conn + return p +} diff --git a/nsqclient/consumer.go b/nsqclient/consumer.go new file mode 100644 index 0000000..152989d --- /dev/null +++ b/nsqclient/consumer.go @@ -0,0 +1,90 @@ +package nsqclient + +import ( + "context" + "fmt" + "time" + + nsq "github.com/nsqio/go-nsq" +) + +type NsqConsumer struct { + consumer *nsq.Consumer + handler nsq.Handler + ctx context.Context + ctxCancel context.CancelFunc + topic string + channel string +} + +func NewNsqConsumer(ctx context.Context, topic, channel string, options ...func(*nsq.Config)) (*NsqConsumer, error) { + conf := nsq.NewConfig() + conf.MaxAttempts = 0 + conf.MsgTimeout = 10 * time.Minute // 榛樿涓�涓秷鎭渶澶氳兘澶勭悊鍗佸垎閽燂紝鍚﹀垯灏变細閲嶆柊涓㈠叆闃熷垪 + conf.LookupdPollInterval = 3 * time.Second // 璋冩暣consumer鐨勯噸杩為棿闅旀椂闂翠负3绉� + for _, option := range options { + option(conf) + } + + consumer, err := nsq.NewConsumer(topic, channel, conf) + if err != nil { + return nil, err + } + return &NsqConsumer{ + consumer: consumer, + ctx: ctx, + topic: topic, + channel: channel, + }, nil +} + +func DestroyNsqConsumer(c *NsqConsumer) { + if c != nil { + if c.ctxCancel != nil { + c.ctxCancel() + } + } +} + +func (n *NsqConsumer) AddHandler(handler nsq.Handler) { + n.handler = handler +} + +func (n *NsqConsumer) Run(qaddr string, concurrency int) error { + return n.RunDistributed([]string{qaddr}, nil, concurrency) +} + +func (n *NsqConsumer) RunLookupd(lookupAddr string, concurrency int) error { + return n.RunDistributed(nil, []string{lookupAddr}, concurrency) +} + +func (n *NsqConsumer) RunDistributed(qAddr, lAddr []string, concurrency int) error { + n.consumer.ChangeMaxInFlight(concurrency) + n.consumer.AddConcurrentHandlers(n.handler, concurrency) + + var err error + if len(qAddr) > 0 { + err = n.consumer.ConnectToNSQDs(qAddr) + } else if len(lAddr) > 0 { + err = n.consumer.ConnectToNSQLookupds(lAddr) + } else { + err = fmt.Errorf("Addr Must NOT Empty") + } + if err != nil { + return err + } + + if n.ctx == nil { + n.ctx, n.ctxCancel = context.WithCancel(context.Background()) + } + + for { + select { + case <-n.ctx.Done(): + fmt.Println("[%s] %s,%s", "stop consumer", n.topic, n.channel) + n.consumer.Stop() + fmt.Println("[%s] %s,%s", "stop consumer success", n.topic, n.channel) + return nil + } + } +} diff --git a/nsqclient/pointer.go b/nsqclient/pointer.go new file mode 100644 index 0000000..1cba795 --- /dev/null +++ b/nsqclient/pointer.go @@ -0,0 +1,57 @@ +package nsqclient + +// #include <stdlib.h> +import "C" +import ( + "sync" + "unsafe" +) + +var ( + mutex sync.RWMutex + store = map[unsafe.Pointer]interface{}{} +) + +func Save(v interface{}) unsafe.Pointer { + if v == nil { + return nil + } + + // Generate real fake C pointer. + // This pointer will not store any data, but will bi used for indexing purposes. + // Since Go doest allow to cast dangling pointer to unsafe.Pointer, we do rally allocate one byte. + // Why we need indexing, because Go doest allow C code to store pointers to Go data. + var ptr unsafe.Pointer = C.malloc(C.size_t(1)) + if ptr == nil { + panic("can't allocate 'cgo-pointer hack index pointer': ptr == nil") + } + + mutex.Lock() + store[ptr] = v + mutex.Unlock() + + return ptr +} + +func Restore(ptr unsafe.Pointer) (v interface{}) { + if ptr == nil { + return nil + } + + mutex.RLock() + v = store[ptr] + mutex.RUnlock() + return +} + +func Unref(ptr unsafe.Pointer) { + if ptr == nil { + return + } + + mutex.Lock() + delete(store, ptr) + mutex.Unlock() + + C.free(ptr) +} diff --git a/nsqclient/pool.go b/nsqclient/pool.go new file mode 100644 index 0000000..b773490 --- /dev/null +++ b/nsqclient/pool.go @@ -0,0 +1,25 @@ +// Package pool implements a pool of net.Conn interfaces to manage and reuse them. +package nsqclient + +import "errors" + +var ( + // ErrClosed is the error resulting if the pool is closed via pool.Close(). + ErrClosed = errors.New("pool is closed") +) + +// Pool interface describes a pool implementation. A pool should have maximum +// capacity. An ideal pool is threadsafe and easy to use. +type Pool interface { + // Get returns a new connection from the pool. Closing the connections puts + // it back to the Pool. Closing it when the pool is destroyed or full will + // be counted as an error. + Get() (*PoolConn, error) + + // Close closes the pool and all its connections. After Close() the pool is + // no longer usable. + Close() + + // Len returns the current number of connections of the pool. + Len() int +} diff --git a/nsqclient/producer.go b/nsqclient/producer.go new file mode 100644 index 0000000..717c7a1 --- /dev/null +++ b/nsqclient/producer.go @@ -0,0 +1,139 @@ +package nsqclient + +import ( + "fmt" + "time" + + nsq "github.com/nsqio/go-nsq" +) + +type Producer interface { + Publish(topic string, body []byte) error + MultiPublish(topic string, body [][]byte) error + DeferredPublish(topic string, delay time.Duration, body []byte) error +} + +var _ Producer = (*producer)(nil) + +type producer struct { + pool Pool +} + +var ( + // name pool producer + nsqList = make(map[string]Pool) +) + +type Config struct { + Addr string `toml:"addr" json:"addr"` + InitSize int `toml:"init_size" json:"init_size"` + MaxSize int `toml:"max_size" json:"max_size"` +} + +func CreateProducerPool(configs map[string]Config) { + for name, conf := range configs { + n, err := newProducerPool(conf.Addr, conf.InitSize, conf.MaxSize) + if err == nil { + nsqList[name] = n + // 鏀寔ip:port瀵诲潃 + nsqList[conf.Addr] = n + } + } +} + +func DestroyProducerPool() { + for _, p := range nsqList { + p.Close() + } +} + +func GetProducer(key ...string) (*producer, error) { + k := "default" + if len(key) > 0 { + k = key[0] + } + if n, ok := nsqList[k]; ok { + return &producer{n}, nil + } + return nil, fmt.Errorf("GetProducer can't get producer") +} + +// CreateNSQProducer create nsq producer +func newProducer(addr string, options ...func(*nsq.Config)) (*nsq.Producer, error) { + cfg := nsq.NewConfig() + for _, option := range options { + option(cfg) + } + + producer, err := nsq.NewProducer(addr, cfg) + if err != nil { + return nil, err + } + // producer.SetLogger(log.New(os.Stderr, "", log.Flags()), nsq.LogLevelError) + return producer, nil +} + +// CreateNSQProducerPool create a nwq producer pool +func newProducerPool(addr string, initSize, maxSize int, options ...func(*nsq.Config)) (Pool, error) { + factory := func() (*nsq.Producer, error) { + // TODO 杩欓噷搴旇鎵цping鏂规硶鏉ョ‘瀹氳繛鎺ユ槸姝e父鐨勫惁鍒欎笉搴旇鍒涘缓conn + return newProducer(addr, options...) + } + nsqPool, err := NewChannelPool(initSize, maxSize, factory) + if err != nil { + return nil, err + } + return nsqPool, nil +} + +func NewProducer(addr string) (*producer, error) { + CreateProducerPool(map[string]Config{"default": {addr, 1, 1}}) + return GetProducer() +} + +func retry(num int, fn func() error) error { + var err error + for i := 0; i < num; i++ { + err = fn() + if err == nil { + break + } + } + return err +} + +func (p *producer) Publish(topic string, body []byte) error { + nsq, err := p.pool.Get() + if err != nil { + return err + } + defer nsq.Close() + + return retry(2, func() error { + return nsq.Publish(topic, body) + }) +} + +func (p *producer) MultiPublish(topic string, body [][]byte) error { + nsq, err := p.pool.Get() + if err != nil { + return err + } + defer nsq.Close() + + return retry(2, func() error { + return nsq.MultiPublish(topic, body) + }) +} + +func (p *producer) DeferredPublish(topic string, delay time.Duration, body []byte) error { + nsq, err := p.pool.Get() + if err != nil { + return err + } + defer nsq.Close() + + return retry(2, func() error { + return nsq.DeferredPublish(topic, delay, body) + }) +} -- Gitblit v1.8.0