package nsqclient

import (
	"math/rand"
	"sync"
	"testing"
	"time"

	"github.com/nsqio/go-nsq"
)

// Shared fixtures used by the pool tests below.
var (
	// InitialCap is passed to NewChannelPool as the initial capacity of the
	// default test pool.
	InitialCap = 2
	// MaximumCap is passed to NewChannelPool as the maximum capacity of the
	// default test pool.
	MaximumCap = 10
	// factory creates an nsq producer for ":4160" with a fresh nsq config.
	factory = func() (*nsq.Producer, error) {
		config := nsq.NewConfig()
		return nsq.NewProducer(":4160", config)
	}
)

// newChannelPool builds a pool from the default fixtures
// (InitialCap, MaximumCap, factory).
func newChannelPool() (Pool, error) {
	return NewChannelPool(InitialCap, MaximumCap, factory)
}

// TestNew checks that a pool can be constructed from the default fixtures.
func TestNew(t *testing.T) {
	p, err := newChannelPool()
	if err != nil {
		// Fatalf: p is unusable on error, so there is nothing to close.
		t.Fatalf("New error: %s", err)
	}
	p.Close()
}

// TestPool_Get checks that Get lowers Len by one per call, that the pool can
// be drained concurrently, and that Get still succeeds once Len reaches zero.
func TestPool_Get(t *testing.T) {
	p, err := newChannelPool()
	if err != nil {
		t.Fatalf("New error: %s", err)
	}
	defer p.Close()

	if _, err := p.Get(); err != nil {
		t.Errorf("Get error: %s", err)
	}

	// After one Get, the current capacity should be lowered by one.
	if p.Len() != (InitialCap - 1) {
		t.Errorf("Get error. Expecting %d, got %d", (InitialCap - 1), p.Len())
	}

	// Get all remaining connections concurrently.
	var wg sync.WaitGroup
	for i := 0; i < (InitialCap - 1); i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Errorf (not Fatalf) because this runs off the test goroutine.
			if _, err := p.Get(); err != nil {
				t.Errorf("Get error: %s", err)
			}
		}()
	}
	wg.Wait()

	if p.Len() != 0 {
		t.Errorf("Get error. Expecting %d, got %d", 0, p.Len())
	}

	// Get on a drained pool is still expected to succeed.
	if _, err := p.Get(); err != nil {
		t.Errorf("Get error: %s", err)
	}
}

// TestPool_Put checks that closing connections returns them to an open pool
// and that closing a connection after the pool is closed does not.
func TestPool_Put(t *testing.T) {
	p, err := NewChannelPool(0, 30, factory)
	if err != nil {
		t.Fatal(err)
	}
	defer p.Close()

	// Get/create MaximumCap connections from the pool.
	conns := make([]*PoolConn, MaximumCap)
	for i := range conns {
		conn, err := p.Get()
		if err != nil {
			t.Fatalf("Get error: %s", err)
		}
		conns[i] = conn
	}

	// Now put them all back.
	for _, conn := range conns {
		conn.Close()
	}

	if p.Len() != MaximumCap {
		t.Errorf("Put error len. Expecting %d, got %d", MaximumCap, p.Len())
	}

	conn, err := p.Get()
	if err != nil {
		t.Fatalf("Get error: %s", err)
	}
	p.Close() // close pool

	conn.Close() // try to put into a closed pool
	if p.Len() != 0 {
		t.Errorf("Put error. Closed pool shouldn't allow to put connections.")
	}
}

