From 1b4fc8c66026c77abf734126b4e7662e386ec0f5 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期二, 16 五月 2023 16:50:46 +0800
Subject: [PATCH] for go get

---
 nsqclient/consumer.go     |   99 +++++++
 nsqclient/channel_test.go |  218 +++++++++++++++
 .gitignore                |    2 
 nsqclient/pointer.go      |   57 ++++
 nsqclient/producer.go     |  139 +++++++++
 nsqclient/README.md       |   54 +++
 nsqclient/pool.go         |   25 +
 TST/test/test.go          |    8 
 main.go                   |   26 
 nsqclient/LICENSE         |   20 +
 nsqclient/conn.go         |   45 +++
 nsqclient/channel.go      |  134 +++++++++
 12 files changed, 810 insertions(+), 17 deletions(-)

diff --git a/.gitignore b/.gitignore
index c41f8dc..129af67 100644
--- a/.gitignore
+++ b/.gitignore
@@ -24,3 +24,5 @@
 
 nsqCli
 cnsqcli
+go.sum
+go.mod
diff --git a/TST/test/test.go b/TST/test/test.go
index 5aaf283..2e1a679 100644
--- a/TST/test/test.go
+++ b/TST/test/test.go
@@ -4,16 +4,16 @@
 	"context"
 	"fmt"
 	"log"
