From 9731130d8be7adc56010f0744ba4a6358d311110 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期二, 16 五月 2023 16:48:57 +0800 Subject: [PATCH] for go get --- /dev/null | 218 ------------------------------------------- nsqcli/LICENSE | 0 nsqcli/channel.go | 2 nsqcli/conn.go | 2 nsqcli/pool.go | 2 nsqcli/consumer.go | 2 TST/test/test.go | 8 main.go | 26 ++-- nsqcli/pointer.go | 2 nsqcli/README.md | 0 nsqcli/producer.go | 2 11 files changed, 23 insertions(+), 241 deletions(-) diff --git a/TST/test/test.go b/TST/test/test.go index 2e1a679..5aaf283 100644 --- a/TST/test/test.go +++ b/TST/test/test.go @@ -4,16 +4,16 @@ "context" "fmt" "log" - "nsqclient/nsqclient" + "nsqclient/nsqcli" "time" ) func produce(two bool) { - p, _ := nsqclient.NewProducer("192.168.20.108:4150") + p, _ := nsqcli.NewProducer("192.168.20.108:4150") var str string for len(str) < 32 { - str += "cnsqclient dynamic library" + str += "cnsqcli dynamic library" } msgx := []byte(str + "--x") msgy := []byte(str + "--y") @@ -40,7 +40,7 @@ func consume(topic, channel string) { ctx, cancel := context.WithCancel(context.Background()) - if c, e := nsqclient.NewNsqConsumer(ctx, topic, channel); e != nil { + if c, e := nsqcli.NewNsqConsumer(ctx, topic, channel); e != nil { fmt.Println("NewNsqConsumer failed", e) return } else { diff --git a/main.go b/main.go index 4eec6c9..c01a2e1 100644 --- a/main.go +++ b/main.go @@ -6,7 +6,7 @@ import ( "nsqclient/TST/test" - "nsqclient/nsqclient" + "nsqclient/nsqcli" "sync" "time" "unsafe" @@ -14,18 +14,18 @@ //export createProducer func createProducer(addr string) unsafe.Pointer { - n, _ := nsqclient.NewProducer(addr) - return nsqclient.Save(n) + n, _ := nsqcli.NewProducer(addr) + return nsqcli.Save(n) } //export destroyProducer func destroyProducer(ph unsafe.Pointer) { - nsqclient.Unref(ph) - nsqclient.DestroyProducerPool() + nsqcli.Unref(ph) + nsqcli.DestroyProducerPool() } -func pcvt(ph unsafe.Pointer) nsqclient.Producer { - return nsqclient.Restore(ph).(nsqclient.Producer) +func pcvt(ph unsafe.Pointer) nsqcli.Producer { + return nsqcli.Restore(ph).(nsqcli.Producer) } //export publish @@ -58,30 +58,30 @@ ///////////////////////////////////////////////////////////// type consumer struct { - nsqcon *nsqclient.NsqConsumer + nsqcon *nsqcli.NsqConsumer lck sync.Mutex msgs [][]byte } //export createConsumer func createConsumer(topic, channel string) unsafe.Pointer { - if c, err := nsqclient.NewNsqConsumer(nil, topic, channel); err == nil { + if c, err := nsqcli.NewNsqConsumer(nil, topic, channel); err == nil { con := &consumer{ nsqcon: c, } - return nsqclient.Save(con) + return nsqcli.Save(con) } return nil } func ccvt(ch unsafe.Pointer) *consumer { - return nsqclient.Restore(ch).(*consumer) + return nsqcli.Restore(ch).(*consumer) } //export destroyConsumer func destroyConsumer(ch unsafe.Pointer) { - nsqclient.DestroyNsqConsumer(ccvt(ch).nsqcon) - nsqclient.Unref(ch) + nsqcli.DestroyNsqConsumer(ccvt(ch).nsqcon) + nsqcli.Unref(ch) } //export Run diff --git a/nsqclient/LICENSE b/nsqcli/LICENSE similarity index 100% rename from nsqclient/LICENSE rename to nsqcli/LICENSE diff --git a/nsqclient/README.md b/nsqcli/README.md similarity index 100% rename from nsqclient/README.md rename to nsqcli/README.md diff --git a/nsqclient/channel.go b/nsqcli/channel.go similarity index 99% rename from nsqclient/channel.go rename to nsqcli/channel.go index 1594dcc..e583fcd 100644 --- a/nsqclient/channel.go +++ b/nsqcli/channel.go @@ -1,4 +1,4 @@ -package nsqclient +package nsqcli import ( "errors" diff --git a/nsqclient/conn.go b/nsqcli/conn.go similarity index 97% rename from nsqclient/conn.go rename to nsqcli/conn.go index 5c680b5..a66a10e 100644 --- a/nsqclient/conn.go +++ b/nsqcli/conn.go @@ -1,4 +1,4 @@ -package nsqclient +package nsqcli import ( "sync" diff --git a/nsqclient/consumer.go b/nsqcli/consumer.go similarity index 98% rename from nsqclient/consumer.go rename to nsqcli/consumer.go index a0df0b0..63f17f4 100644 --- a/nsqclient/consumer.go +++ b/nsqcli/consumer.go @@ -1,4 +1,4 @@ -package nsqclient +package nsqcli import ( "context" diff --git a/nsqclient/pointer.go b/nsqcli/pointer.go similarity index 97% rename from nsqclient/pointer.go rename to nsqcli/pointer.go index 1cba795..0f0bf38 100644 --- a/nsqclient/pointer.go +++ b/nsqcli/pointer.go @@ -1,4 +1,4 @@ -package nsqclient +package nsqcli // #include <stdlib.h> import "C" diff --git a/nsqclient/pool.go b/nsqcli/pool.go similarity index 97% rename from nsqclient/pool.go rename to nsqcli/pool.go index b773490..236beed 100644 --- a/nsqclient/pool.go +++ b/nsqcli/pool.go @@ -1,5 +1,5 @@ // Package pool implements a pool of net.Conn interfaces to manage and reuse them. -package nsqclient +package nsqcli import "errors" diff --git a/nsqclient/producer.go b/nsqcli/producer.go similarity index 99% rename from nsqclient/producer.go rename to nsqcli/producer.go index 717c7a1..488435b 100644 --- a/nsqclient/producer.go +++ b/nsqcli/producer.go @@ -1,4 +1,4 @@ -package nsqclient +package nsqcli import ( "fmt" diff --git a/nsqclient/channel_test.go b/nsqclient/channel_test.go deleted file mode 100644 index 725db9b..0000000 --- a/nsqclient/channel_test.go +++ /dev/null @@ -1,218 +0,0 @@ -package nsqclient - -import ( - "math/rand" - "sync" - "testing" - "time" - - "github.com/nsqio/go-nsq" -) - -var ( - InitialCap = 2 - MaximumCap = 10 - factory = func() (*nsq.Producer, error) { - config := nsq.NewConfig() - return nsq.NewProducer(":4160", config) - } -) - -func newChannelPool() (Pool, error) { - return NewChannelPool(InitialCap, MaximumCap, factory) -} - -func TestNew(t *testing.T) { - _, err := newChannelPool() - if err != nil { - t.Errorf("New error: %s", err) - } -} - -func TestPool_Get(t *testing.T) { - p, _ := newChannelPool() - defer p.Close() - - _, err := p.Get() - if err != nil { - t.Errorf("Get error: %s", err) - } - - // after one get, current capacity should be lowered by one. - if p.Len() != (InitialCap - 1) { - t.Errorf("Get error. Expecting %d, got %d", - (InitialCap - 1), p.Len()) - } - - // get them all - var wg sync.WaitGroup - for i := 0; i < (InitialCap - 1); i++ { - wg.Add(1) - go func() { - defer wg.Done() - _, err := p.Get() - if err != nil { - t.Errorf("Get error: %s", err) - } - }() - } - wg.Wait() - - if p.Len() != 0 { - t.Errorf("Get error. Expecting %d, got %d", - (InitialCap - 1), p.Len()) - } - - _, err = p.Get() - if err != nil { - t.Errorf("Get error: %s", err) - } -} - -func TestPool_Put(t *testing.T) { - p, err := NewChannelPool(0, 30, factory) - if err != nil { - t.Fatal(err) - } - defer p.Close() - - // get/create from the pool - conns := make([]*PoolConn, MaximumCap) - for i := 0; i < MaximumCap; i++ { - conn, _ := p.Get() - conns[i] = conn - } - - // now put them all back - for _, conn := range conns { - conn.Close() - } - - if p.Len() != MaximumCap { - t.Errorf("Put error len. Expecting %d, got %d", - 1, p.Len()) - } - - conn, _ := p.Get() - p.Close() // close pool - - conn.Close() // try to put into a full pool - if p.Len() != 0 { - t.Errorf("Put error. Closed pool shouldn't allow to put connections.") - } -} - -func TestPool_PutUnusableConn(t *testing.T) { - p, _ := newChannelPool() - defer p.Close() - - // ensure pool is not empty - conn, _ := p.Get() - conn.Close() - - poolSize := p.Len() - conn, _ = p.Get() - conn.Close() - if p.Len() != poolSize { - t.Errorf("Pool size is expected to be equal to initial size") - } - - conn, _ = p.Get() - conn.MarkUnusable() - - conn.Close() - if p.Len() != poolSize-1 { - t.Errorf("Pool size is expected to be initial_size - 1 [%d:%d]", p.Len(), poolSize-1) - } -} - -func TestPool_UsedCapacity(t *testing.T) { - p, _ := newChannelPool() - defer p.Close() - - if p.Len() != InitialCap { - t.Errorf("InitialCap error. Expecting %d, got %d", - InitialCap, p.Len()) - } -} - -func TestPool_Close(t *testing.T) { - p, _ := newChannelPool() - - // now close it and test all cases we are expecting. - p.Close() - - c := p.(*channelPool) - - if c.conns != nil { - t.Errorf("Close error, conns channel should be nil") - } - - if c.factory != nil { - t.Errorf("Close error, factory should be nil") - } - - _, err := p.Get() - if err == nil { - t.Errorf("Close error, get conn should return an error") - } - - if p.Len() != 0 { - t.Errorf("Close error used capacity. Expecting 0, got %d", p.Len()) - } -} - -func TestPoolConcurrent(t *testing.T) { - p, _ := newChannelPool() - pipe := make(chan *PoolConn, 0) - - go func() { - p.Close() - }() - - for i := 0; i < MaximumCap; i++ { - go func() { - conn, _ := p.Get() - - pipe <- conn - }() - - go func() { - conn := <-pipe - if conn == nil { - return - } - conn.Close() - }() - } -} - -func TestPoolConcurrent2(t *testing.T) { - p, _ := NewChannelPool(0, 30, factory) - - var wg sync.WaitGroup - - go func() { - for i := 0; i < 10; i++ { - wg.Add(1) - go func(i int) { - conn, _ := p.Get() - time.Sleep(time.Millisecond * time.Duration(rand.Intn(100))) - conn.Close() - wg.Done() - }(i) - } - }() - - for i := 0; i < 10; i++ { - wg.Add(1) - go func(i int) { - conn, _ := p.Get() - time.Sleep(time.Millisecond * time.Duration(rand.Intn(100))) - conn.Close() - wg.Done() - }(i) - } - - wg.Wait() -} -- Gitblit v1.8.0