// TestPool_PutUnusableConn checks that a connection marked unusable is not
// returned to the pool when it is closed.
func TestPool_PutUnusableConn(t *testing.T) {
	p, err := newChannelPool()
	if err != nil {
		t.Fatalf("New error: %s", err)
	}
	defer p.Close()

	// Ensure the pool is not empty.
	conn, err := p.Get()
	if err != nil {
		t.Fatalf("Get error: %s", err)
	}
	conn.Close()

	poolSize := p.Len()

	// A normal Get/Close round trip must leave the size unchanged.
	conn, err = p.Get()
	if err != nil {
		t.Fatalf("Get error: %s", err)
	}
	conn.Close()
	if p.Len() != poolSize {
		t.Errorf("Pool size is expected to be equal to initial size")
	}

	// A connection marked unusable must not come back on Close.
	conn, err = p.Get()
	if err != nil {
		t.Fatalf("Get error: %s", err)
	}
	conn.MarkUnusable()
	conn.Close()
	if p.Len() != poolSize-1 {
		t.Errorf("Pool size is expected to be initial_size - 1 [%d:%d]", p.Len(), poolSize-1)
	}
}

// TestPool_UsedCapacity checks that a fresh pool reports InitialCap as Len.
func TestPool_UsedCapacity(t *testing.T) {
	p, err := newChannelPool()
	if err != nil {
		t.Fatalf("New error: %s", err)
	}
	defer p.Close()

	if p.Len() != InitialCap {
		t.Errorf("InitialCap error. Expecting %d, got %d", InitialCap, p.Len())
	}
}

// TestPool_Close checks the state of a pool after Close: internal fields are
// cleared, Get fails, and Len reports zero.
func TestPool_Close(t *testing.T) {
	p, err := newChannelPool()
	if err != nil {
		t.Fatalf("New error: %s", err)
	}

	// Now close it and test all cases we are expecting.
	p.Close()

	c := p.(*channelPool)

	if c.conns != nil {
		t.Errorf("Close error, conns channel should be nil")
	}

	if c.factory != nil {
		t.Errorf("Close error, factory should be nil")
	}

	if _, err := p.Get(); err == nil {
		t.Errorf("Close error, get conn should return an error")
	}

	if p.Len() != 0 {
		t.Errorf("Close error used capacity. Expecting 0, got %d", p.Len())
	}
}

// TestPoolConcurrent races Close against concurrent Get and connection Close
// calls. It makes no assertions; it exists to surface panics and data races.
func TestPoolConcurrent(t *testing.T) {
	p, err := newChannelPool()
	if err != nil {
		t.Fatalf("New error: %s", err)
	}
	pipe := make(chan *PoolConn)

	// Wait for every goroutine so the interleavings actually execute before
	// the test returns.
	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		p.Close()
	}()

	for i := 0; i < MaximumCap; i++ {
		// One sender and one receiver per iteration keeps the unbuffered
		// channel balanced, so neither side blocks forever.
		wg.Add(2)
		go func() {
			defer wg.Done()
			// The error is deliberately ignored: Get may lose the race with
			// Close, in which case the receiver is expected to see a nil conn.
			conn, _ := p.Get()
			pipe <- conn
		}()

		go func() {
			defer wg.Done()
			conn := <-pipe
			if conn == nil {
				return
			}
			conn.Close()
		}()
	}

	wg.Wait()
}

// TestPoolConcurrent2 runs two batches of ten workers that each Get a
// connection, hold it for a random short time, and Close it again.
func TestPoolConcurrent2(t *testing.T) {
	p, err := NewChannelPool(0, 30, factory)
	if err != nil {
		t.Fatal(err)
	}
	defer p.Close()

	const workers = 10

	// worker borrows a connection briefly and returns it.
	var wg sync.WaitGroup
	worker := func() {
		defer wg.Done()
		conn, err := p.Get()
		if err != nil {
			// Errorf (not Fatalf) because this runs off the test goroutine.
			t.Errorf("Get error: %s", err)
			return
		}
		time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
		conn.Close()
	}

	// Register both batches before starting any goroutine: calling Add from
	// inside a goroutine would race with Wait below.
	wg.Add(2 * workers)

	// The first batch is launched from its own goroutine so that it starts
	// concurrently with the second batch.
	go func() {
		for i := 0; i < workers; i++ {
			go worker()
		}
	}()

	for i := 0; i < workers; i++ {
		go worker()
	}

	wg.Wait()
}