From 1b4fc8c66026c77abf734126b4e7662e386ec0f5 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期二, 16 五月 2023 16:50:46 +0800
Subject: [PATCH] for go get
---
nsqclient/consumer.go | 99 +++++++
nsqclient/channel_test.go | 218 +++++++++++++++
.gitignore | 2
nsqclient/pointer.go | 57 ++++
nsqclient/producer.go | 139 +++++++++
nsqclient/README.md | 54 +++
nsqclient/pool.go | 25 +
TST/test/test.go | 8
main.go | 26
nsqclient/LICENSE | 20 +
nsqclient/conn.go | 45 +++
nsqclient/channel.go | 134 +++++++++
12 files changed, 810 insertions(+), 17 deletions(-)
diff --git a/.gitignore b/.gitignore
index c41f8dc..129af67 100644
--- a/.gitignore
+++ b/.gitignore
@@ -24,3 +24,5 @@
nsqCli
cnsqcli
+go.sum
+go.mod
diff --git a/TST/test/test.go b/TST/test/test.go
index 5aaf283..2e1a679 100644
--- a/TST/test/test.go
+++ b/TST/test/test.go
@@ -4,16 +4,16 @@
"context"
"fmt"
"log"
- "nsqclient/nsqcli"
+ "nsqclient/nsqclient"
"time"
)
func produce(two bool) {
- p, _ := nsqcli.NewProducer("192.168.20.108:4150")
+ p, _ := nsqclient.NewProducer("192.168.20.108:4150")
var str string
for len(str) < 32 {
- str += "cnsqcli dynamic library"
+ str += "cnsqclient dynamic library"
}
msgx := []byte(str + "--x")
msgy := []byte(str + "--y")
@@ -40,7 +40,7 @@
func consume(topic, channel string) {
ctx, cancel := context.WithCancel(context.Background())
- if c, e := nsqcli.NewNsqConsumer(ctx, topic, channel); e != nil {
+ if c, e := nsqclient.NewNsqConsumer(ctx, topic, channel); e != nil {
fmt.Println("NewNsqConsumer failed", e)
return
} else {
diff --git a/main.go b/main.go
index c01a2e1..4eec6c9 100644
--- a/main.go
+++ b/main.go
@@ -6,7 +6,7 @@
import (
"nsqclient/TST/test"
- "nsqclient/nsqcli"
+ "nsqclient/nsqclient"
"sync"
"time"
"unsafe"
@@ -14,18 +14,18 @@
//export createProducer
func createProducer(addr string) unsafe.Pointer {
- n, _ := nsqcli.NewProducer(addr)
- return nsqcli.Save(n)
+ n, _ := nsqclient.NewProducer(addr)
+ return nsqclient.Save(n)
}
//export destroyProducer
func destroyProducer(ph unsafe.Pointer) {
- nsqcli.Unref(ph)
- nsqcli.DestroyProducerPool()
+ nsqclient.Unref(ph)
+ nsqclient.DestroyProducerPool()
}
-func pcvt(ph unsafe.Pointer) nsqcli.Producer {
- return nsqcli.Restore(ph).(nsqcli.Producer)
+func pcvt(ph unsafe.Pointer) nsqclient.Producer {
+ return nsqclient.Restore(ph).(nsqclient.Producer)
}
//export publish
@@ -58,30 +58,30 @@
/////////////////////////////////////////////////////////////
type consumer struct {
- nsqcon *nsqcli.NsqConsumer
+ nsqcon *nsqclient.NsqConsumer
lck sync.Mutex
msgs [][]byte
}
//export createConsumer
func createConsumer(topic, channel string) unsafe.Pointer {
- if c, err := nsqcli.NewNsqConsumer(nil, topic, channel); err == nil {
+ if c, err := nsqclient.NewNsqConsumer(nil, topic, channel); err == nil {
con := &consumer{
nsqcon: c,
}
- return nsqcli.Save(con)
+ return nsqclient.Save(con)
}
return nil
}
func ccvt(ch unsafe.Pointer) *consumer {
- return nsqcli.Restore(ch).(*consumer)
+ return nsqclient.Restore(ch).(*consumer)
}
//export destroyConsumer
func destroyConsumer(ch unsafe.Pointer) {
- nsqcli.DestroyNsqConsumer(ccvt(ch).nsqcon)
- nsqcli.Unref(ch)
+ nsqclient.DestroyNsqConsumer(ccvt(ch).nsqcon)
+ nsqclient.Unref(ch)
}
//export Run
diff --git a/nsqclient/LICENSE b/nsqclient/LICENSE
new file mode 100644
index 0000000..25fdaf6
--- /dev/null
+++ b/nsqclient/LICENSE
@@ -0,0 +1,20 @@
+The MIT License (MIT)
+
+Copyright (c) 2013 Fatih Arslan
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of
+this software and associated documentation files (the "Software"), to deal in
+the Software without restriction, including without limitation the rights to
+use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software is furnished to do so,
+subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/nsqclient/README.md b/nsqclient/README.md
new file mode 100644
index 0000000..432d86d
--- /dev/null
+++ b/nsqclient/README.md
@@ -0,0 +1,54 @@
+# NSQPool
+
+NSQPool is a thread safe connection pool for nsq producer. It can be used to
+manage and reuse nsq producer connection.
+
+
+## Install and Usage
+
+Install the package with:
+
+```bash
+github.com/qgymje/nsqpool
+```
+
+Import it with:
+
+```go
+import (
+ "github.com/qgymje/nsqpool"
+ nsq "github.com/nsqio/go-nsq"
+)
+```
+
+and use `pool` as the package name inside the code.
+
+## Example
+
+```go
+// create a factory() to be used with channel based pool
+factory := func() (*nsq.Producer, error) {
+ config := nsq.NewConfig()
+ return nsq.NewProducer(":4150", config)
+}
+
+nsqPool, err := pool.NewChannelPool(5, 30, factory)
+
+producer, err := nsqPool.Get()
+
+producer.Publish("topic", "some data")
+// do something with producer and put it back to the pool by closing the connection
+// (this doesn't close the underlying connection instead it's putting it back
+// to the pool).
+producer.Close()
+
+// close pool any time you want, this closes all the connections inside a pool
+nsqPool.Close()
+
+// currently available connections in the pool
+current := nsqPool.Len()
+```
+
+## License
+
+The MIT License (MIT) - see LICENSE for more details
diff --git a/nsqclient/channel.go b/nsqclient/channel.go
new file mode 100644
index 0000000..1594dcc
--- /dev/null
+++ b/nsqclient/channel.go
@@ -0,0 +1,134 @@
+package nsqclient
+
+import (
+ "errors"
+ "fmt"
+ "sync"
+
+ nsq "github.com/nsqio/go-nsq"
+)
+
+// channelPool implements the Pool interface based on buffered channels.
+type channelPool struct {
+ // storage for our net.Conn connections
+ mu sync.Mutex
+ conns chan *nsq.Producer
+
+ // net.Conn generator
+ factory Factory
+}
+
+// Factory is a function to create new connections.
+type Factory func() (*nsq.Producer, error)
+
+// NewChannelPool returns a new pool based on buffered channels with an initial
+// capacity and maximum capacity. Factory is used when initial capacity is
+// greater than zero to fill the pool. A zero initialCap doesn't fill the Pool
+// until a new Get() is called. During a Get(), If there is no new connection
+// available in the pool, a new connection will be created via the Factory()
+// method.
+func NewChannelPool(initialCap, maxCap int, factory Factory) (Pool, error) {
+ if initialCap < 0 || maxCap <= 0 || initialCap > maxCap {
+ return nil, errors.New("invalid capacity settings")
+ }
+
+ c := &channelPool{
+ conns: make(chan *nsq.Producer, maxCap),
+ factory: factory,
+ }
+
+ // create initial connections, if something goes wrong,
+ // just close the pool error out.
+ for i := 0; i < initialCap; i++ {
+ conn, err := factory()
+ if err != nil {
+ c.Close()
+ return nil, fmt.Errorf("factory is not able to fill the pool: %s", err)
+ }
+ c.conns <- conn
+ }
+
+ return c, nil
+}
+
+func (c *channelPool) getConns() chan *nsq.Producer {
+ c.mu.Lock()
+ conns := c.conns
+ c.mu.Unlock()
+ return conns
+}
+
+// Get implements the Pool interfaces Get() method. If there is no new
+// connection available in the pool, a new connection will be created via the
+// Factory() method.
+func (c *channelPool) Get() (*PoolConn, error) {
+ conns := c.getConns()
+ if conns == nil {
+ return nil, ErrClosed
+ }
+
+ // wrap our connections with out custom net.Conn implementation (wrapConn
+ // method) that puts the connection back to the pool if it's closed.
+ select {
+ case conn := <-conns:
+ if conn == nil {
+ return nil, ErrClosed
+ }
+
+ return c.wrapConn(conn), nil
+ default:
+ conn, err := c.factory()
+ if err != nil {
+ return nil, err
+ }
+
+ return c.wrapConn(conn), nil
+ }
+}
+
+// put puts the connection back to the pool. If the pool is full or closed,
+// conn is simply closed. A nil conn will be rejected.
+func (c *channelPool) put(conn *nsq.Producer) error {
+ if conn == nil {
+ return errors.New("connection is nil. rejecting")
+ }
+
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ if c.conns == nil {
+ // pool is closed, close passed connection
+ conn.Stop()
+ return nil
+ }
+
+ // put the resource back into the pool. If the pool is full, this will
+ // block and the default case will be executed.
+ select {
+ case c.conns <- conn:
+ return nil
+ default:
+ // pool is full, close passed connection
+ conn.Stop()
+ return nil
+ }
+}
+
+func (c *channelPool) Close() {
+ c.mu.Lock()
+ conns := c.conns
+ c.conns = nil
+ c.factory = nil
+ c.mu.Unlock()
+
+ if conns == nil {
+ return
+ }
+
+ close(conns)
+ for conn := range conns {
+ conn.Stop()
+ }
+}
+
+func (c *channelPool) Len() int { return len(c.getConns()) }
diff --git a/nsqclient/channel_test.go b/nsqclient/channel_test.go
new file mode 100644
index 0000000..725db9b
--- /dev/null
+++ b/nsqclient/channel_test.go
@@ -0,0 +1,218 @@
+package nsqclient
+
+import (
+ "math/rand"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/nsqio/go-nsq"
+)
+
+var (
+ InitialCap = 2
+ MaximumCap = 10
+ factory = func() (*nsq.Producer, error) {
+ config := nsq.NewConfig()
+ return nsq.NewProducer(":4160", config)
+ }
+)
+
+func newChannelPool() (Pool, error) {
+ return NewChannelPool(InitialCap, MaximumCap, factory)
+}
+
+func TestNew(t *testing.T) {
+ _, err := newChannelPool()
+ if err != nil {
+ t.Errorf("New error: %s", err)
+ }
+}
+
+func TestPool_Get(t *testing.T) {
+ p, _ := newChannelPool()
+ defer p.Close()
+
+ _, err := p.Get()
+ if err != nil {
+ t.Errorf("Get error: %s", err)
+ }
+
+ // after one get, current capacity should be lowered by one.
+ if p.Len() != (InitialCap - 1) {
+ t.Errorf("Get error. Expecting %d, got %d",
+ (InitialCap - 1), p.Len())
+ }
+
+ // get them all
+ var wg sync.WaitGroup
+ for i := 0; i < (InitialCap - 1); i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ _, err := p.Get()
+ if err != nil {
+ t.Errorf("Get error: %s", err)
+ }
+ }()
+ }
+ wg.Wait()
+
+ if p.Len() != 0 {
+ t.Errorf("Get error. Expecting %d, got %d",
+ (InitialCap - 1), p.Len())
+ }
+
+ _, err = p.Get()
+ if err != nil {
+ t.Errorf("Get error: %s", err)
+ }
+}
+
+func TestPool_Put(t *testing.T) {
+ p, err := NewChannelPool(0, 30, factory)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer p.Close()
+
+ // get/create from the pool
+ conns := make([]*PoolConn, MaximumCap)
+ for i := 0; i < MaximumCap; i++ {
+ conn, _ := p.Get()
+ conns[i] = conn
+ }
+
+ // now put them all back
+ for _, conn := range conns {
+ conn.Close()
+ }
+
+ if p.Len() != MaximumCap {
+ t.Errorf("Put error len. Expecting %d, got %d",
+ 1, p.Len())
+ }
+
+ conn, _ := p.Get()
+ p.Close() // close pool
+
+ conn.Close() // try to put into a full pool
+ if p.Len() != 0 {
+ t.Errorf("Put error. Closed pool shouldn't allow to put connections.")
+ }
+}
+
+func TestPool_PutUnusableConn(t *testing.T) {
+ p, _ := newChannelPool()
+ defer p.Close()
+
+ // ensure pool is not empty
+ conn, _ := p.Get()
+ conn.Close()
+
+ poolSize := p.Len()
+ conn, _ = p.Get()
+ conn.Close()
+ if p.Len() != poolSize {
+ t.Errorf("Pool size is expected to be equal to initial size")
+ }
+
+ conn, _ = p.Get()
+ conn.MarkUnusable()
+
+ conn.Close()
+ if p.Len() != poolSize-1 {
+ t.Errorf("Pool size is expected to be initial_size - 1 [%d:%d]", p.Len(), poolSize-1)
+ }
+}
+
+func TestPool_UsedCapacity(t *testing.T) {
+ p, _ := newChannelPool()
+ defer p.Close()
+
+ if p.Len() != InitialCap {
+ t.Errorf("InitialCap error. Expecting %d, got %d",
+ InitialCap, p.Len())
+ }
+}
+
+func TestPool_Close(t *testing.T) {
+ p, _ := newChannelPool()
+
+ // now close it and test all cases we are expecting.
+ p.Close()
+
+ c := p.(*channelPool)
+
+ if c.conns != nil {
+ t.Errorf("Close error, conns channel should be nil")
+ }
+
+ if c.factory != nil {
+ t.Errorf("Close error, factory should be nil")
+ }
+
+ _, err := p.Get()
+ if err == nil {
+ t.Errorf("Close error, get conn should return an error")
+ }
+
+ if p.Len() != 0 {
+ t.Errorf("Close error used capacity. Expecting 0, got %d", p.Len())
+ }
+}
+
+func TestPoolConcurrent(t *testing.T) {
+ p, _ := newChannelPool()
+ pipe := make(chan *PoolConn, 0)
+
+ go func() {
+ p.Close()
+ }()
+
+ for i := 0; i < MaximumCap; i++ {
+ go func() {
+ conn, _ := p.Get()
+
+ pipe <- conn
+ }()
+
+ go func() {
+ conn := <-pipe
+ if conn == nil {
+ return
+ }
+ conn.Close()
+ }()
+ }
+}
+
+func TestPoolConcurrent2(t *testing.T) {
+ p, _ := NewChannelPool(0, 30, factory)
+
+ var wg sync.WaitGroup
+
+ go func() {
+ for i := 0; i < 10; i++ {
+ wg.Add(1)
+ go func(i int) {
+ conn, _ := p.Get()
+ time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
+ conn.Close()
+ wg.Done()
+ }(i)
+ }
+ }()
+
+ for i := 0; i < 10; i++ {
+ wg.Add(1)
+ go func(i int) {
+ conn, _ := p.Get()
+ time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
+ conn.Close()
+ wg.Done()
+ }(i)
+ }
+
+ wg.Wait()
+}
diff --git a/nsqclient/conn.go b/nsqclient/conn.go
new file mode 100644
index 0000000..5c680b5
--- /dev/null
+++ b/nsqclient/conn.go
@@ -0,0 +1,45 @@
+package nsqclient
+
+import (
+ "sync"
+
+ nsq "github.com/nsqio/go-nsq"
+)
+
+// PoolConn is a wrapper around net.Conn to modify the the behavior of
+// net.Conn's Close() method.
+type PoolConn struct {
+ *nsq.Producer
+ mu sync.RWMutex
+ c *channelPool
+ unusable bool
+}
+
+// Close puts the given connects back to the pool instead of closing it.
+func (p *PoolConn) Close() error {
+ p.mu.RLock()
+ defer p.mu.RUnlock()
+
+ if p.unusable {
+ if p.Producer != nil {
+ p.Producer.Stop()
+ return nil
+ }
+ return nil
+ }
+ return p.c.put(p.Producer)
+}
+
+// MarkUnusable marks the connection not usable any more, to let the pool close it instead of returning it to pool.
+func (p *PoolConn) MarkUnusable() {
+ p.mu.Lock()
+ p.unusable = true
+ p.mu.Unlock()
+}
+
+// newConn wraps a standard net.Conn to a poolConn net.Conn.
+func (c *channelPool) wrapConn(conn *nsq.Producer) *PoolConn {
+ p := &PoolConn{c: c}
+ p.Producer = conn
+ return p
+}
diff --git a/nsqclient/consumer.go b/nsqclient/consumer.go
new file mode 100644
index 0000000..a0df0b0
--- /dev/null
+++ b/nsqclient/consumer.go
@@ -0,0 +1,99 @@
+package nsqclient
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ nsq "github.com/nsqio/go-nsq"
+)
+
+type NsqConsumer struct {
+ consumer *nsq.Consumer
+ // handler nsq.Handler
+ handler func([]byte) error
+ ctx context.Context
+ ctxCancel context.CancelFunc
+ topic string
+ channel string
+}
+
+func NewNsqConsumer(ctx context.Context, topic, channel string, options ...func(*nsq.Config)) (*NsqConsumer, error) {
+ conf := nsq.NewConfig()
+ conf.MaxAttempts = 0
+ conf.MsgTimeout = 10 * time.Minute // 榛樿涓�涓秷鎭渶澶氳兘澶勭悊鍗佸垎閽燂紝鍚﹀垯灏变細閲嶆柊涓㈠叆闃熷垪
+ conf.LookupdPollInterval = 3 * time.Second // 璋冩暣consumer鐨勯噸杩為棿闅旀椂闂翠负3绉�
+ for _, option := range options {
+ option(conf)
+ }
+
+ consumer, err := nsq.NewConsumer(topic, channel, conf)
+ if err != nil {
+ return nil, err
+ }
+ return &NsqConsumer{
+ consumer: consumer,
+ ctx: ctx,
+ topic: topic,
+ channel: channel,
+ }, nil
+}
+
+func DestroyNsqConsumer(c *NsqConsumer) {
+ if c != nil {
+ if c.ctxCancel != nil {
+ c.ctxCancel()
+ }
+ }
+}
+
+// func (n *NsqConsumer) AddHandler(handler nsq.Handler) {
+// n.handler = handler
+// }
+
+func (n *NsqConsumer) AddHandler(handler func([]byte) error) {
+ n.handler = handler
+}
+
+func (n *NsqConsumer) Run(qaddr string, concurrency int) error {
+ return n.RunDistributed([]string{qaddr}, nil, concurrency)
+}
+
+func (n *NsqConsumer) RunLookupd(lookupAddr string, concurrency int) error {
+ return n.RunDistributed(nil, []string{lookupAddr}, concurrency)
+}
+
+func (n *NsqConsumer) RunDistributed(qAddr, lAddr []string, concurrency int) error {
+ n.consumer.ChangeMaxInFlight(concurrency)
+ // n.consumer.AddConcurrentHandlers(n.handler, concurrency)
+ n.consumer.AddConcurrentHandlers(nsq.HandlerFunc(func(msg *nsq.Message) error {
+ return n.handler(msg.Body)
+ // return nil
+ }), concurrency)
+
+ var err error
+ if len(qAddr) > 0 {
+ err = n.consumer.ConnectToNSQDs(qAddr)
+ } else if len(lAddr) > 0 {
+ err = n.consumer.ConnectToNSQLookupds(lAddr)
+ } else {
+ err = fmt.Errorf("Addr Must NOT Empty")
+ }
+ if err != nil {
+ return err
+ }
+
+ if n.ctx == nil {
+ n.ctx, n.ctxCancel = context.WithCancel(context.Background())
+ }
+
+ for {
+ select {
+ case <-n.ctx.Done():
+ fmt.Println("[%s] %s,%s", "stop consumer", n.topic, n.channel)
+ n.consumer.Stop()
+ fmt.Println("[%s] %s,%s", "stop consumer success", n.topic, n.channel)
+ return nil
+ }
+ }
+}
diff --git a/nsqclient/pointer.go b/nsqclient/pointer.go
new file mode 100644
index 0000000..1cba795
--- /dev/null
+++ b/nsqclient/pointer.go
@@ -0,0 +1,57 @@
+package nsqclient
+
+// #include <stdlib.h>
+import "C"
+import (
+ "sync"
+ "unsafe"
+)
+
+var (
+ mutex sync.RWMutex
+ store = map[unsafe.Pointer]interface{}{}
+)
+
+func Save(v interface{}) unsafe.Pointer {
+ if v == nil {
+ return nil
+ }
+
+ // Generate real fake C pointer.
+ // This pointer will not store any data, but will bi used for indexing purposes.
+ // Since Go doest allow to cast dangling pointer to unsafe.Pointer, we do rally allocate one byte.
+ // Why we need indexing, because Go doest allow C code to store pointers to Go data.
+ var ptr unsafe.Pointer = C.malloc(C.size_t(1))
+ if ptr == nil {
+ panic("can't allocate 'cgo-pointer hack index pointer': ptr == nil")
+ }
+
+ mutex.Lock()
+ store[ptr] = v
+ mutex.Unlock()
+
+ return ptr
+}
+
+func Restore(ptr unsafe.Pointer) (v interface{}) {
+ if ptr == nil {
+ return nil
+ }
+
+ mutex.RLock()
+ v = store[ptr]
+ mutex.RUnlock()
+ return
+}
+
+func Unref(ptr unsafe.Pointer) {
+ if ptr == nil {
+ return
+ }
+
+ mutex.Lock()
+ delete(store, ptr)
+ mutex.Unlock()
+
+ C.free(ptr)
+}
diff --git a/nsqclient/pool.go b/nsqclient/pool.go
new file mode 100644
index 0000000..b773490
--- /dev/null
+++ b/nsqclient/pool.go
@@ -0,0 +1,25 @@
+// Package pool implements a pool of net.Conn interfaces to manage and reuse them.
+package nsqclient
+
+import "errors"
+
+var (
+ // ErrClosed is the error resulting if the pool is closed via pool.Close().
+ ErrClosed = errors.New("pool is closed")
+)
+
+// Pool interface describes a pool implementation. A pool should have maximum
+// capacity. An ideal pool is threadsafe and easy to use.
+type Pool interface {
+ // Get returns a new connection from the pool. Closing the connections puts
+ // it back to the Pool. Closing it when the pool is destroyed or full will
+ // be counted as an error.
+ Get() (*PoolConn, error)
+
+ // Close closes the pool and all its connections. After Close() the pool is
+ // no longer usable.
+ Close()
+
+ // Len returns the current number of connections of the pool.
+ Len() int
+}
diff --git a/nsqclient/producer.go b/nsqclient/producer.go
new file mode 100644
index 0000000..717c7a1
--- /dev/null
+++ b/nsqclient/producer.go
@@ -0,0 +1,139 @@
+package nsqclient
+
+import (
+ "fmt"
+ "time"
+
+ nsq "github.com/nsqio/go-nsq"
+)
+
+type Producer interface {
+ Publish(topic string, body []byte) error
+ MultiPublish(topic string, body [][]byte) error
+ DeferredPublish(topic string, delay time.Duration, body []byte) error
+}
+
+var _ Producer = (*producer)(nil)
+
+type producer struct {
+ pool Pool
+}
+
+var (
+ // name pool producer
+ nsqList = make(map[string]Pool)
+)
+
+type Config struct {
+ Addr string `toml:"addr" json:"addr"`
+ InitSize int `toml:"init_size" json:"init_size"`
+ MaxSize int `toml:"max_size" json:"max_size"`
+}
+
+func CreateProducerPool(configs map[string]Config) {
+ for name, conf := range configs {
+ n, err := newProducerPool(conf.Addr, conf.InitSize, conf.MaxSize)
+ if err == nil {
+ nsqList[name] = n
+ // 鏀寔ip:port瀵诲潃
+ nsqList[conf.Addr] = n
+ }
+ }
+}
+
+func DestroyProducerPool() {
+ for _, p := range nsqList {
+ p.Close()
+ }
+}
+
+func GetProducer(key ...string) (*producer, error) {
+ k := "default"
+ if len(key) > 0 {
+ k = key[0]
+ }
+ if n, ok := nsqList[k]; ok {
+ return &producer{n}, nil
+ }
+ return nil, fmt.Errorf("GetProducer can't get producer")
+}
+
+// CreateNSQProducer create nsq producer
+func newProducer(addr string, options ...func(*nsq.Config)) (*nsq.Producer, error) {
+ cfg := nsq.NewConfig()
+ for _, option := range options {
+ option(cfg)
+ }
+
+ producer, err := nsq.NewProducer(addr, cfg)
+ if err != nil {
+ return nil, err
+ }
+ // producer.SetLogger(log.New(os.Stderr, "", log.Flags()), nsq.LogLevelError)
+ return producer, nil
+}
+
+// CreateNSQProducerPool create a nwq producer pool
+func newProducerPool(addr string, initSize, maxSize int, options ...func(*nsq.Config)) (Pool, error) {
+ factory := func() (*nsq.Producer, error) {
+ // TODO 杩欓噷搴旇鎵цping鏂规硶鏉ョ‘瀹氳繛鎺ユ槸姝e父鐨勫惁鍒欎笉搴旇鍒涘缓conn
+ return newProducer(addr, options...)
+ }
+ nsqPool, err := NewChannelPool(initSize, maxSize, factory)
+ if err != nil {
+ return nil, err
+ }
+ return nsqPool, nil
+}
+
+func NewProducer(addr string) (*producer, error) {
+ CreateProducerPool(map[string]Config{"default": {addr, 1, 1}})
+ return GetProducer()
+}
+
+func retry(num int, fn func() error) error {
+ var err error
+ for i := 0; i < num; i++ {
+ err = fn()
+ if err == nil {
+ break
+ }
+ }
+ return err
+}
+
+func (p *producer) Publish(topic string, body []byte) error {
+ nsq, err := p.pool.Get()
+ if err != nil {
+ return err
+ }
+ defer nsq.Close()
+
+ return retry(2, func() error {
+ return nsq.Publish(topic, body)
+ })
+}
+
+func (p *producer) MultiPublish(topic string, body [][]byte) error {
+ nsq, err := p.pool.Get()
+ if err != nil {
+ return err
+ }
+ defer nsq.Close()
+
+ return retry(2, func() error {
+ return nsq.MultiPublish(topic, body)
+ })
+}
+
+func (p *producer) DeferredPublish(topic string, delay time.Duration, body []byte) error {
+ nsq, err := p.pool.Get()
+ if err != nil {
+ return err
+ }
+ defer nsq.Close()
+
+ return retry(2, func() error {
+ return nsq.DeferredPublish(topic, delay, body)
+ })
+}
--
Gitblit v1.8.0