From 1b4fc8c66026c77abf734126b4e7662e386ec0f5 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期二, 16 五月 2023 16:50:46 +0800 Subject: [PATCH] for go get --- nsqclient/consumer.go | 99 +++++++ nsqclient/channel_test.go | 218 +++++++++++++++ .gitignore | 2 nsqclient/pointer.go | 57 ++++ nsqclient/producer.go | 139 +++++++++ nsqclient/README.md | 54 +++ nsqclient/pool.go | 25 + TST/test/test.go | 8 main.go | 26 nsqclient/LICENSE | 20 + nsqclient/conn.go | 45 +++ nsqclient/channel.go | 134 +++++++++ 12 files changed, 810 insertions(+), 17 deletions(-) diff --git a/.gitignore b/.gitignore index c41f8dc..129af67 100644 --- a/.gitignore +++ b/.gitignore @@ -24,3 +24,5 @@ nsqCli cnsqcli +go.sum +go.mod diff --git a/TST/test/test.go b/TST/test/test.go index 5aaf283..2e1a679 100644 --- a/TST/test/test.go +++ b/TST/test/test.go @@ -4,16 +4,16 @@ "context" "fmt" "log" - "nsqclient/nsqcli" + "nsqclient/nsqclient" "time" ) func produce(two bool) { - p, _ := nsqcli.NewProducer("192.168.20.108:4150") + p, _ := nsqclient.NewProducer("192.168.20.108:4150") var str string for len(str) < 32 { - str += "cnsqcli dynamic library" + str += "cnsqclient dynamic library" } msgx := []byte(str + "--x") msgy := []byte(str + "--y") @@ -40,7 +40,7 @@ func consume(topic, channel string) { ctx, cancel := context.WithCancel(context.Background()) - if c, e := nsqcli.NewNsqConsumer(ctx, topic, channel); e != nil { + if c, e := nsqclient.NewNsqConsumer(ctx, topic, channel); e != nil { fmt.Println("NewNsqConsumer failed", e) return } else { diff --git a/main.go b/main.go index c01a2e1..4eec6c9 100644 --- a/main.go +++ b/main.go @@ -6,7 +6,7 @@ import ( "nsqclient/TST/test" - "nsqclient/nsqcli" + "nsqclient/nsqclient" "sync" "time" "unsafe" @@ -14,18 +14,18 @@ //export createProducer func createProducer(addr string) unsafe.Pointer { - n, _ := nsqcli.NewProducer(addr) - return nsqcli.Save(n) + n, _ := nsqclient.NewProducer(addr) + return nsqclient.Save(n) } //export destroyProducer func destroyProducer(ph unsafe.Pointer) { - nsqcli.Unref(ph) - nsqcli.DestroyProducerPool() + nsqclient.Unref(ph) + nsqclient.DestroyProducerPool() } -func pcvt(ph unsafe.Pointer) nsqcli.Producer { - return nsqcli.Restore(ph).(nsqcli.Producer) +func pcvt(ph unsafe.Pointer) nsqclient.Producer { + return nsqclient.Restore(ph).(nsqclient.Producer) } //export publish @@ -58,30 +58,30 @@ ///////////////////////////////////////////////////////////// type consumer struct { - nsqcon *nsqcli.NsqConsumer + nsqcon *nsqclient.NsqConsumer lck sync.Mutex msgs [][]byte } //export createConsumer func createConsumer(topic, channel string) unsafe.Pointer { - if c, err := nsqcli.NewNsqConsumer(nil, topic, channel); err == nil { + if c, err := nsqclient.NewNsqConsumer(nil, topic, channel); err == nil { con := &consumer{ nsqcon: c, } - return nsqcli.Save(con) + return nsqclient.Save(con) } return nil } func ccvt(ch unsafe.Pointer) *consumer { - return nsqcli.Restore(ch).(*consumer) + return nsqclient.Restore(ch).(*consumer) } //export destroyConsumer func destroyConsumer(ch unsafe.Pointer) { - nsqcli.DestroyNsqConsumer(ccvt(ch).nsqcon) - nsqcli.Unref(ch) + nsqclient.DestroyNsqConsumer(ccvt(ch).nsqcon) + nsqclient.Unref(ch) } //export Run diff --git a/nsqclient/LICENSE b/nsqclient/LICENSE new file mode 100644 index 0000000..25fdaf6 --- /dev/null +++ b/nsqclient/LICENSE @@ -0,0 +1,20 @@ +The MIT License (MIT) + +Copyright (c) 2013 Fatih Arslan + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/nsqclient/README.md b/nsqclient/README.md new file mode 100644 index 0000000..432d86d --- /dev/null +++ b/nsqclient/README.md @@ -0,0 +1,54 @@ +# NSQPool + +NSQPool is a thread safe connection pool for nsq producer. It can be used to +manage and reuse nsq producer connection. + + +## Install and Usage + +Install the package with: + +```bash +github.com/qgymje/nsqpool +``` + +Import it with: + +```go +import ( + "github.com/qgymje/nsqpool" + nsq "github.com/nsqio/go-nsq" +) +``` + +and use `pool` as the package name inside the code. + +## Example + +```go +// create a factory() to be used with channel based pool +factory := func() (*nsq.Producer, error) { + config := nsq.NewConfig() + return nsq.NewProducer(":4150", config) +} + +nsqPool, err := pool.NewChannelPool(5, 30, factory) + +producer, err := nsqPool.Get() + +producer.Publish("topic", "some data") +// do something with producer and put it back to the pool by closing the connection +// (this doesn't close the underlying connection instead it's putting it back +// to the pool). +producer.Close() + +// close pool any time you want, this closes all the connections inside a pool +nsqPool.Close() + +// currently available connections in the pool +current := nsqPool.Len() +``` + +## License + +The MIT License (MIT) - see LICENSE for more details diff --git a/nsqclient/channel.go b/nsqclient/channel.go new file mode 100644 index 0000000..1594dcc --- /dev/null +++ b/nsqclient/channel.go @@ -0,0 +1,134 @@ +package nsqclient + +import ( + "errors" + "fmt" + "sync" + + nsq "github.com/nsqio/go-nsq" +) + +// channelPool implements the Pool interface based on buffered channels. +type channelPool struct { + // storage for our net.Conn connections + mu sync.Mutex + conns chan *nsq.Producer + + // net.Conn generator + factory Factory +} + +// Factory is a function to create new connections. +type Factory func() (*nsq.Producer, error) + +// NewChannelPool returns a new pool based on buffered channels with an initial +// capacity and maximum capacity. Factory is used when initial capacity is +// greater than zero to fill the pool. A zero initialCap doesn't fill the Pool +// until a new Get() is called. During a Get(), If there is no new connection +// available in the pool, a new connection will be created via the Factory() +// method. +func NewChannelPool(initialCap, maxCap int, factory Factory) (Pool, error) { + if initialCap < 0 || maxCap <= 0 || initialCap > maxCap { + return nil, errors.New("invalid capacity settings") + } + + c := &channelPool{ + conns: make(chan *nsq.Producer, maxCap), + factory: factory, + } + + // create initial connections, if something goes wrong, + // just close the pool error out. + for i := 0; i < initialCap; i++ { + conn, err := factory() + if err != nil { + c.Close() + return nil, fmt.Errorf("factory is not able to fill the pool: %s", err) + } + c.conns <- conn + } + + return c, nil +} + +func (c *channelPool) getConns() chan *nsq.Producer { + c.mu.Lock() + conns := c.conns + c.mu.Unlock() + return conns +} + +// Get implements the Pool interfaces Get() method. If there is no new +// connection available in the pool, a new connection will be created via the +// Factory() method. +func (c *channelPool) Get() (*PoolConn, error) { + conns := c.getConns() + if conns == nil { + return nil, ErrClosed + } + + // wrap our connections with out custom net.Conn implementation (wrapConn + // method) that puts the connection back to the pool if it's closed. + select { + case conn := <-conns: + if conn == nil { + return nil, ErrClosed + } + + return c.wrapConn(conn), nil + default: + conn, err := c.factory() + if err != nil { + return nil, err + } + + return c.wrapConn(conn), nil + } +} + +// put puts the connection back to the pool. If the pool is full or closed, +// conn is simply closed. A nil conn will be rejected. +func (c *channelPool) put(conn *nsq.Producer) error { + if conn == nil { + return errors.New("connection is nil. rejecting") + } + + c.mu.Lock() + defer c.mu.Unlock() + + if c.conns == nil { + // pool is closed, close passed connection + conn.Stop() + return nil + } + + // put the resource back into the pool. If the pool is full, this will + // block and the default case will be executed. + select { + case c.conns <- conn: + return nil + default: + // pool is full, close passed connection + conn.Stop() + return nil + } +} + +func (c *channelPool) Close() { + c.mu.Lock() + conns := c.conns + c.conns = nil + c.factory = nil + c.mu.Unlock() + + if conns == nil { + return + } + + close(conns) + for conn := range conns { + conn.Stop() + } +} + +func (c *channelPool) Len() int { return len(c.getConns()) } diff --git a/nsqclient/channel_test.go b/nsqclient/channel_test.go new file mode 100644 index 0000000..725db9b --- /dev/null +++ b/nsqclient/channel_test.go @@ -0,0 +1,218 @@ +package nsqclient + +import ( + "math/rand" + "sync" + "testing" + "time" + + "github.com/nsqio/go-nsq" +) + +var ( + InitialCap = 2 + MaximumCap = 10 + factory = func() (*nsq.Producer, error) { + config := nsq.NewConfig() + return nsq.NewProducer(":4160", config) + } +) + +func newChannelPool() (Pool, error) { + return NewChannelPool(InitialCap, MaximumCap, factory) +} + +func TestNew(t *testing.T) { + _, err := newChannelPool() + if err != nil { + t.Errorf("New error: %s", err) + } +} + +func TestPool_Get(t *testing.T) { + p, _ := newChannelPool() + defer p.Close() + + _, err := p.Get() + if err != nil { + t.Errorf("Get error: %s", err) + } + + // after one get, current capacity should be lowered by one. + if p.Len() != (InitialCap - 1) { + t.Errorf("Get error. Expecting %d, got %d", + (InitialCap - 1), p.Len()) + } + + // get them all + var wg sync.WaitGroup + for i := 0; i < (InitialCap - 1); i++ { + wg.Add(1) + go func() { + defer wg.Done() + _, err := p.Get() + if err != nil { + t.Errorf("Get error: %s", err) + } + }() + } + wg.Wait() + + if p.Len() != 0 { + t.Errorf("Get error. Expecting %d, got %d", + (InitialCap - 1), p.Len()) + } + + _, err = p.Get() + if err != nil { + t.Errorf("Get error: %s", err) + } +} + +func TestPool_Put(t *testing.T) { + p, err := NewChannelPool(0, 30, factory) + if err != nil { + t.Fatal(err) + } + defer p.Close() + + // get/create from the pool + conns := make([]*PoolConn, MaximumCap) + for i := 0; i < MaximumCap; i++ { + conn, _ := p.Get() + conns[i] = conn + } + + // now put them all back + for _, conn := range conns { + conn.Close() + } + + if p.Len() != MaximumCap { + t.Errorf("Put error len. Expecting %d, got %d", + 1, p.Len()) + } + + conn, _ := p.Get() + p.Close() // close pool + + conn.Close() // try to put into a full pool + if p.Len() != 0 { + t.Errorf("Put error. Closed pool shouldn't allow to put connections.") + } +} + +func TestPool_PutUnusableConn(t *testing.T) { + p, _ := newChannelPool() + defer p.Close() + + // ensure pool is not empty + conn, _ := p.Get() + conn.Close() + + poolSize := p.Len() + conn, _ = p.Get() + conn.Close() + if p.Len() != poolSize { + t.Errorf("Pool size is expected to be equal to initial size") + } + + conn, _ = p.Get() + conn.MarkUnusable() + + conn.Close() + if p.Len() != poolSize-1 { + t.Errorf("Pool size is expected to be initial_size - 1 [%d:%d]", p.Len(), poolSize-1) + } +} + +func TestPool_UsedCapacity(t *testing.T) { + p, _ := newChannelPool() + defer p.Close() + + if p.Len() != InitialCap { + t.Errorf("InitialCap error. Expecting %d, got %d", + InitialCap, p.Len()) + } +} + +func TestPool_Close(t *testing.T) { + p, _ := newChannelPool() + + // now close it and test all cases we are expecting. + p.Close() + + c := p.(*channelPool) + + if c.conns != nil { + t.Errorf("Close error, conns channel should be nil") + } + + if c.factory != nil { + t.Errorf("Close error, factory should be nil") + } + + _, err := p.Get() + if err == nil { + t.Errorf("Close error, get conn should return an error") + } + + if p.Len() != 0 { + t.Errorf("Close error used capacity. Expecting 0, got %d", p.Len()) + } +} + +func TestPoolConcurrent(t *testing.T) { + p, _ := newChannelPool() + pipe := make(chan *PoolConn, 0) + + go func() { + p.Close() + }() + + for i := 0; i < MaximumCap; i++ { + go func() { + conn, _ := p.Get() + + pipe <- conn + }() + + go func() { + conn := <-pipe + if conn == nil { + return + } + conn.Close() + }() + } +} + +func TestPoolConcurrent2(t *testing.T) { + p, _ := NewChannelPool(0, 30, factory) + + var wg sync.WaitGroup + + go func() { + for i := 0; i < 10; i++ { + wg.Add(1) + go func(i int) { + conn, _ := p.Get() + time.Sleep(time.Millisecond * time.Duration(rand.Intn(100))) + conn.Close() + wg.Done() + }(i) + } + }() + + for i := 0; i < 10; i++ { + wg.Add(1) + go func(i int) { + conn, _ := p.Get() + time.Sleep(time.Millisecond * time.Duration(rand.Intn(100))) + conn.Close() + wg.Done() + }(i) + } + + wg.Wait() +} diff --git a/nsqclient/conn.go b/nsqclient/conn.go new file mode 100644 index 0000000..5c680b5 --- /dev/null +++ b/nsqclient/conn.go @@ -0,0 +1,45 @@ +package nsqclient + +import ( + "sync" + + nsq "github.com/nsqio/go-nsq" +) + +// PoolConn is a wrapper around net.Conn to modify the the behavior of +// net.Conn's Close() method. +type PoolConn struct { + *nsq.Producer + mu sync.RWMutex + c *channelPool + unusable bool +} + +// Close puts the given connects back to the pool instead of closing it. +func (p *PoolConn) Close() error { + p.mu.RLock() + defer p.mu.RUnlock() + + if p.unusable { + if p.Producer != nil { + p.Producer.Stop() + return nil + } + return nil + } + return p.c.put(p.Producer) +} + +// MarkUnusable marks the connection not usable any more, to let the pool close it instead of returning it to pool. +func (p *PoolConn) MarkUnusable() { + p.mu.Lock() + p.unusable = true + p.mu.Unlock() +} + +// newConn wraps a standard net.Conn to a poolConn net.Conn. +func (c *channelPool) wrapConn(conn *nsq.Producer) *PoolConn { + p := &PoolConn{c: c} + p.Producer = conn + return p +} diff --git a/nsqclient/consumer.go b/nsqclient/consumer.go new file mode 100644 index 0000000..a0df0b0 --- /dev/null +++ b/nsqclient/consumer.go @@ -0,0 +1,99 @@ +package nsqclient + +import ( + "context" + "fmt" + "time" + + nsq "github.com/nsqio/go-nsq" +) + +type NsqConsumer struct { + consumer *nsq.Consumer + // handler nsq.Handler + handler func([]byte) error + ctx context.Context + ctxCancel context.CancelFunc + topic string + channel string +} + +func NewNsqConsumer(ctx context.Context, topic, channel string, options ...func(*nsq.Config)) (*NsqConsumer, error) { + conf := nsq.NewConfig() + conf.MaxAttempts = 0 + conf.MsgTimeout = 10 * time.Minute // 榛樿涓�涓秷鎭渶澶氳兘澶勭悊鍗佸垎閽燂紝鍚﹀垯灏变細閲嶆柊涓㈠叆闃熷垪 + conf.LookupdPollInterval = 3 * time.Second // 璋冩暣consumer鐨勯噸杩為棿闅旀椂闂翠负3绉� + for _, option := range options { + option(conf) + } + + consumer, err := nsq.NewConsumer(topic, channel, conf) + if err != nil { + return nil, err + } + return &NsqConsumer{ + consumer: consumer, + ctx: ctx, + topic: topic, + channel: channel, + }, nil +} + +func DestroyNsqConsumer(c *NsqConsumer) { + if c != nil { + if c.ctxCancel != nil { + c.ctxCancel() + } + } +} + +// func (n *NsqConsumer) AddHandler(handler nsq.Handler) { +// n.handler = handler +// } + +func (n *NsqConsumer) AddHandler(handler func([]byte) error) { + n.handler = handler +} + +func (n *NsqConsumer) Run(qaddr string, concurrency int) error { + return n.RunDistributed([]string{qaddr}, nil, concurrency) +} + +func (n *NsqConsumer) RunLookupd(lookupAddr string, concurrency int) error { + return n.RunDistributed(nil, []string{lookupAddr}, concurrency) +} + +func (n *NsqConsumer) RunDistributed(qAddr, lAddr []string, concurrency int) error { + n.consumer.ChangeMaxInFlight(concurrency) + // n.consumer.AddConcurrentHandlers(n.handler, concurrency) + n.consumer.AddConcurrentHandlers(nsq.HandlerFunc(func(msg *nsq.Message) error { + return n.handler(msg.Body) + // return nil + }), concurrency) + + var err error + if len(qAddr) > 0 { + err = n.consumer.ConnectToNSQDs(qAddr) + } else if len(lAddr) > 0 { + err = n.consumer.ConnectToNSQLookupds(lAddr) + } else { + err = fmt.Errorf("Addr Must NOT Empty") + } + if err != nil { + return err + } + + if n.ctx == nil { + n.ctx, n.ctxCancel = context.WithCancel(context.Background()) + } + + for { + select { + case <-n.ctx.Done(): + fmt.Println("[%s] %s,%s", "stop consumer", n.topic, n.channel) + n.consumer.Stop() + fmt.Println("[%s] %s,%s", "stop consumer success", n.topic, n.channel) + return nil + } + } +} diff --git a/nsqclient/pointer.go b/nsqclient/pointer.go new file mode 100644 index 0000000..1cba795 --- /dev/null +++ b/nsqclient/pointer.go @@ -0,0 +1,57 @@ +package nsqclient + +// #include <stdlib.h> +import "C" +import ( + "sync" + "unsafe" +) + +var ( + mutex sync.RWMutex + store = map[unsafe.Pointer]interface{}{} +) + +func Save(v interface{}) unsafe.Pointer { + if v == nil { + return nil + } + + // Generate real fake C pointer. + // This pointer will not store any data, but will bi used for indexing purposes. + // Since Go doest allow to cast dangling pointer to unsafe.Pointer, we do rally allocate one byte. + // Why we need indexing, because Go doest allow C code to store pointers to Go data. + var ptr unsafe.Pointer = C.malloc(C.size_t(1)) + if ptr == nil { + panic("can't allocate 'cgo-pointer hack index pointer': ptr == nil") + } + + mutex.Lock() + store[ptr] = v + mutex.Unlock() + + return ptr +} + +func Restore(ptr unsafe.Pointer) (v interface{}) { + if ptr == nil { + return nil + } + + mutex.RLock() + v = store[ptr] + mutex.RUnlock() + return +} + +func Unref(ptr unsafe.Pointer) { + if ptr == nil { + return + } + + mutex.Lock() + delete(store, ptr) + mutex.Unlock() + + C.free(ptr) +} diff --git a/nsqclient/pool.go b/nsqclient/pool.go new file mode 100644 index 0000000..b773490 --- /dev/null +++ b/nsqclient/pool.go @@ -0,0 +1,25 @@ +// Package pool implements a pool of net.Conn interfaces to manage and reuse them. +package nsqclient + +import "errors" + +var ( + // ErrClosed is the error resulting if the pool is closed via pool.Close(). + ErrClosed = errors.New("pool is closed") +) + +// Pool interface describes a pool implementation. A pool should have maximum +// capacity. An ideal pool is threadsafe and easy to use. +type Pool interface { + // Get returns a new connection from the pool. Closing the connections puts + // it back to the Pool. Closing it when the pool is destroyed or full will + // be counted as an error. + Get() (*PoolConn, error) + + // Close closes the pool and all its connections. After Close() the pool is + // no longer usable. + Close() + + // Len returns the current number of connections of the pool. + Len() int +} diff --git a/nsqclient/producer.go b/nsqclient/producer.go new file mode 100644 index 0000000..717c7a1 --- /dev/null +++ b/nsqclient/producer.go @@ -0,0 +1,139 @@ +package nsqclient + +import ( + "fmt" + "time" + + nsq "github.com/nsqio/go-nsq" +) + +type Producer interface { + Publish(topic string, body []byte) error + MultiPublish(topic string, body [][]byte) error + DeferredPublish(topic string, delay time.Duration, body []byte) error +} + +var _ Producer = (*producer)(nil) + +type producer struct { + pool Pool +} + +var ( + // name pool producer + nsqList = make(map[string]Pool) +) + +type Config struct { + Addr string `toml:"addr" json:"addr"` + InitSize int `toml:"init_size" json:"init_size"` + MaxSize int `toml:"max_size" json:"max_size"` +} + +func CreateProducerPool(configs map[string]Config) { + for name, conf := range configs { + n, err := newProducerPool(conf.Addr, conf.InitSize, conf.MaxSize) + if err == nil { + nsqList[name] = n + // 鏀寔ip:port瀵诲潃 + nsqList[conf.Addr] = n + } + } +} + +func DestroyProducerPool() { + for _, p := range nsqList { + p.Close() + } +} + +func GetProducer(key ...string) (*producer, error) { + k := "default" + if len(key) > 0 { + k = key[0] + } + if n, ok := nsqList[k]; ok { + return &producer{n}, nil + } + return nil, fmt.Errorf("GetProducer can't get producer") +} + +// CreateNSQProducer create nsq producer +func newProducer(addr string, options ...func(*nsq.Config)) (*nsq.Producer, error) { + cfg := nsq.NewConfig() + for _, option := range options { + option(cfg) + } + + producer, err := nsq.NewProducer(addr, cfg) + if err != nil { + return nil, err + } + // producer.SetLogger(log.New(os.Stderr, "", log.Flags()), nsq.LogLevelError) + return producer, nil +} + +// CreateNSQProducerPool create a nwq producer pool +func newProducerPool(addr string, initSize, maxSize int, options ...func(*nsq.Config)) (Pool, error) { + factory := func() (*nsq.Producer, error) { + // TODO 杩欓噷搴旇鎵цping鏂规硶鏉ョ‘瀹氳繛鎺ユ槸姝e父鐨勫惁鍒欎笉搴旇鍒涘缓conn + return newProducer(addr, options...) + } + nsqPool, err := NewChannelPool(initSize, maxSize, factory) + if err != nil { + return nil, err + } + return nsqPool, nil +} + +func NewProducer(addr string) (*producer, error) { + CreateProducerPool(map[string]Config{"default": {addr, 1, 1}}) + return GetProducer() +} + +func retry(num int, fn func() error) error { + var err error + for i := 0; i < num; i++ { + err = fn() + if err == nil { + break + } + } + return err +} + +func (p *producer) Publish(topic string, body []byte) error { + nsq, err := p.pool.Get() + if err != nil { + return err + } + defer nsq.Close() + + return retry(2, func() error { + return nsq.Publish(topic, body) + }) +} + +func (p *producer) MultiPublish(topic string, body [][]byte) error { + nsq, err := p.pool.Get() + if err != nil { + return err + } + defer nsq.Close() + + return retry(2, func() error { + return nsq.MultiPublish(topic, body) + }) +} + +func (p *producer) DeferredPublish(topic string, delay time.Duration, body []byte) error { + nsq, err := p.pool.Get() + if err != nil { + return err + } + defer nsq.Close() + + return retry(2, func() error { + return nsq.DeferredPublish(topic, delay, body) + }) +} -- Gitblit v1.8.0