| | |
| | | |
| | | nsqCli |
| | | cnsqcli |
| | | go.sum |
| | | go.mod |
| | |
| | | "context" |
| | | "fmt" |
| | | "log" |
| | | "nsqclient/nsqcli" |
| | | "nsqclient/nsqclient" |
| | | "time" |
| | | ) |
| | | |
| | | func produce(two bool) { |
| | | p, _ := nsqcli.NewProducer("192.168.20.108:4150") |
| | | p, _ := nsqclient.NewProducer("192.168.20.108:4150") |
| | | |
| | | var str string |
| | | for len(str) < 32 { |
| | | str += "cnsqcli dynamic library" |
| | | str += "cnsqclient dynamic library" |
| | | } |
| | | msgx := []byte(str + "--x") |
| | | msgy := []byte(str + "--y") |
| | |
| | | func consume(topic, channel string) { |
| | | ctx, cancel := context.WithCancel(context.Background()) |
| | | |
| | | if c, e := nsqcli.NewNsqConsumer(ctx, topic, channel); e != nil { |
| | | if c, e := nsqclient.NewNsqConsumer(ctx, topic, channel); e != nil { |
| | | fmt.Println("NewNsqConsumer failed", e) |
| | | return |
| | | } else { |
| | |
| | | |
| | | import ( |
| | | "nsqclient/TST/test" |
| | | "nsqclient/nsqcli" |
| | | "nsqclient/nsqclient" |
| | | "sync" |
| | | "time" |
| | | "unsafe" |
| | |
| | | |
| | | //export createProducer |
| | | func createProducer(addr string) unsafe.Pointer { |
| | | n, _ := nsqcli.NewProducer(addr) |
| | | return nsqcli.Save(n) |
| | | n, _ := nsqclient.NewProducer(addr) |
| | | return nsqclient.Save(n) |
| | | } |
| | | |
| | | //export destroyProducer |
| | | func destroyProducer(ph unsafe.Pointer) { |
| | | nsqcli.Unref(ph) |
| | | nsqcli.DestroyProducerPool() |
| | | nsqclient.Unref(ph) |
| | | nsqclient.DestroyProducerPool() |
| | | } |
| | | |
| | | func pcvt(ph unsafe.Pointer) nsqcli.Producer { |
| | | return nsqcli.Restore(ph).(nsqcli.Producer) |
| | | func pcvt(ph unsafe.Pointer) nsqclient.Producer { |
| | | return nsqclient.Restore(ph).(nsqclient.Producer) |
| | | } |
| | | |
| | | //export publish |
| | |
| | | ///////////////////////////////////////////////////////////// |
| | | |
| | | type consumer struct { |
| | | nsqcon *nsqcli.NsqConsumer |
| | | nsqcon *nsqclient.NsqConsumer |
| | | lck sync.Mutex |
| | | msgs [][]byte |
| | | } |
| | | |
| | | //export createConsumer |
| | | func createConsumer(topic, channel string) unsafe.Pointer { |
| | | if c, err := nsqcli.NewNsqConsumer(nil, topic, channel); err == nil { |
| | | if c, err := nsqclient.NewNsqConsumer(nil, topic, channel); err == nil { |
| | | con := &consumer{ |
| | | nsqcon: c, |
| | | } |
| | | return nsqcli.Save(con) |
| | | return nsqclient.Save(con) |
| | | } |
| | | return nil |
| | | } |
| | | |
| | | func ccvt(ch unsafe.Pointer) *consumer { |
| | | return nsqcli.Restore(ch).(*consumer) |
| | | return nsqclient.Restore(ch).(*consumer) |
| | | } |
| | | |
| | | //export destroyConsumer |
| | | func destroyConsumer(ch unsafe.Pointer) { |
| | | nsqcli.DestroyNsqConsumer(ccvt(ch).nsqcon) |
| | | nsqcli.Unref(ch) |
| | | nsqclient.DestroyNsqConsumer(ccvt(ch).nsqcon) |
| | | nsqclient.Unref(ch) |
| | | } |
| | | |
| | | //export Run |
New file |
| | |
| | | The MIT License (MIT) |
| | | |
| | | Copyright (c) 2013 Fatih Arslan |
| | | |
| | | Permission is hereby granted, free of charge, to any person obtaining a copy of |
| | | this software and associated documentation files (the "Software"), to deal in |
| | | the Software without restriction, including without limitation the rights to |
| | | use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of |
| | | the Software, and to permit persons to whom the Software is furnished to do so, |
| | | subject to the following conditions: |
| | | |
| | | The above copyright notice and this permission notice shall be included in all |
| | | copies or substantial portions of the Software. |
| | | |
| | | THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
| | | IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS |
| | | FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR |
| | | COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER |
| | | IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN |
| | | CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. |
New file |
| | |
| | | # NSQPool |
| | | |
| | | NSQPool is a thread-safe connection pool for NSQ producers. It can be used to |
| | | manage and reuse NSQ producer connections. |
| | | |
| | | |
| | | ## Install and Usage |
| | | |
| | | Install the package with: |
| | | |
| | | ```bash |
| | | go get github.com/qgymje/nsqpool |
| | | ``` |
| | | |
| | | Import it with: |
| | | |
| | | ```go |
| | | import ( |
| | | "github.com/qgymje/nsqpool" |
| | | nsq "github.com/nsqio/go-nsq" |
| | | ) |
| | | ``` |
| | | |
| | | and use `pool` as the package name inside the code. |
| | | |
| | | ## Example |
| | | |
| | | ```go |
| | | // create a factory() to be used with channel based pool |
| | | factory := func() (*nsq.Producer, error) { |
| | | config := nsq.NewConfig() |
| | | return nsq.NewProducer(":4150", config) |
| | | } |
| | | |
| | | nsqPool, err := pool.NewChannelPool(5, 30, factory) |
| | | |
| | | producer, err := nsqPool.Get() |
| | | |
| | | producer.Publish("topic", []byte("some data")) |
| | | // do something with producer and put it back to the pool by closing the connection |
| | | // (this doesn't close the underlying connection instead it's putting it back |
| | | // to the pool). |
| | | producer.Close() |
| | | |
| | | // close pool any time you want, this closes all the connections inside a pool |
| | | nsqPool.Close() |
| | | |
| | | // currently available connections in the pool |
| | | current := nsqPool.Len() |
| | | ``` |
| | | |
| | | ## License |
| | | |
| | | The MIT License (MIT) - see LICENSE for more details |
New file |
| | |
| | | package nsqclient |
| | | |
| | | import ( |
| | | "errors" |
| | | "fmt" |
| | | "sync" |
| | | |
| | | nsq "github.com/nsqio/go-nsq" |
| | | ) |
| | | |
| | | // channelPool implements the Pool interface based on buffered channels. |
| | | type channelPool struct { |
| | | // storage for our net.Conn connections |
| | | mu sync.Mutex |
| | | conns chan *nsq.Producer |
| | | |
| | | // net.Conn generator |
| | | factory Factory |
| | | } |
| | | |
| | | // Factory is a function to create new connections. |
| | | type Factory func() (*nsq.Producer, error) |
| | | |
| | | // NewChannelPool returns a new pool based on buffered channels with an initial |
| | | // capacity and maximum capacity. Factory is used when initial capacity is |
| | | // greater than zero to fill the pool. A zero initialCap doesn't fill the Pool |
| | | // until a new Get() is called. During a Get(), If there is no new connection |
| | | // available in the pool, a new connection will be created via the Factory() |
| | | // method. |
| | | func NewChannelPool(initialCap, maxCap int, factory Factory) (Pool, error) { |
| | | if initialCap < 0 || maxCap <= 0 || initialCap > maxCap { |
| | | return nil, errors.New("invalid capacity settings") |
| | | } |
| | | |
| | | c := &channelPool{ |
| | | conns: make(chan *nsq.Producer, maxCap), |
| | | factory: factory, |
| | | } |
| | | |
| | | // create initial connections, if something goes wrong, |
| | | // just close the pool error out. |
| | | for i := 0; i < initialCap; i++ { |
| | | conn, err := factory() |
| | | if err != nil { |
| | | c.Close() |
| | | return nil, fmt.Errorf("factory is not able to fill the pool: %s", err) |
| | | } |
| | | c.conns <- conn |
| | | } |
| | | |
| | | return c, nil |
| | | } |
| | | |
| | | func (c *channelPool) getConns() chan *nsq.Producer { |
| | | c.mu.Lock() |
| | | conns := c.conns |
| | | c.mu.Unlock() |
| | | return conns |
| | | } |
| | | |
| | | // Get implements the Pool interfaces Get() method. If there is no new |
| | | // connection available in the pool, a new connection will be created via the |
| | | // Factory() method. |
| | | func (c *channelPool) Get() (*PoolConn, error) { |
| | | conns := c.getConns() |
| | | if conns == nil { |
| | | return nil, ErrClosed |
| | | } |
| | | |
| | | // wrap our connections with out custom net.Conn implementation (wrapConn |
| | | // method) that puts the connection back to the pool if it's closed. |
| | | select { |
| | | case conn := <-conns: |
| | | if conn == nil { |
| | | return nil, ErrClosed |
| | | } |
| | | |
| | | return c.wrapConn(conn), nil |
| | | default: |
| | | conn, err := c.factory() |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | |
| | | return c.wrapConn(conn), nil |
| | | } |
| | | } |
| | | |
| | | // put puts the connection back to the pool. If the pool is full or closed, |
| | | // conn is simply closed. A nil conn will be rejected. |
| | | func (c *channelPool) put(conn *nsq.Producer) error { |
| | | if conn == nil { |
| | | return errors.New("connection is nil. rejecting") |
| | | } |
| | | |
| | | c.mu.Lock() |
| | | defer c.mu.Unlock() |
| | | |
| | | if c.conns == nil { |
| | | // pool is closed, close passed connection |
| | | conn.Stop() |
| | | return nil |
| | | } |
| | | |
| | | // put the resource back into the pool. If the pool is full, this will |
| | | // block and the default case will be executed. |
| | | select { |
| | | case c.conns <- conn: |
| | | return nil |
| | | default: |
| | | // pool is full, close passed connection |
| | | conn.Stop() |
| | | return nil |
| | | } |
| | | } |
| | | |
| | | func (c *channelPool) Close() { |
| | | c.mu.Lock() |
| | | conns := c.conns |
| | | c.conns = nil |
| | | c.factory = nil |
| | | c.mu.Unlock() |
| | | |
| | | if conns == nil { |
| | | return |
| | | } |
| | | |
| | | close(conns) |
| | | for conn := range conns { |
| | | conn.Stop() |
| | | } |
| | | } |
| | | |
| | | func (c *channelPool) Len() int { return len(c.getConns()) } |
New file |
| | |
| | | package nsqclient |
| | | |
| | | import ( |
| | | "math/rand" |
| | | "sync" |
| | | "testing" |
| | | "time" |
| | | |
| | | "github.com/nsqio/go-nsq" |
| | | ) |
| | | |
| | | var ( |
| | | InitialCap = 2 |
| | | MaximumCap = 10 |
| | | factory = func() (*nsq.Producer, error) { |
| | | config := nsq.NewConfig() |
| | | return nsq.NewProducer(":4160", config) |
| | | } |
| | | ) |
| | | |
| | | func newChannelPool() (Pool, error) { |
| | | return NewChannelPool(InitialCap, MaximumCap, factory) |
| | | } |
| | | |
| | | func TestNew(t *testing.T) { |
| | | _, err := newChannelPool() |
| | | if err != nil { |
| | | t.Errorf("New error: %s", err) |
| | | } |
| | | } |
| | | |
| | | func TestPool_Get(t *testing.T) { |
| | | p, _ := newChannelPool() |
| | | defer p.Close() |
| | | |
| | | _, err := p.Get() |
| | | if err != nil { |
| | | t.Errorf("Get error: %s", err) |
| | | } |
| | | |
| | | // after one get, current capacity should be lowered by one. |
| | | if p.Len() != (InitialCap - 1) { |
| | | t.Errorf("Get error. Expecting %d, got %d", |
| | | (InitialCap - 1), p.Len()) |
| | | } |
| | | |
| | | // get them all |
| | | var wg sync.WaitGroup |
| | | for i := 0; i < (InitialCap - 1); i++ { |
| | | wg.Add(1) |
| | | go func() { |
| | | defer wg.Done() |
| | | _, err := p.Get() |
| | | if err != nil { |
| | | t.Errorf("Get error: %s", err) |
| | | } |
| | | }() |
| | | } |
| | | wg.Wait() |
| | | |
| | | if p.Len() != 0 { |
| | | t.Errorf("Get error. Expecting %d, got %d", |
| | | (InitialCap - 1), p.Len()) |
| | | } |
| | | |
| | | _, err = p.Get() |
| | | if err != nil { |
| | | t.Errorf("Get error: %s", err) |
| | | } |
| | | } |
| | | |
| | | func TestPool_Put(t *testing.T) { |
| | | p, err := NewChannelPool(0, 30, factory) |
| | | if err != nil { |
| | | t.Fatal(err) |
| | | } |
| | | defer p.Close() |
| | | |
| | | // get/create from the pool |
| | | conns := make([]*PoolConn, MaximumCap) |
| | | for i := 0; i < MaximumCap; i++ { |
| | | conn, _ := p.Get() |
| | | conns[i] = conn |
| | | } |
| | | |
| | | // now put them all back |
| | | for _, conn := range conns { |
| | | conn.Close() |
| | | } |
| | | |
| | | if p.Len() != MaximumCap { |
| | | t.Errorf("Put error len. Expecting %d, got %d", |
| | | 1, p.Len()) |
| | | } |
| | | |
| | | conn, _ := p.Get() |
| | | p.Close() // close pool |
| | | |
| | | conn.Close() // try to put into a full pool |
| | | if p.Len() != 0 { |
| | | t.Errorf("Put error. Closed pool shouldn't allow to put connections.") |
| | | } |
| | | } |
| | | |
| | | func TestPool_PutUnusableConn(t *testing.T) { |
| | | p, _ := newChannelPool() |
| | | defer p.Close() |
| | | |
| | | // ensure pool is not empty |
| | | conn, _ := p.Get() |
| | | conn.Close() |
| | | |
| | | poolSize := p.Len() |
| | | conn, _ = p.Get() |
| | | conn.Close() |
| | | if p.Len() != poolSize { |
| | | t.Errorf("Pool size is expected to be equal to initial size") |
| | | } |
| | | |
| | | conn, _ = p.Get() |
| | | conn.MarkUnusable() |
| | | |
| | | conn.Close() |
| | | if p.Len() != poolSize-1 { |
| | | t.Errorf("Pool size is expected to be initial_size - 1 [%d:%d]", p.Len(), poolSize-1) |
| | | } |
| | | } |
| | | |
| | | func TestPool_UsedCapacity(t *testing.T) { |
| | | p, _ := newChannelPool() |
| | | defer p.Close() |
| | | |
| | | if p.Len() != InitialCap { |
| | | t.Errorf("InitialCap error. Expecting %d, got %d", |
| | | InitialCap, p.Len()) |
| | | } |
| | | } |
| | | |
| | | func TestPool_Close(t *testing.T) { |
| | | p, _ := newChannelPool() |
| | | |
| | | // now close it and test all cases we are expecting. |
| | | p.Close() |
| | | |
| | | c := p.(*channelPool) |
| | | |
| | | if c.conns != nil { |
| | | t.Errorf("Close error, conns channel should be nil") |
| | | } |
| | | |
| | | if c.factory != nil { |
| | | t.Errorf("Close error, factory should be nil") |
| | | } |
| | | |
| | | _, err := p.Get() |
| | | if err == nil { |
| | | t.Errorf("Close error, get conn should return an error") |
| | | } |
| | | |
| | | if p.Len() != 0 { |
| | | t.Errorf("Close error used capacity. Expecting 0, got %d", p.Len()) |
| | | } |
| | | } |
| | | |
| | | func TestPoolConcurrent(t *testing.T) { |
| | | p, _ := newChannelPool() |
| | | pipe := make(chan *PoolConn, 0) |
| | | |
| | | go func() { |
| | | p.Close() |
| | | }() |
| | | |
| | | for i := 0; i < MaximumCap; i++ { |
| | | go func() { |
| | | conn, _ := p.Get() |
| | | |
| | | pipe <- conn |
| | | }() |
| | | |
| | | go func() { |
| | | conn := <-pipe |
| | | if conn == nil { |
| | | return |
| | | } |
| | | conn.Close() |
| | | }() |
| | | } |
| | | } |
| | | |
| | | func TestPoolConcurrent2(t *testing.T) { |
| | | p, _ := NewChannelPool(0, 30, factory) |
| | | |
| | | var wg sync.WaitGroup |
| | | |
| | | go func() { |
| | | for i := 0; i < 10; i++ { |
| | | wg.Add(1) |
| | | go func(i int) { |
| | | conn, _ := p.Get() |
| | | time.Sleep(time.Millisecond * time.Duration(rand.Intn(100))) |
| | | conn.Close() |
| | | wg.Done() |
| | | }(i) |
| | | } |
| | | }() |
| | | |
| | | for i := 0; i < 10; i++ { |
| | | wg.Add(1) |
| | | go func(i int) { |
| | | conn, _ := p.Get() |
| | | time.Sleep(time.Millisecond * time.Duration(rand.Intn(100))) |
| | | conn.Close() |
| | | wg.Done() |
| | | }(i) |
| | | } |
| | | |
| | | wg.Wait() |
| | | } |
New file |
| | |
| | | package nsqclient |
| | | |
| | | import ( |
| | | "sync" |
| | | |
| | | nsq "github.com/nsqio/go-nsq" |
| | | ) |
| | | |
| | | // PoolConn is a wrapper around net.Conn to modify the the behavior of |
| | | // net.Conn's Close() method. |
| | | type PoolConn struct { |
| | | *nsq.Producer |
| | | mu sync.RWMutex |
| | | c *channelPool |
| | | unusable bool |
| | | } |
| | | |
| | | // Close puts the given connects back to the pool instead of closing it. |
| | | func (p *PoolConn) Close() error { |
| | | p.mu.RLock() |
| | | defer p.mu.RUnlock() |
| | | |
| | | if p.unusable { |
| | | if p.Producer != nil { |
| | | p.Producer.Stop() |
| | | return nil |
| | | } |
| | | return nil |
| | | } |
| | | return p.c.put(p.Producer) |
| | | } |
| | | |
| | | // MarkUnusable marks the connection not usable any more, to let the pool close it instead of returning it to pool. |
| | | func (p *PoolConn) MarkUnusable() { |
| | | p.mu.Lock() |
| | | p.unusable = true |
| | | p.mu.Unlock() |
| | | } |
| | | |
| | | // newConn wraps a standard net.Conn to a poolConn net.Conn. |
| | | func (c *channelPool) wrapConn(conn *nsq.Producer) *PoolConn { |
| | | p := &PoolConn{c: c} |
| | | p.Producer = conn |
| | | return p |
| | | } |
New file |
| | |
| | | package nsqclient |
| | | |
| | | import ( |
| | | "context" |
| | | "fmt" |
| | | "time" |
| | | |
| | | nsq "github.com/nsqio/go-nsq" |
| | | ) |
| | | |
| | | type NsqConsumer struct { |
| | | consumer *nsq.Consumer |
| | | // handler nsq.Handler |
| | | handler func([]byte) error |
| | | ctx context.Context |
| | | ctxCancel context.CancelFunc |
| | | topic string |
| | | channel string |
| | | } |
| | | |
| | | func NewNsqConsumer(ctx context.Context, topic, channel string, options ...func(*nsq.Config)) (*NsqConsumer, error) { |
| | | conf := nsq.NewConfig() |
| | | conf.MaxAttempts = 0 |
| | | conf.MsgTimeout = 10 * time.Minute // 默认一个消息最多能处理十分钟,否则就会重新丢入队列 |
| | | conf.LookupdPollInterval = 3 * time.Second // 调整consumer的重连间隔时间为3秒 |
| | | for _, option := range options { |
| | | option(conf) |
| | | } |
| | | |
| | | consumer, err := nsq.NewConsumer(topic, channel, conf) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | return &NsqConsumer{ |
| | | consumer: consumer, |
| | | ctx: ctx, |
| | | topic: topic, |
| | | channel: channel, |
| | | }, nil |
| | | } |
| | | |
| | | func DestroyNsqConsumer(c *NsqConsumer) { |
| | | if c != nil { |
| | | if c.ctxCancel != nil { |
| | | c.ctxCancel() |
| | | } |
| | | } |
| | | } |
| | | |
| | | // func (n *NsqConsumer) AddHandler(handler nsq.Handler) { |
| | | // n.handler = handler |
| | | // } |
| | | |
| | | func (n *NsqConsumer) AddHandler(handler func([]byte) error) { |
| | | n.handler = handler |
| | | } |
| | | |
| | | func (n *NsqConsumer) Run(qaddr string, concurrency int) error { |
| | | return n.RunDistributed([]string{qaddr}, nil, concurrency) |
| | | } |
| | | |
| | | func (n *NsqConsumer) RunLookupd(lookupAddr string, concurrency int) error { |
| | | return n.RunDistributed(nil, []string{lookupAddr}, concurrency) |
| | | } |
| | | |
| | | func (n *NsqConsumer) RunDistributed(qAddr, lAddr []string, concurrency int) error { |
| | | n.consumer.ChangeMaxInFlight(concurrency) |
| | | // n.consumer.AddConcurrentHandlers(n.handler, concurrency) |
| | | n.consumer.AddConcurrentHandlers(nsq.HandlerFunc(func(msg *nsq.Message) error { |
| | | return n.handler(msg.Body) |
| | | // return nil |
| | | }), concurrency) |
| | | |
| | | var err error |
| | | if len(qAddr) > 0 { |
| | | err = n.consumer.ConnectToNSQDs(qAddr) |
| | | } else if len(lAddr) > 0 { |
| | | err = n.consumer.ConnectToNSQLookupds(lAddr) |
| | | } else { |
| | | err = fmt.Errorf("Addr Must NOT Empty") |
| | | } |
| | | if err != nil { |
| | | return err |
| | | } |
| | | |
| | | if n.ctx == nil { |
| | | n.ctx, n.ctxCancel = context.WithCancel(context.Background()) |
| | | } |
| | | |
| | | for { |
| | | select { |
| | | case <-n.ctx.Done(): |
| | | fmt.Println("[%s] %s,%s", "stop consumer", n.topic, n.channel) |
| | | n.consumer.Stop() |
| | | fmt.Println("[%s] %s,%s", "stop consumer success", n.topic, n.channel) |
| | | return nil |
| | | } |
| | | } |
| | | } |
New file |
| | |
| | | package nsqclient |
| | | |
| | | // #include <stdlib.h> |
| | | import "C" |
| | | import ( |
| | | "sync" |
| | | "unsafe" |
| | | ) |
| | | |
var (
	// mutex serialises access to store.
	mutex sync.RWMutex
	// store maps each handle returned by Save to the Go value it stands for.
	store = make(map[unsafe.Pointer]interface{})
)
| | | |
| | | func Save(v interface{}) unsafe.Pointer { |
| | | if v == nil { |
| | | return nil |
| | | } |
| | | |
| | | // Generate real fake C pointer. |
| | | // This pointer will not store any data, but will bi used for indexing purposes. |
| | | // Since Go doest allow to cast dangling pointer to unsafe.Pointer, we do rally allocate one byte. |
| | | // Why we need indexing, because Go doest allow C code to store pointers to Go data. |
| | | var ptr unsafe.Pointer = C.malloc(C.size_t(1)) |
| | | if ptr == nil { |
| | | panic("can't allocate 'cgo-pointer hack index pointer': ptr == nil") |
| | | } |
| | | |
| | | mutex.Lock() |
| | | store[ptr] = v |
| | | mutex.Unlock() |
| | | |
| | | return ptr |
| | | } |
| | | |
| | | func Restore(ptr unsafe.Pointer) (v interface{}) { |
| | | if ptr == nil { |
| | | return nil |
| | | } |
| | | |
| | | mutex.RLock() |
| | | v = store[ptr] |
| | | mutex.RUnlock() |
| | | return |
| | | } |
| | | |
| | | func Unref(ptr unsafe.Pointer) { |
| | | if ptr == nil { |
| | | return |
| | | } |
| | | |
| | | mutex.Lock() |
| | | delete(store, ptr) |
| | | mutex.Unlock() |
| | | |
| | | C.free(ptr) |
| | | } |
New file |
| | |
| | | // Package nsqclient implements a pool of NSQ producers to manage and reuse them. |
| | | package nsqclient |
| | | |
| | | import "errors" |
| | | |
// ErrClosed is returned when an operation is attempted on a pool that has
// already been closed.
var ErrClosed = errors.New("pool is closed")
| | | |
| | | // Pool interface describes a pool implementation. A pool should have maximum |
| | | // capacity. An ideal pool is threadsafe and easy to use. |
| | | type Pool interface { |
| | | // Get returns a new connection from the pool. Closing the connections puts |
| | | // it back to the Pool. Closing it when the pool is destroyed or full will |
| | | // be counted as an error. |
| | | Get() (*PoolConn, error) |
| | | |
| | | // Close closes the pool and all its connections. After Close() the pool is |
| | | // no longer usable. |
| | | Close() |
| | | |
| | | // Len returns the current number of connections of the pool. |
| | | Len() int |
| | | } |
New file |
| | |
| | | package nsqclient |
| | | |
| | | import ( |
| | | "fmt" |
| | | "time" |
| | | |
| | | nsq "github.com/nsqio/go-nsq" |
| | | ) |
| | | |
// Producer is the publishing API exposed by this package.
type Producer interface {
	// Publish sends a single message body to topic.
	Publish(topic string, body []byte) error
	// MultiPublish sends several message bodies to topic in one call.
	MultiPublish(topic string, body [][]byte) error
	// DeferredPublish sends body to topic with the given delay.
	DeferredPublish(topic string, delay time.Duration, body []byte) error
}
| | | |
| | | var _ Producer = (*producer)(nil) |
| | | |
| | | type producer struct { |
| | | pool Pool |
| | | } |
| | | |
| | | var ( |
| | | // name pool producer |
| | | nsqList = make(map[string]Pool) |
| | | ) |
| | | |
// Config describes one producer pool.
type Config struct {
	// Addr is the address handed to the producer constructor.
	Addr string `toml:"addr" json:"addr"`
	// InitSize is the number of producers created up front.
	InitSize int `toml:"init_size" json:"init_size"`
	// MaxSize is the capacity of the pool.
	MaxSize int `toml:"max_size" json:"max_size"`
}
| | | |
| | | func CreateProducerPool(configs map[string]Config) { |
| | | for name, conf := range configs { |
| | | n, err := newProducerPool(conf.Addr, conf.InitSize, conf.MaxSize) |
| | | if err == nil { |
| | | nsqList[name] = n |
| | | // 支持ip:port寻址 |
| | | nsqList[conf.Addr] = n |
| | | } |
| | | } |
| | | } |
| | | |
| | | func DestroyProducerPool() { |
| | | for _, p := range nsqList { |
| | | p.Close() |
| | | } |
| | | } |
| | | |
| | | func GetProducer(key ...string) (*producer, error) { |
| | | k := "default" |
| | | if len(key) > 0 { |
| | | k = key[0] |
| | | } |
| | | if n, ok := nsqList[k]; ok { |
| | | return &producer{n}, nil |
| | | } |
| | | return nil, fmt.Errorf("GetProducer can't get producer") |
| | | } |
| | | |
| | | // CreateNSQProducer create nsq producer |
| | | func newProducer(addr string, options ...func(*nsq.Config)) (*nsq.Producer, error) { |
| | | cfg := nsq.NewConfig() |
| | | for _, option := range options { |
| | | option(cfg) |
| | | } |
| | | |
| | | producer, err := nsq.NewProducer(addr, cfg) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | // producer.SetLogger(log.New(os.Stderr, "", log.Flags()), nsq.LogLevelError) |
| | | return producer, nil |
| | | } |
| | | |
| | | // CreateNSQProducerPool create a nwq producer pool |
| | | func newProducerPool(addr string, initSize, maxSize int, options ...func(*nsq.Config)) (Pool, error) { |
| | | factory := func() (*nsq.Producer, error) { |
| | | // TODO 这里应该执行ping方法来确定连接是正常的否则不应该创建conn |
| | | return newProducer(addr, options...) |
| | | } |
| | | nsqPool, err := NewChannelPool(initSize, maxSize, factory) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | return nsqPool, nil |
| | | } |
| | | |
| | | func NewProducer(addr string) (*producer, error) { |
| | | CreateProducerPool(map[string]Config{"default": {addr, 1, 1}}) |
| | | return GetProducer() |
| | | } |
| | | |
// retry calls fn until it succeeds or num attempts have been made, and
// returns the error of the last attempt (nil on success).
//
// fn is always called at least once: a num below 1 is treated as 1, so a
// non-positive count can no longer report success without running fn.
func retry(num int, fn func() error) error {
	if num < 1 {
		num = 1
	}

	var err error
	for i := 0; i < num; i++ {
		if err = fn(); err == nil {
			return nil
		}
	}
	return err
}
| | | |
| | | func (p *producer) Publish(topic string, body []byte) error { |
| | | nsq, err := p.pool.Get() |
| | | if err != nil { |
| | | return err |
| | | } |
| | | defer nsq.Close() |
| | | |
| | | return retry(2, func() error { |
| | | return nsq.Publish(topic, body) |
| | | }) |
| | | } |
| | | |
| | | func (p *producer) MultiPublish(topic string, body [][]byte) error { |
| | | nsq, err := p.pool.Get() |
| | | if err != nil { |
| | | return err |
| | | } |
| | | defer nsq.Close() |
| | | |
| | | return retry(2, func() error { |
| | | return nsq.MultiPublish(topic, body) |
| | | }) |
| | | } |
| | | |
| | | func (p *producer) DeferredPublish(topic string, delay time.Duration, body []byte) error { |
| | | nsq, err := p.pool.Get() |
| | | if err != nil { |
| | | return err |
| | | } |
| | | defer nsq.Close() |
| | | |
| | | return retry(2, func() error { |
| | | return nsq.DeferredPublish(topic, delay, body) |
| | | }) |
| | | } |