From 9731130d8be7adc56010f0744ba4a6358d311110 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期二, 16 五月 2023 16:48:57 +0800
Subject: [PATCH] for go get
---
/dev/null | 218 -------------------------------------------
nsqcli/LICENSE | 0
nsqcli/channel.go | 2
nsqcli/conn.go | 2
nsqcli/pool.go | 2
nsqcli/consumer.go | 2
TST/test/test.go | 8
main.go | 26 ++--
nsqcli/pointer.go | 2
nsqcli/README.md | 0
nsqcli/producer.go | 2
11 files changed, 23 insertions(+), 241 deletions(-)
diff --git a/TST/test/test.go b/TST/test/test.go
index 2e1a679..5aaf283 100644
--- a/TST/test/test.go
+++ b/TST/test/test.go
@@ -4,16 +4,16 @@
"context"
"fmt"
"log"
- "nsqclient/nsqclient"
+ "nsqclient/nsqcli"
"time"
)
func produce(two bool) {
- p, _ := nsqclient.NewProducer("192.168.20.108:4150")
+ p, _ := nsqcli.NewProducer("192.168.20.108:4150")
var str string
for len(str) < 32 {
- str += "cnsqclient dynamic library"
+ str += "cnsqcli dynamic library"
}
msgx := []byte(str + "--x")
msgy := []byte(str + "--y")
@@ -40,7 +40,7 @@
func consume(topic, channel string) {
ctx, cancel := context.WithCancel(context.Background())
- if c, e := nsqclient.NewNsqConsumer(ctx, topic, channel); e != nil {
+ if c, e := nsqcli.NewNsqConsumer(ctx, topic, channel); e != nil {
fmt.Println("NewNsqConsumer failed", e)
return
} else {
diff --git a/main.go b/main.go
index 4eec6c9..c01a2e1 100644
--- a/main.go
+++ b/main.go
@@ -6,7 +6,7 @@
import (
"nsqclient/TST/test"
- "nsqclient/nsqclient"
+ "nsqclient/nsqcli"
"sync"
"time"
"unsafe"
@@ -14,18 +14,18 @@
//export createProducer
func createProducer(addr string) unsafe.Pointer {
- n, _ := nsqclient.NewProducer(addr)
- return nsqclient.Save(n)
+ n, _ := nsqcli.NewProducer(addr)
+ return nsqcli.Save(n)
}
//export destroyProducer
func destroyProducer(ph unsafe.Pointer) {
- nsqclient.Unref(ph)
- nsqclient.DestroyProducerPool()
+ nsqcli.Unref(ph)
+ nsqcli.DestroyProducerPool()
}
-func pcvt(ph unsafe.Pointer) nsqclient.Producer {
- return nsqclient.Restore(ph).(nsqclient.Producer)
+func pcvt(ph unsafe.Pointer) nsqcli.Producer {
+ return nsqcli.Restore(ph).(nsqcli.Producer)
}
//export publish
@@ -58,30 +58,30 @@
/////////////////////////////////////////////////////////////
type consumer struct {
- nsqcon *nsqclient.NsqConsumer
+ nsqcon *nsqcli.NsqConsumer
lck sync.Mutex
msgs [][]byte
}
//export createConsumer
func createConsumer(topic, channel string) unsafe.Pointer {
- if c, err := nsqclient.NewNsqConsumer(nil, topic, channel); err == nil {
+ if c, err := nsqcli.NewNsqConsumer(nil, topic, channel); err == nil {
con := &consumer{
nsqcon: c,
}
- return nsqclient.Save(con)
+ return nsqcli.Save(con)
}
return nil
}
func ccvt(ch unsafe.Pointer) *consumer {
- return nsqclient.Restore(ch).(*consumer)
+ return nsqcli.Restore(ch).(*consumer)
}
//export destroyConsumer
func destroyConsumer(ch unsafe.Pointer) {
- nsqclient.DestroyNsqConsumer(ccvt(ch).nsqcon)
- nsqclient.Unref(ch)
+ nsqcli.DestroyNsqConsumer(ccvt(ch).nsqcon)
+ nsqcli.Unref(ch)
}
//export Run
diff --git a/nsqclient/LICENSE b/nsqcli/LICENSE
similarity index 100%
rename from nsqclient/LICENSE
rename to nsqcli/LICENSE
diff --git a/nsqclient/README.md b/nsqcli/README.md
similarity index 100%
rename from nsqclient/README.md
rename to nsqcli/README.md
diff --git a/nsqclient/channel.go b/nsqcli/channel.go
similarity index 99%
rename from nsqclient/channel.go
rename to nsqcli/channel.go
index 1594dcc..e583fcd 100644
--- a/nsqclient/channel.go
+++ b/nsqcli/channel.go
@@ -1,4 +1,4 @@
-package nsqclient
+package nsqcli
import (
"errors"
diff --git a/nsqclient/conn.go b/nsqcli/conn.go
similarity index 97%
rename from nsqclient/conn.go
rename to nsqcli/conn.go
index 5c680b5..a66a10e 100644
--- a/nsqclient/conn.go
+++ b/nsqcli/conn.go
@@ -1,4 +1,4 @@
-package nsqclient
+package nsqcli
import (
"sync"
diff --git a/nsqclient/consumer.go b/nsqcli/consumer.go
similarity index 98%
rename from nsqclient/consumer.go
rename to nsqcli/consumer.go
index a0df0b0..63f17f4 100644
--- a/nsqclient/consumer.go
+++ b/nsqcli/consumer.go
@@ -1,4 +1,4 @@
-package nsqclient
+package nsqcli
import (
"context"
diff --git a/nsqclient/pointer.go b/nsqcli/pointer.go
similarity index 97%
rename from nsqclient/pointer.go
rename to nsqcli/pointer.go
index 1cba795..0f0bf38 100644
--- a/nsqclient/pointer.go
+++ b/nsqcli/pointer.go
@@ -1,4 +1,4 @@
-package nsqclient
+package nsqcli
// #include <stdlib.h>
import "C"
diff --git a/nsqclient/pool.go b/nsqcli/pool.go
similarity index 97%
rename from nsqclient/pool.go
rename to nsqcli/pool.go
index b773490..236beed 100644
--- a/nsqclient/pool.go
+++ b/nsqcli/pool.go
@@ -1,5 +1,5 @@
// Package pool implements a pool of net.Conn interfaces to manage and reuse them.
-package nsqclient
+package nsqcli
import "errors"
diff --git a/nsqclient/producer.go b/nsqcli/producer.go
similarity index 99%
rename from nsqclient/producer.go
rename to nsqcli/producer.go
index 717c7a1..488435b 100644
--- a/nsqclient/producer.go
+++ b/nsqcli/producer.go
@@ -1,4 +1,4 @@
-package nsqclient
+package nsqcli
import (
"fmt"
diff --git a/nsqclient/channel_test.go b/nsqclient/channel_test.go
deleted file mode 100644
index 725db9b..0000000
--- a/nsqclient/channel_test.go
+++ /dev/null
@@ -1,218 +0,0 @@
-package nsqclient
-
-import (
- "math/rand"
- "sync"
- "testing"
- "time"
-
- "github.com/nsqio/go-nsq"
-)
-
-var (
- InitialCap = 2
- MaximumCap = 10
- factory = func() (*nsq.Producer, error) {
- config := nsq.NewConfig()
- return nsq.NewProducer(":4160", config)
- }
-)
-
-func newChannelPool() (Pool, error) {
- return NewChannelPool(InitialCap, MaximumCap, factory)
-}
-
-func TestNew(t *testing.T) {
- _, err := newChannelPool()
- if err != nil {
- t.Errorf("New error: %s", err)
- }
-}
-
-func TestPool_Get(t *testing.T) {
- p, _ := newChannelPool()
- defer p.Close()
-
- _, err := p.Get()
- if err != nil {
- t.Errorf("Get error: %s", err)
- }
-
- // after one get, current capacity should be lowered by one.
- if p.Len() != (InitialCap - 1) {
- t.Errorf("Get error. Expecting %d, got %d",
- (InitialCap - 1), p.Len())
- }
-
- // get them all
- var wg sync.WaitGroup
- for i := 0; i < (InitialCap - 1); i++ {
- wg.Add(1)
- go func() {
- defer wg.Done()
- _, err := p.Get()
- if err != nil {
- t.Errorf("Get error: %s", err)
- }
- }()
- }
- wg.Wait()
-
- if p.Len() != 0 {
- t.Errorf("Get error. Expecting %d, got %d",
- (InitialCap - 1), p.Len())
- }
-
- _, err = p.Get()
- if err != nil {
- t.Errorf("Get error: %s", err)
- }
-}
-
-func TestPool_Put(t *testing.T) {
- p, err := NewChannelPool(0, 30, factory)
- if err != nil {
- t.Fatal(err)
- }
- defer p.Close()
-
- // get/create from the pool
- conns := make([]*PoolConn, MaximumCap)
- for i := 0; i < MaximumCap; i++ {
- conn, _ := p.Get()
- conns[i] = conn
- }
-
- // now put them all back
- for _, conn := range conns {
- conn.Close()
- }
-
- if p.Len() != MaximumCap {
- t.Errorf("Put error len. Expecting %d, got %d",
- 1, p.Len())
- }
-
- conn, _ := p.Get()
- p.Close() // close pool
-
- conn.Close() // try to put into a full pool
- if p.Len() != 0 {
- t.Errorf("Put error. Closed pool shouldn't allow to put connections.")
- }
-}
-
-func TestPool_PutUnusableConn(t *testing.T) {
- p, _ := newChannelPool()
- defer p.Close()
-
- // ensure pool is not empty
- conn, _ := p.Get()
- conn.Close()
-
- poolSize := p.Len()
- conn, _ = p.Get()
- conn.Close()
- if p.Len() != poolSize {
- t.Errorf("Pool size is expected to be equal to initial size")
- }
-
- conn, _ = p.Get()
- conn.MarkUnusable()
-
- conn.Close()
- if p.Len() != poolSize-1 {
- t.Errorf("Pool size is expected to be initial_size - 1 [%d:%d]", p.Len(), poolSize-1)
- }
-}
-
-func TestPool_UsedCapacity(t *testing.T) {
- p, _ := newChannelPool()
- defer p.Close()
-
- if p.Len() != InitialCap {
- t.Errorf("InitialCap error. Expecting %d, got %d",
- InitialCap, p.Len())
- }
-}
-
-func TestPool_Close(t *testing.T) {
- p, _ := newChannelPool()
-
- // now close it and test all cases we are expecting.
- p.Close()
-
- c := p.(*channelPool)
-
- if c.conns != nil {
- t.Errorf("Close error, conns channel should be nil")
- }
-
- if c.factory != nil {
- t.Errorf("Close error, factory should be nil")
- }
-
- _, err := p.Get()
- if err == nil {
- t.Errorf("Close error, get conn should return an error")
- }
-
- if p.Len() != 0 {
- t.Errorf("Close error used capacity. Expecting 0, got %d", p.Len())
- }
-}
-
-func TestPoolConcurrent(t *testing.T) {
- p, _ := newChannelPool()
- pipe := make(chan *PoolConn, 0)
-
- go func() {
- p.Close()
- }()
-
- for i := 0; i < MaximumCap; i++ {
- go func() {
- conn, _ := p.Get()
-
- pipe <- conn
- }()
-
- go func() {
- conn := <-pipe
- if conn == nil {
- return
- }
- conn.Close()
- }()
- }
-}
-
-func TestPoolConcurrent2(t *testing.T) {
- p, _ := NewChannelPool(0, 30, factory)
-
- var wg sync.WaitGroup
-
- go func() {
- for i := 0; i < 10; i++ {
- wg.Add(1)
- go func(i int) {
- conn, _ := p.Get()
- time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
- conn.Close()
- wg.Done()
- }(i)
- }
- }()
-
- for i := 0; i < 10; i++ {
- wg.Add(1)
- go func(i int) {
- conn, _ := p.Get()
- time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
- conn.Close()
- wg.Done()
- }(i)
- }
-
- wg.Wait()
-}
--
Gitblit v1.8.0