zhangmeng
2023-05-16 1b4fc8c66026c77abf734126b4e7662e386ec0f5
for go get
9个文件已添加
3个文件已修改
827 ■■■■■ 已修改文件
.gitignore 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
TST/test/test.go 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go 26 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsqclient/LICENSE 20 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsqclient/README.md 54 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsqclient/channel.go 134 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsqclient/channel_test.go 218 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsqclient/conn.go 45 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsqclient/consumer.go 99 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsqclient/pointer.go 57 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsqclient/pool.go 25 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsqclient/producer.go 139 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.gitignore
@@ -24,3 +24,5 @@
nsqCli
cnsqcli
go.sum
go.mod
TST/test/test.go
@@ -4,16 +4,16 @@
    "context"
    "fmt"
    "log"
    "nsqclient/nsqcli"
    "nsqclient/nsqclient"
    "time"
)
func produce(two bool) {
    p, _ := nsqcli.NewProducer("192.168.20.108:4150")
    p, _ := nsqclient.NewProducer("192.168.20.108:4150")
    var str string
    for len(str) < 32 {
        str += "cnsqcli dynamic library"
        str += "cnsqclient dynamic library"
    }
    msgx := []byte(str + "--x")
    msgy := []byte(str + "--y")
@@ -40,7 +40,7 @@
func consume(topic, channel string) {
    ctx, cancel := context.WithCancel(context.Background())
    if c, e := nsqcli.NewNsqConsumer(ctx, topic, channel); e != nil {
    if c, e := nsqclient.NewNsqConsumer(ctx, topic, channel); e != nil {
        fmt.Println("NewNsqConsumer failed", e)
        return
    } else {
main.go
@@ -6,7 +6,7 @@
import (
    "nsqclient/TST/test"
    "nsqclient/nsqcli"
    "nsqclient/nsqclient"
    "sync"
    "time"
    "unsafe"
@@ -14,18 +14,18 @@
//export createProducer
func createProducer(addr string) unsafe.Pointer {
    n, _ := nsqcli.NewProducer(addr)
    return nsqcli.Save(n)
    n, _ := nsqclient.NewProducer(addr)
    return nsqclient.Save(n)
}
//export destroyProducer
func destroyProducer(ph unsafe.Pointer) {
    nsqcli.Unref(ph)
    nsqcli.DestroyProducerPool()
    nsqclient.Unref(ph)
    nsqclient.DestroyProducerPool()
}
func pcvt(ph unsafe.Pointer) nsqcli.Producer {
    return nsqcli.Restore(ph).(nsqcli.Producer)
func pcvt(ph unsafe.Pointer) nsqclient.Producer {
    return nsqclient.Restore(ph).(nsqclient.Producer)
}
//export publish
@@ -58,30 +58,30 @@
/////////////////////////////////////////////////////////////
type consumer struct {
    nsqcon *nsqcli.NsqConsumer
    nsqcon *nsqclient.NsqConsumer
    lck    sync.Mutex
    msgs   [][]byte
}
//export createConsumer
func createConsumer(topic, channel string) unsafe.Pointer {
    if c, err := nsqcli.NewNsqConsumer(nil, topic, channel); err == nil {
    if c, err := nsqclient.NewNsqConsumer(nil, topic, channel); err == nil {
        con := &consumer{
            nsqcon: c,
        }
        return nsqcli.Save(con)
        return nsqclient.Save(con)
    }
    return nil
}
func ccvt(ch unsafe.Pointer) *consumer {
    return nsqcli.Restore(ch).(*consumer)
    return nsqclient.Restore(ch).(*consumer)
}
//export destroyConsumer
func destroyConsumer(ch unsafe.Pointer) {
    nsqcli.DestroyNsqConsumer(ccvt(ch).nsqcon)
    nsqcli.Unref(ch)
    nsqclient.DestroyNsqConsumer(ccvt(ch).nsqcon)
    nsqclient.Unref(ch)
}
//export Run
nsqclient/LICENSE
New file
@@ -0,0 +1,20 @@
The MIT License (MIT)
Copyright (c) 2013 Fatih Arslan
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
nsqclient/README.md
New file
@@ -0,0 +1,54 @@
# NSQPool
NSQPool is a thread safe connection pool for nsq producer. It can be used to
manage and reuse nsq producer connection.
## Install and Usage
Install the package with:
```bash
github.com/qgymje/nsqpool
```
Import it with:
```go
import (
    "github.com/qgymje/nsqpool"
    nsq "github.com/nsqio/go-nsq"
)
```
and use `pool` as the package name inside the code.
## Example
```go
// create a factory() to be used with channel based pool
factory := func() (*nsq.Producer, error) {
    config := nsq.NewConfig()
    return nsq.NewProducer(":4150", config)
}
nsqPool, err := pool.NewChannelPool(5, 30, factory)
producer, err := nsqPool.Get()
producer.Publish("topic", "some data")
// do something with producer and put it back to the pool by closing the connection
// (this doesn't close the underlying connection instead it's putting it back
// to the pool).
producer.Close()
// close pool any time you want, this closes all the connections inside a pool
nsqPool.Close()
// currently available connections in the pool
current := nsqPool.Len()
```
## License
The MIT License (MIT) - see LICENSE for more details
nsqclient/channel.go
New file
@@ -0,0 +1,134 @@
package nsqclient
import (
    "errors"
    "fmt"
    "sync"
    nsq "github.com/nsqio/go-nsq"
)
// channelPool implements the Pool interface based on buffered channels.
type channelPool struct {
    // storage for our net.Conn connections
    mu    sync.Mutex
    conns chan *nsq.Producer
    // net.Conn generator
    factory Factory
}
// Factory is a function to create new connections.
type Factory func() (*nsq.Producer, error)
// NewChannelPool returns a new pool based on buffered channels with an initial
// capacity and maximum capacity. Factory is used when initial capacity is
// greater than zero to fill the pool. A zero initialCap doesn't fill the Pool
// until a new Get() is called. During a Get(), If there is no new connection
// available in the pool, a new connection will be created via the Factory()
// method.
func NewChannelPool(initialCap, maxCap int, factory Factory) (Pool, error) {
    if initialCap < 0 || maxCap <= 0 || initialCap > maxCap {
        return nil, errors.New("invalid capacity settings")
    }
    c := &channelPool{
        conns:   make(chan *nsq.Producer, maxCap),
        factory: factory,
    }
    // create initial connections, if something goes wrong,
    // just close the pool error out.
    for i := 0; i < initialCap; i++ {
        conn, err := factory()
        if err != nil {
            c.Close()
            return nil, fmt.Errorf("factory is not able to fill the pool: %s", err)
        }
        c.conns <- conn
    }
    return c, nil
}
func (c *channelPool) getConns() chan *nsq.Producer {
    c.mu.Lock()
    conns := c.conns
    c.mu.Unlock()
    return conns
}
// Get implements the Pool interfaces Get() method. If there is no new
// connection available in the pool, a new connection will be created via the
// Factory() method.
func (c *channelPool) Get() (*PoolConn, error) {
    conns := c.getConns()
    if conns == nil {
        return nil, ErrClosed
    }
    // wrap our connections with out custom net.Conn implementation (wrapConn
    // method) that puts the connection back to the pool if it's closed.
    select {
    case conn := <-conns:
        if conn == nil {
            return nil, ErrClosed
        }
        return c.wrapConn(conn), nil
    default:
        conn, err := c.factory()
        if err != nil {
            return nil, err
        }
        return c.wrapConn(conn), nil
    }
}
// put puts the connection back to the pool. If the pool is full or closed,
// conn is simply closed. A nil conn will be rejected.
func (c *channelPool) put(conn *nsq.Producer) error {
    if conn == nil {
        return errors.New("connection is nil. rejecting")
    }
    c.mu.Lock()
    defer c.mu.Unlock()
    if c.conns == nil {
        // pool is closed, close passed connection
        conn.Stop()
        return nil
    }
    // put the resource back into the pool. If the pool is full, this will
    // block and the default case will be executed.
    select {
    case c.conns <- conn:
        return nil
    default:
        // pool is full, close passed connection
        conn.Stop()
        return nil
    }
}
func (c *channelPool) Close() {
    c.mu.Lock()
    conns := c.conns
    c.conns = nil
    c.factory = nil
    c.mu.Unlock()
    if conns == nil {
        return
    }
    close(conns)
    for conn := range conns {
        conn.Stop()
    }
}
func (c *channelPool) Len() int { return len(c.getConns()) }
nsqclient/channel_test.go
New file
@@ -0,0 +1,218 @@
package nsqclient
import (
    "math/rand"
    "sync"
    "testing"
    "time"
    "github.com/nsqio/go-nsq"
)
var (
    InitialCap = 2
    MaximumCap = 10
    factory    = func() (*nsq.Producer, error) {
        config := nsq.NewConfig()
        return nsq.NewProducer(":4160", config)
    }
)
func newChannelPool() (Pool, error) {
    return NewChannelPool(InitialCap, MaximumCap, factory)
}
func TestNew(t *testing.T) {
    _, err := newChannelPool()
    if err != nil {
        t.Errorf("New error: %s", err)
    }
}
func TestPool_Get(t *testing.T) {
    p, _ := newChannelPool()
    defer p.Close()
    _, err := p.Get()
    if err != nil {
        t.Errorf("Get error: %s", err)
    }
    // after one get, current capacity should be lowered by one.
    if p.Len() != (InitialCap - 1) {
        t.Errorf("Get error. Expecting %d, got %d",
            (InitialCap - 1), p.Len())
    }
    // get them all
    var wg sync.WaitGroup
    for i := 0; i < (InitialCap - 1); i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            _, err := p.Get()
            if err != nil {
                t.Errorf("Get error: %s", err)
            }
        }()
    }
    wg.Wait()
    if p.Len() != 0 {
        t.Errorf("Get error. Expecting %d, got %d",
            (InitialCap - 1), p.Len())
    }
    _, err = p.Get()
    if err != nil {
        t.Errorf("Get error: %s", err)
    }
}
func TestPool_Put(t *testing.T) {
    p, err := NewChannelPool(0, 30, factory)
    if err != nil {
        t.Fatal(err)
    }
    defer p.Close()
    // get/create from the pool
    conns := make([]*PoolConn, MaximumCap)
    for i := 0; i < MaximumCap; i++ {
        conn, _ := p.Get()
        conns[i] = conn
    }
    // now put them all back
    for _, conn := range conns {
        conn.Close()
    }
    if p.Len() != MaximumCap {
        t.Errorf("Put error len. Expecting %d, got %d",
            1, p.Len())
    }
    conn, _ := p.Get()
    p.Close() // close pool
    conn.Close() // try to put into a full pool
    if p.Len() != 0 {
        t.Errorf("Put error. Closed pool shouldn't allow to put connections.")
    }
}
func TestPool_PutUnusableConn(t *testing.T) {
    p, _ := newChannelPool()
    defer p.Close()
    // ensure pool is not empty
    conn, _ := p.Get()
    conn.Close()
    poolSize := p.Len()
    conn, _ = p.Get()
    conn.Close()
    if p.Len() != poolSize {
        t.Errorf("Pool size is expected to be equal to initial size")
    }
    conn, _ = p.Get()
    conn.MarkUnusable()
    conn.Close()
    if p.Len() != poolSize-1 {
        t.Errorf("Pool size is expected to be initial_size - 1 [%d:%d]", p.Len(), poolSize-1)
    }
}
func TestPool_UsedCapacity(t *testing.T) {
    p, _ := newChannelPool()
    defer p.Close()
    if p.Len() != InitialCap {
        t.Errorf("InitialCap error. Expecting %d, got %d",
            InitialCap, p.Len())
    }
}
func TestPool_Close(t *testing.T) {
    p, _ := newChannelPool()
    // now close it and test all cases we are expecting.
    p.Close()
    c := p.(*channelPool)
    if c.conns != nil {
        t.Errorf("Close error, conns channel should be nil")
    }
    if c.factory != nil {
        t.Errorf("Close error, factory should be nil")
    }
    _, err := p.Get()
    if err == nil {
        t.Errorf("Close error, get conn should return an error")
    }
    if p.Len() != 0 {
        t.Errorf("Close error used capacity. Expecting 0, got %d", p.Len())
    }
}
func TestPoolConcurrent(t *testing.T) {
    p, _ := newChannelPool()
    pipe := make(chan *PoolConn, 0)
    go func() {
        p.Close()
    }()
    for i := 0; i < MaximumCap; i++ {
        go func() {
            conn, _ := p.Get()
            pipe <- conn
        }()
        go func() {
            conn := <-pipe
            if conn == nil {
                return
            }
            conn.Close()
        }()
    }
}
func TestPoolConcurrent2(t *testing.T) {
    p, _ := NewChannelPool(0, 30, factory)
    var wg sync.WaitGroup
    go func() {
        for i := 0; i < 10; i++ {
            wg.Add(1)
            go func(i int) {
                conn, _ := p.Get()
                time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
                conn.Close()
                wg.Done()
            }(i)
        }
    }()
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(i int) {
            conn, _ := p.Get()
            time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
            conn.Close()
            wg.Done()
        }(i)
    }
    wg.Wait()
}
nsqclient/conn.go
New file
@@ -0,0 +1,45 @@
package nsqclient
import (
    "sync"
    nsq "github.com/nsqio/go-nsq"
)
// PoolConn is a wrapper around net.Conn to modify the the behavior of
// net.Conn's Close() method.
type PoolConn struct {
    *nsq.Producer
    mu       sync.RWMutex
    c        *channelPool
    unusable bool
}
// Close puts the given connects back to the pool instead of closing it.
func (p *PoolConn) Close() error {
    p.mu.RLock()
    defer p.mu.RUnlock()
    if p.unusable {
        if p.Producer != nil {
            p.Producer.Stop()
            return nil
        }
        return nil
    }
    return p.c.put(p.Producer)
}
// MarkUnusable marks the connection not usable any more, to let the pool close it instead of returning it to pool.
func (p *PoolConn) MarkUnusable() {
    p.mu.Lock()
    p.unusable = true
    p.mu.Unlock()
}
// newConn wraps a standard net.Conn to a poolConn net.Conn.
func (c *channelPool) wrapConn(conn *nsq.Producer) *PoolConn {
    p := &PoolConn{c: c}
    p.Producer = conn
    return p
}
nsqclient/consumer.go
New file
@@ -0,0 +1,99 @@
package nsqclient
import (
    "context"
    "fmt"
    "time"
    nsq "github.com/nsqio/go-nsq"
)
type NsqConsumer struct {
    consumer *nsq.Consumer
    // handler   nsq.Handler
    handler   func([]byte) error
    ctx       context.Context
    ctxCancel context.CancelFunc
    topic     string
    channel   string
}
func NewNsqConsumer(ctx context.Context, topic, channel string, options ...func(*nsq.Config)) (*NsqConsumer, error) {
    conf := nsq.NewConfig()
    conf.MaxAttempts = 0
    conf.MsgTimeout = 10 * time.Minute         // 默认一个消息最多能处理十分钟,否则就会重新丢入队列
    conf.LookupdPollInterval = 3 * time.Second // 调整consumer的重连间隔时间为3秒
    for _, option := range options {
        option(conf)
    }
    consumer, err := nsq.NewConsumer(topic, channel, conf)
    if err != nil {
        return nil, err
    }
    return &NsqConsumer{
        consumer: consumer,
        ctx:      ctx,
        topic:    topic,
        channel:  channel,
    }, nil
}
func DestroyNsqConsumer(c *NsqConsumer) {
    if c != nil {
        if c.ctxCancel != nil {
            c.ctxCancel()
        }
    }
}
// func (n *NsqConsumer) AddHandler(handler nsq.Handler) {
//     n.handler = handler
// }
func (n *NsqConsumer) AddHandler(handler func([]byte) error) {
    n.handler = handler
}
func (n *NsqConsumer) Run(qaddr string, concurrency int) error {
    return n.RunDistributed([]string{qaddr}, nil, concurrency)
}
func (n *NsqConsumer) RunLookupd(lookupAddr string, concurrency int) error {
    return n.RunDistributed(nil, []string{lookupAddr}, concurrency)
}
func (n *NsqConsumer) RunDistributed(qAddr, lAddr []string, concurrency int) error {
    n.consumer.ChangeMaxInFlight(concurrency)
    // n.consumer.AddConcurrentHandlers(n.handler, concurrency)
    n.consumer.AddConcurrentHandlers(nsq.HandlerFunc(func(msg *nsq.Message) error {
        return n.handler(msg.Body)
        // return nil
    }), concurrency)
    var err error
    if len(qAddr) > 0 {
        err = n.consumer.ConnectToNSQDs(qAddr)
    } else if len(lAddr) > 0 {
        err = n.consumer.ConnectToNSQLookupds(lAddr)
    } else {
        err = fmt.Errorf("Addr Must NOT Empty")
    }
    if err != nil {
        return err
    }
    if n.ctx == nil {
        n.ctx, n.ctxCancel = context.WithCancel(context.Background())
    }
    for {
        select {
        case <-n.ctx.Done():
            fmt.Println("[%s] %s,%s", "stop consumer", n.topic, n.channel)
            n.consumer.Stop()
            fmt.Println("[%s] %s,%s", "stop consumer success", n.topic, n.channel)
            return nil
        }
    }
}
nsqclient/pointer.go
New file
@@ -0,0 +1,57 @@
package nsqclient
// #include <stdlib.h>
import "C"
import (
    "sync"
    "unsafe"
)
var (
    mutex sync.RWMutex
    store = map[unsafe.Pointer]interface{}{}
)
func Save(v interface{}) unsafe.Pointer {
    if v == nil {
        return nil
    }
    // Generate real fake C pointer.
    // This pointer will not store any data, but will bi used for indexing purposes.
    // Since Go doest allow to cast dangling pointer to unsafe.Pointer, we do rally allocate one byte.
    // Why we need indexing, because Go doest allow C code to store pointers to Go data.
    var ptr unsafe.Pointer = C.malloc(C.size_t(1))
    if ptr == nil {
        panic("can't allocate 'cgo-pointer hack index pointer': ptr == nil")
    }
    mutex.Lock()
    store[ptr] = v
    mutex.Unlock()
    return ptr
}
func Restore(ptr unsafe.Pointer) (v interface{}) {
    if ptr == nil {
        return nil
    }
    mutex.RLock()
    v = store[ptr]
    mutex.RUnlock()
    return
}
func Unref(ptr unsafe.Pointer) {
    if ptr == nil {
        return
    }
    mutex.Lock()
    delete(store, ptr)
    mutex.Unlock()
    C.free(ptr)
}
nsqclient/pool.go
New file
@@ -0,0 +1,25 @@
// Package pool implements a pool of net.Conn interfaces to manage and reuse them.
package nsqclient
import "errors"
var (
    // ErrClosed is the error resulting if the pool is closed via pool.Close().
    ErrClosed = errors.New("pool is closed")
)
// Pool interface describes a pool implementation. A pool should have maximum
// capacity. An ideal pool is threadsafe and easy to use.
type Pool interface {
    // Get returns a new connection from the pool. Closing the connections puts
    // it back to the Pool. Closing it when the pool is destroyed or full will
    // be counted as an error.
    Get() (*PoolConn, error)
    // Close closes the pool and all its connections. After Close() the pool is
    // no longer usable.
    Close()
    // Len returns the current number of connections of the pool.
    Len() int
}
nsqclient/producer.go
New file
@@ -0,0 +1,139 @@
package nsqclient
import (
    "fmt"
    "time"
    nsq "github.com/nsqio/go-nsq"
)
type Producer interface {
    Publish(topic string, body []byte) error
    MultiPublish(topic string, body [][]byte) error
    DeferredPublish(topic string, delay time.Duration, body []byte) error
}
var _ Producer = (*producer)(nil)
type producer struct {
    pool Pool
}
var (
    //                    name   pool producer
    nsqList = make(map[string]Pool)
)
type Config struct {
    Addr     string `toml:"addr" json:"addr"`
    InitSize int    `toml:"init_size" json:"init_size"`
    MaxSize  int    `toml:"max_size" json:"max_size"`
}
func CreateProducerPool(configs map[string]Config) {
    for name, conf := range configs {
        n, err := newProducerPool(conf.Addr, conf.InitSize, conf.MaxSize)
        if err == nil {
            nsqList[name] = n
            // 支持ip:port寻址
            nsqList[conf.Addr] = n
        }
    }
}
func DestroyProducerPool() {
    for _, p := range nsqList {
        p.Close()
    }
}
func GetProducer(key ...string) (*producer, error) {
    k := "default"
    if len(key) > 0 {
        k = key[0]
    }
    if n, ok := nsqList[k]; ok {
        return &producer{n}, nil
    }
    return nil, fmt.Errorf("GetProducer can't get producer")
}
// CreateNSQProducer create nsq producer
func newProducer(addr string, options ...func(*nsq.Config)) (*nsq.Producer, error) {
    cfg := nsq.NewConfig()
    for _, option := range options {
        option(cfg)
    }
    producer, err := nsq.NewProducer(addr, cfg)
    if err != nil {
        return nil, err
    }
    // producer.SetLogger(log.New(os.Stderr, "", log.Flags()), nsq.LogLevelError)
    return producer, nil
}
// CreateNSQProducerPool create a nwq producer pool
func newProducerPool(addr string, initSize, maxSize int, options ...func(*nsq.Config)) (Pool, error) {
    factory := func() (*nsq.Producer, error) {
        // TODO 这里应该执行ping方法来确定连接是正常的否则不应该创建conn
        return newProducer(addr, options...)
    }
    nsqPool, err := NewChannelPool(initSize, maxSize, factory)
    if err != nil {
        return nil, err
    }
    return nsqPool, nil
}
func NewProducer(addr string) (*producer, error) {
    CreateProducerPool(map[string]Config{"default": {addr, 1, 1}})
    return GetProducer()
}
func retry(num int, fn func() error) error {
    var err error
    for i := 0; i < num; i++ {
        err = fn()
        if err == nil {
            break
        }
    }
    return err
}
func (p *producer) Publish(topic string, body []byte) error {
    nsq, err := p.pool.Get()
    if err != nil {
        return err
    }
    defer nsq.Close()
    return retry(2, func() error {
        return nsq.Publish(topic, body)
    })
}
func (p *producer) MultiPublish(topic string, body [][]byte) error {
    nsq, err := p.pool.Get()
    if err != nil {
        return err
    }
    defer nsq.Close()
    return retry(2, func() error {
        return nsq.MultiPublish(topic, body)
    })
}
func (p *producer) DeferredPublish(topic string, delay time.Duration, body []byte) error {
    nsq, err := p.pool.Get()
    if err != nil {
        return err
    }
    defer nsq.Close()
    return retry(2, func() error {
        return nsq.DeferredPublish(topic, delay, body)
    })
}