-	"nsqclient/nsqcli"
+	"nsqclient/nsqclient"
 	"time"
 )
 
 func produce(two bool) {
-	p, _ := nsqcli.NewProducer("192.168.20.108:4150")
+	p, _ := nsqclient.NewProducer("192.168.20.108:4150")
 
 	var str string
 	for len(str) < 32 {
-		str += "cnsqcli dynamic library"
+		str += "cnsqclient dynamic library"
 	}
 	msgx := []byte(str + "--x")
 	msgy := []byte(str + "--y")
@@ -40,7 +40,7 @@
 func consume(topic, channel string) {
 	ctx, cancel := context.WithCancel(context.Background())
 
-	if c, e := nsqcli.NewNsqConsumer(ctx, topic, channel); e != nil {
+	if c, e := nsqclient.NewNsqConsumer(ctx, topic, channel); e != nil {
 		fmt.Println("NewNsqConsumer failed", e)
 		return
 	} else {
diff --git a/main.go b/main.go
index c01a2e1..4eec6c9 100644
--- a/main.go
+++ b/main.go
@@ -6,7 +6,7 @@
 
 import (
 	"nsqclient/TST/test"
-	"nsqclient/nsqcli"
+	"nsqclient/nsqclient"
 	"sync"
 	"time"
 	"unsafe"
@@ -14,18 +14,18 @@
 
 //export createProducer
 func createProducer(addr string) unsafe.Pointer {
-	n, _ := nsqcli.NewProducer(addr)
-	return nsqcli.Save(n)
+	n, _ := nsqclient.NewProducer(addr)
+	return nsqclient.Save(n)
 }
 
 //export destroyProducer
 func destroyProducer(ph unsafe.Pointer) {
-	nsqcli.Unref(ph)
-	nsqcli.DestroyProducerPool()
+	nsqclient.Unref(ph)
+	nsqclient.DestroyProducerPool()
 }
 
-func pcvt(ph unsafe.Pointer) nsqcli.Producer {
-	return nsqcli.Restore(ph).(nsqcli.Producer)
+func pcvt(ph unsafe.Pointer) nsqclient.Producer {
+	return nsqclient.Restore(ph).(nsqclient.Producer)
 }
 
 //export publish
@@ -58,30 +58,30 @@
 /////////////////////////////////////////////////////////////
 
 type consumer struct {
-	nsqcon *nsqcli.NsqConsumer
+	nsqcon *nsqclient.NsqConsumer
 	lck    sync.Mutex
 	msgs   [][]byte
 }
 
 //export createConsumer
 func createConsumer(topic, channel string) unsafe.Pointer {
-	if c, err := nsqcli.NewNsqConsumer(nil, topic, channel); err == nil {
+	if c, err := nsqclient.NewNsqConsumer(nil, topic, channel); err == nil {
 		con := &consumer{
 			nsqcon: c,
 		}
-		return nsqcli.Save(con)
+		return nsqclient.Save(con)
 	}
 	return nil
 }
 
 func ccvt(ch unsafe.Pointer) *consumer {
-	return nsqcli.Restore(ch).(*consumer)
+	return nsqclient.Restore(ch).(*consumer)
 }
 
 //export destroyConsumer
 func destroyConsumer(ch unsafe.Pointer) {
-	nsqcli.DestroyNsqConsumer(ccvt(ch).nsqcon)
-	nsqcli.Unref(ch)
+	nsqclient.DestroyNsqConsumer(ccvt(ch).nsqcon)
+	nsqclient.Unref(ch)
 }
 
 //export Run
diff --git a/nsqclient/LICENSE b/nsqclient/LICENSE
new file mode 100644
index 0000000..25fdaf6
--- /dev/null
+++ b/nsqclient/LICENSE
@@ -0,0 +1,20 @@
+The MIT License (MIT)
+
+Copyright (c) 2013 Fatih Arslan
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of
+this software and associated documentation files (the "Software"), to deal in
+the Software without restriction, including without limitation the rights to
+use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software is furnished to do so,
+subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/nsqclient/README.md b/nsqclient/README.md
new file mode 100644
index 0000000..432d86d
--- /dev/null
+++ b/nsqclient/README.md
@@ -0,0 +1,54 @@
+# NSQPool
+
+NSQPool is a thread safe connection pool for nsq producer. It can be used to
+manage and reuse nsq producer connection.
+
+
+## Install and Usage
+
+Install the package with:
+
+```bash
+github.com/qgymje/nsqpool
+```
+
+Import it with:
+
+```go
+import (
+    "github.com/qgymje/nsqpool"
+    nsq "github.com/nsqio/go-nsq"
+)
+```
+
+and use `pool` as the package name inside the code.
+
+## Example
+
+```go
+// create a factory() to be used with channel based pool
+factory := func() (*nsq.Producer, error) { 
+    config := nsq.NewConfig()
+    return nsq.NewProducer(":4150", config)
+}
+
+nsqPool, err := pool.NewChannelPool(5, 30, factory)
+
+producer, err := nsqPool.Get()
+
+producer.Publish("topic", "some data")
+// do something with producer and put it back to the pool by closing the connection
+// (this doesn't close the underlying connection instead it's putting it back
+// to the pool).
+producer.Close()
+
+// close pool any time you want, this closes all the connections inside a pool
+nsqPool.Close()
+
+// currently available connections in the pool
+current := nsqPool.Len()
+```
+
+## License
+
+The MIT License (MIT) - see LICENSE for more details
diff --git a/nsqclient/channel.go b/nsqclient/channel.go
new file mode 100644
index 0000000..1594dcc
--- /dev/null
+++ b/nsqclient/channel.go
@@ -0,0 +1,134 @@
+package nsqclient
+
+import (
+	"errors"
+	"fmt"
+	"sync"
+
+	nsq "github.com/nsqio/go-nsq"
+)
+
+// channelPool implements the Pool interface based on buffered channels.
+type channelPool struct {
+	// storage for our net.Conn connections
+	mu    sync.Mutex
+	conns chan *nsq.Producer
+
+	// net.Conn generator
+	factory Factory
+}
+
+// Factory is a function to create new connections.
+type Factory func() (*nsq.Producer, error)
+
+// NewChannelPool returns a new pool based on buffered channels with an initial
+// capacity and maximum capacity. Factory is used when initial capacity is
+// greater than zero to fill the pool. A zero initialCap doesn't fill the Pool
+// until a new Get() is called. During a Get(), If there is no new connection
+// available in the pool, a new connection will be created via the Factory()
+// method.
+func NewChannelPool(initialCap, maxCap int, factory Factory) (Pool, error) {
+	if initialCap < 0 || maxCap <= 0 || initialCap > maxCap {
+		return nil, errors.New("invalid capacity settings")
+	}
+
+	c := &channelPool{
+		conns:   make(chan *nsq.Producer, maxCap),
+		factory: factory,
+	}
+
+	// create initial connections, if something goes wrong,
+	// just close the pool error out.
+	for i := 0; i < initialCap; i++ {
+		conn, err := factory()
+		if err != nil {
+			c.Close()
+			return nil, fmt.Errorf("factory is not able to fill the pool: %s", err)
+		}
+		c.conns <- conn
+	}
+
+	return c, nil
+}
+
+func (c *channelPool) getConns() chan *nsq.Producer {
+	c.mu.Lock()
+	conns := c.conns
+	c.mu.Unlock()
+	return conns
+}
+
+// Get implements the Pool interfaces Get() method. If there is no new
+// connection available in the pool, a new connection will be created via the
+// Factory() method.
+func (c *channelPool) Get() (*PoolConn, error) {
+	conns := c.getConns()
+	if conns == nil {
+		return nil, ErrClosed
+	}
+
+	// wrap our connections with out custom net.Conn implementation (wrapConn
+	// method) that puts the connection back to the pool if it's closed.
+	select {
+	case conn := <-conns:
+		if conn == nil {
+			return nil, ErrClosed
+		}
+
+		return c.wrapConn(conn), nil
+	default:
+		conn, err := c.factory()
+		if err != nil {
+			return nil, err
+		}
+
+		return c.wrapConn(conn), nil
+	}
+}
+
+// put puts the connection back to the pool. If the pool is full or closed,
+// conn is simply closed. A nil conn will be rejected.
+func (c *channelPool) put(conn *nsq.Producer) error {
+	if conn == nil {
+		return errors.New("connection is nil. rejecting")
+	}
+
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if c.conns == nil {
+		// pool is closed, close passed connection
+		conn.Stop()
+		return nil
+	}
+
+	// put the resource back into the pool. If the pool is full, this will
+	// block and the default case will be executed.
+	select {
+	case c.conns <- conn:
+		return nil
+	default:
+		// pool is full, close passed connection
+		conn.Stop()
+		return nil
+	}
+}
+
+func (c *channelPool) Close() {
+	c.mu.Lock()
+	conns := c.conns
+	c.conns = nil
+	c.factory = nil
+	c.mu.Unlock()
+
+	if conns == nil {
+		return
+	}
+
+	close(conns)
+	for conn := range conns {
+		conn.Stop()
+	}
+}
+
+func (c *channelPool) Len() int { return len(c.getConns()) }
diff --git a/nsqclient/channel_test.go b/nsqclient/channel_test.go
new file mode 100644
index 0000000..725db9b
--- /dev/null
+++ b/nsqclient/channel_test.go
@@ -0,0 +1,218 @@
+package nsqclient
+
+import (
+	"math/rand"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/nsqio/go-nsq"
+)
+
+var (
+	InitialCap = 2
+	MaximumCap = 10
+	factory    = func() (*nsq.Producer, error) {
+		config := nsq.NewConfig()
+		return nsq.NewProducer(":4160", config)
+	}
+)
+
+func newChannelPool() (Pool, error) {
+	return NewChannelPool(InitialCap, MaximumCap, factory)
+}
+
+func TestNew(t *testing.T) {
+	_, err := newChannelPool()
+	if err != nil {
+		t.Errorf("New error: %s", err)
+	}
+}
+
+func TestPool_Get(t *testing.T) {
+	p, _ := newChannelPool()
+	defer p.Close()
+
+	_, err := p.Get()
+	if err != nil {
+		t.Errorf("Get error: %s", err)
+	}
+
+	// after one get, current capacity should be lowered by one.
+	if p.Len() != (InitialCap - 1) {
+		t.Errorf("Get error. Expecting %d, got %d",
+			(InitialCap - 1), p.Len())
+	}
+
+	// get them all
+	var wg sync.WaitGroup
+	for i := 0; i < (InitialCap - 1); i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			_, err := p.Get()
+			if err != nil {
+				t.Errorf("Get error: %s", err)
+			}
+		}()
+	}
+	wg.Wait()
+
+	if p.Len() != 0 {
+		t.Errorf("Get error. Expecting %d, got %d",
+			(InitialCap - 1), p.Len())
+	}
+
+	_, err = p.Get()
+	if err != nil {
+		t.Errorf("Get error: %s", err)
+	}
+}
+
+func TestPool_Put(t *testing.T) {
+	p, err := NewChannelPool(0, 30, factory)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer p.Close()
+
+	// get/create from the pool
+	conns := make([]*PoolConn, MaximumCap)
+	for i := 0; i < MaximumCap; i++ {
+		conn, _ := p.Get()
+		conns[i] = conn
+	}
+
+	// now put them all back
+	for _, conn := range conns {
+		conn.Close()
+	}
+
+	if p.Len() != MaximumCap {
+		t.Errorf("Put error len. Expecting %d, got %d",
+			1, p.Len())
+	}
+
+	conn, _ := p.Get()
+	p.Close() // close pool
+
+	conn.Close() // try to put into a full pool
+	if p.Len() != 0 {
+		t.Errorf("Put error. Closed pool shouldn't allow to put connections.")
+	}
+}
+
+func TestPool_PutUnusableConn(t *testing.T) {
+	p, _ := newChannelPool()
+	defer p.Close()
+
+	// ensure pool is not empty
+	conn, _ := p.Get()
+	conn.Close()
+
+	poolSize := p.Len()
+	conn, _ = p.Get()
+	conn.Close()
+	if p.Len() != poolSize {
+		t.Errorf("Pool size is expected to be equal to initial size")
+	}
+
+	conn, _ = p.Get()
+	conn.MarkUnusable()
+
+	conn.Close()
+	if p.Len() != poolSize-1 {
+		t.Errorf("Pool size is expected to be initial_size - 1 [%d:%d]", p.Len(), poolSize-1)
+	}
+}
+
+func TestPool_UsedCapacity(t *testing.T) {
+	p, _ := newChannelPool()
+	defer p.Close()
+
+	if p.Len() != InitialCap {
+		t.Errorf("InitialCap error. Expecting %d, got %d",
+			InitialCap, p.Len())
+	}
+}
+
+func TestPool_Close(t *testing.T) {
+	p, _ := newChannelPool()
+
+	// now close it and test all cases we are expecting.
+	p.Close()
+
+	c := p.(*channelPool)
+
+	if c.conns != nil {
+		t.Errorf("Close error, conns channel should be nil")
+	}
+
+	if c.factory != nil {
+		t.Errorf("Close error, factory should be nil")
+	}
+
+	_, err := p.Get()
+	if err == nil {
+		t.Errorf("Close error, get conn should return an error")
+	}
+
+	if p.Len() != 0 {
+		t.Errorf("Close error used capacity. Expecting 0, got %d", p.Len())
+	}
+}
+
+func TestPoolConcurrent(t *testing.T) {
+	p, _ := newChannelPool()
+	pipe := make(chan *PoolConn, 0)
+
+	go func() {
+		p.Close()
+	}()
+
+	for i := 0; i < MaximumCap; i++ {
+		go func() {
+			conn, _ := p.Get()
+
+			pipe <- conn
+		}()
+
+		go func() {
+			conn := <-pipe
+			if conn == nil {
+				return
+			}
+			conn.Close()
+		}()
+	}
+}
+
+func TestPoolConcurrent2(t *testing.T) {
+	p, _ := NewChannelPool(0, 30, factory)
+
+	var wg sync.WaitGroup
+
+	go func() {
+		for i := 0; i < 10; i++ {
+			wg.Add(1)
+			go func(i int) {
+				conn, _ := p.Get()
+				time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
+				conn.Close()
+				wg.Done()
+			}(i)
+		}
+	}()
+
+	for i := 0; i < 10; i++ {
+		wg.Add(1)
+		go func(i int) {
+			conn, _ := p.Get()
+			time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
+			conn.Close()
+			wg.Done()
+		}(i)
+	}
+
+	wg.Wait()
+}
diff --git a/nsqclient/conn.go b/nsqclient/conn.go
new file mode 100644
index 0000000..5c680b5
--- /dev/null
+++ b/nsqclient/conn.go
@@ -0,0 +1,45 @@
+package nsqclient
+
+import (
+	"sync"
+
+	nsq "github.com/nsqio/go-nsq"
+)
+
+// PoolConn is a wrapper around net.Conn to modify the the behavior of
+// net.Conn's Close() method.
+type PoolConn struct {
+	*nsq.Producer
+	mu       sync.RWMutex
+	c        *channelPool
+	unusable bool
+}
+
+// Close puts the given connects back to the pool instead of closing it.
+func (p *PoolConn) Close() error {
+	p.mu.RLock()
+	defer p.mu.RUnlock()
+
+	if p.unusable {
+		if p.Producer != nil {
+			p.Producer.Stop()
+			return nil
+		}
+		return nil
+	}
+	return p.c.put(p.Producer)
+}
+
+// MarkUnusable marks the connection not usable any more, to let the pool close it instead of returning it to pool.
+func (p *PoolConn) MarkUnusable() {
+	p.mu.Lock()
+	p.unusable = true
+	p.mu.Unlock()
+}
+
+// newConn wraps a standard net.Conn to a poolConn net.Conn.
+func (c *channelPool) wrapConn(conn *nsq.Producer) *PoolConn {
+	p := &PoolConn{c: c}
+	p.Producer = conn
+	return p
+}
diff --git a/nsqclient/consumer.go b/nsqclient/consumer.go
new file mode 100644
index 0000000..a0df0b0
--- /dev/null
+++ b/nsqclient/consumer.go
@@ -0,0 +1,99 @@
+package nsqclient
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	nsq "github.com/nsqio/go-nsq"
+)
+
+type NsqConsumer struct {
+	consumer *nsq.Consumer
+	// handler   nsq.Handler
+	handler   func([]byte) error
+	ctx       context.Context
+	ctxCancel context.CancelFunc
+	topic     string
+	channel   string
+}
+
+func NewNsqConsumer(ctx context.Context, topic, channel string, options ...func(*nsq.Config)) (*NsqConsumer, error) {
+	conf := nsq.NewConfig()
+	conf.MaxAttempts = 0
+	conf.MsgTimeout = 10 * time.Minute         // 榛樿涓�涓秷鎭渶澶氳兘澶勭悊鍗佸垎閽燂紝鍚﹀垯灏变細閲嶆柊涓㈠叆闃熷垪
+	conf.LookupdPollInterval = 3 * time.Second // 璋冩暣consumer鐨勯噸杩為棿闅旀椂闂翠负3绉�
+	for _, option := range options {
+		option(conf)
+	}
+
+	consumer, err := nsq.NewConsumer(topic, channel, conf)
+	if err != nil {
+		return nil, err
+	}
+	return &NsqConsumer{
+		consumer: consumer,
+		ctx:      ctx,
+		topic:    topic,
+		channel:  channel,
+	}, nil
+}
+
+func DestroyNsqConsumer(c *NsqConsumer) {
+	if c != nil {
+		if c.ctxCancel != nil {
+			c.ctxCancel()
+		}
+	}
+}
+
+// func (n *NsqConsumer) AddHandler(handler nsq.Handler) {
+// 	n.handler = handler
+// }
+
+func (n *NsqConsumer) AddHandler(handler func([]byte) error) {
+	n.handler = handler
+}
+
+func (n *NsqConsumer) Run(qaddr string, concurrency int) error {
+	return n.RunDistributed([]string{qaddr}, nil, concurrency)
+}
+
+func (n *NsqConsumer) RunLookupd(lookupAddr string, concurrency int) error {
+	return n.RunDistributed(nil, []string{lookupAddr}, concurrency)
+}
+
+func (n *NsqConsumer) RunDistributed(qAddr, lAddr []string, concurrency int) error {
+	n.consumer.ChangeMaxInFlight(concurrency)
+	// n.consumer.AddConcurrentHandlers(n.handler, concurrency)
+	n.consumer.AddConcurrentHandlers(nsq.HandlerFunc(func(msg *nsq.Message) error {
+		return n.handler(msg.Body)
+		// return nil
+	}), concurrency)
+
+	var err error
+	if len(qAddr) > 0 {
+		err = n.consumer.ConnectToNSQDs(qAddr)
+	} else if len(lAddr) > 0 {
+		err = n.consumer.ConnectToNSQLookupds(lAddr)
+	} else {
+		err = fmt.Errorf("Addr Must NOT Empty")
+	}
+	if err != nil {
+		return err
+	}
+
+	if n.ctx == nil {
+		n.ctx, n.ctxCancel = context.WithCancel(context.Background())
+	}
+
+	for {
+		select {
+		case <-n.ctx.Done():
+			fmt.Println("[%s] %s,%s", "stop consumer", n.topic, n.channel)
+			n.consumer.Stop()
+			fmt.Println("[%s] %s,%s", "stop consumer success", n.topic, n.channel)
+			return nil
+		}
+	}
+}
diff --git a/nsqclient/pointer.go b/nsqclient/pointer.go
new file mode 100644
index 0000000..1cba795
--- /dev/null
+++ b/nsqclient/pointer.go
@@ -0,0 +1,57 @@
+package nsqclient
+
+// #include <stdlib.h>
+import "C"
+import (
+	"sync"
+	"unsafe"
+)
+
+var (
+	mutex sync.RWMutex
+	store = map[unsafe.Pointer]interface{}{}
+)
+
+func Save(v interface{}) unsafe.Pointer {
+	if v == nil {
+		return nil
+	}
+
+	// Generate real fake C pointer.
+	// This pointer will not store any data, but will bi used for indexing purposes.
+	// Since Go doest allow to cast dangling pointer to unsafe.Pointer, we do rally allocate one byte.
+	// Why we need indexing, because Go doest allow C code to store pointers to Go data.
+	var ptr unsafe.Pointer = C.malloc(C.size_t(1))
+	if ptr == nil {
+		panic("can't allocate 'cgo-pointer hack index pointer': ptr == nil")
+	}
+
+	mutex.Lock()
+	store[ptr] = v
+	mutex.Unlock()
+
+	return ptr
+}
+
+func Restore(ptr unsafe.Pointer) (v interface{}) {
+	if ptr == nil {
+		return nil
+	}
+
+	mutex.RLock()
+	v = store[ptr]
+	mutex.RUnlock()
+	return
+}
+
+func Unref(ptr unsafe.Pointer) {
+	if ptr == nil {
+		return
+	}
+
+	mutex.Lock()
+	delete(store, ptr)
+	mutex.Unlock()
+
+	C.free(ptr)
+}
diff --git a/nsqclient/pool.go b/nsqclient/pool.go
new file mode 100644
index 0000000..b773490
--- /dev/null
+++ b/nsqclient/pool.go
@@ -0,0 +1,25 @@
+// Package pool implements a pool of net.Conn interfaces to manage and reuse them.
+package nsqclient
+
+import "errors"
+
+var (
+	// ErrClosed is the error resulting if the pool is closed via pool.Close().
+	ErrClosed = errors.New("pool is closed")
+)
+
+// Pool interface describes a pool implementation. A pool should have maximum
+// capacity. An ideal pool is threadsafe and easy to use.
+type Pool interface {
+	// Get returns a new connection from the pool. Closing the connections puts
+	// it back to the Pool. Closing it when the pool is destroyed or full will
+	// be counted as an error.
+	Get() (*PoolConn, error)
+
+	// Close closes the pool and all its connections. After Close() the pool is
+	// no longer usable.
+	Close()
+
+	// Len returns the current number of connections of the pool.
+	Len() int
+}
diff --git a/nsqclient/producer.go b/nsqclient/producer.go
new file mode 100644
index 0000000..717c7a1
--- /dev/null
+++ b/nsqclient/producer.go
@@ -0,0 +1,139 @@
+package nsqclient
+
+import (
+	"fmt"
+	"time"
+
+	nsq "github.com/nsqio/go-nsq"
+)
+
+type Producer interface {
+	Publish(topic string, body []byte) error
+	MultiPublish(topic string, body [][]byte) error
+	DeferredPublish(topic string, delay time.Duration, body []byte) error
+}
+
+var _ Producer = (*producer)(nil)
+
+type producer struct {
+	pool Pool
+}
+
+var (
+	// 				   name   pool producer
+	nsqList = make(map[string]Pool)
+)
+
+type Config struct {
+	Addr     string `toml:"addr" json:"addr"`
+	InitSize int    `toml:"init_size" json:"init_size"`
+	MaxSize  int    `toml:"max_size" json:"max_size"`
+}
+
+func CreateProducerPool(configs map[string]Config) {
+	for name, conf := range configs {
+		n, err := newProducerPool(conf.Addr, conf.InitSize, conf.MaxSize)
+		if err == nil {
+			nsqList[name] = n
+			// 鏀寔ip:port瀵诲潃
+			nsqList[conf.Addr] = n
+		}
+	}
+}
+
+func DestroyProducerPool() {
+	for _, p := range nsqList {
+		p.Close()
+	}
+}
+
+func GetProducer(key ...string) (*producer, error) {
+	k := "default"
+	if len(key) > 0 {
+		k = key[0]
+	}
+	if n, ok := nsqList[k]; ok {
+		return &producer{n}, nil
+	}
+	return nil, fmt.Errorf("GetProducer can't get producer")
+}
+
+// CreateNSQProducer create nsq producer
+func newProducer(addr string, options ...func(*nsq.Config)) (*nsq.Producer, error) {
+	cfg := nsq.NewConfig()
+	for _, option := range options {
+		option(cfg)
+	}
+
+	producer, err := nsq.NewProducer(addr, cfg)
+	if err != nil {
+		return nil, err
+	}
+	// producer.SetLogger(log.New(os.Stderr, "", log.Flags()), nsq.LogLevelError)
+	return producer, nil
+}
+
+// CreateNSQProducerPool create a nwq producer pool
+func newProducerPool(addr string, initSize, maxSize int, options ...func(*nsq.Config)) (Pool, error) {
+	factory := func() (*nsq.Producer, error) {
+		// TODO 杩欓噷搴旇鎵цping鏂规硶鏉ョ‘瀹氳繛鎺ユ槸姝e父鐨勫惁鍒欎笉搴旇鍒涘缓conn
+		return newProducer(addr, options...)
+	}
+	nsqPool, err := NewChannelPool(initSize, maxSize, factory)
+	if err != nil {
+		return nil, err
+	}
+	return nsqPool, nil
+}
+
+func NewProducer(addr string) (*producer, error) {
+	CreateProducerPool(map[string]Config{"default": {addr, 1, 1}})
+	return GetProducer()
+}
+
+func retry(num int, fn func() error) error {
+	var err error
+	for i := 0; i < num; i++ {
+		err = fn()
+		if err == nil {
+			break
+		}
+	}
+	return err
+}
+
+func (p *producer) Publish(topic string, body []byte) error {
+	nsq, err := p.pool.Get()
+	if err != nil {
+		return err
+	}
+	defer nsq.Close()
+
+	return retry(2, func() error {
+		return nsq.Publish(topic, body)
+	})
+}
+
+func (p *producer) MultiPublish(topic string, body [][]byte) error {
+	nsq, err := p.pool.Get()
+	if err != nil {
+		return err
+	}
+	defer nsq.Close()
+
+	return retry(2, func() error {
+		return nsq.MultiPublish(topic, body)
+	})
+}
+
+func (p *producer) DeferredPublish(topic string, delay time.Duration, body []byte) error {
+	nsq, err := p.pool.Get()
+	if err != nil {
+		return err
+	}
+	defer nsq.Close()
+
+	return retry(2, func() error {
+		return nsq.DeferredPublish(topic, delay, body)
+	})
+}

--
Gitblit v1.8.0