From 1d0c9c4391d8ae40fb981a23170b3dad0d11aad9 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期二, 16 五月 2023 16:52:56 +0800
Subject: [PATCH] for go get

---
 /dev/null |  139 ----------------------------------------------
 1 files changed, 0 insertions(+), 139 deletions(-)

diff --git a/nsqcli/LICENSE b/nsqcli/LICENSE
deleted file mode 100644
index 25fdaf6..0000000
--- a/nsqcli/LICENSE
+++ /dev/null
@@ -1,20 +0,0 @@
-The MIT License (MIT)
-
-Copyright (c) 2013 Fatih Arslan
-
-Permission is hereby granted, free of charge, to any person obtaining a copy of
-this software and associated documentation files (the "Software"), to deal in
-the Software without restriction, including without limitation the rights to
-use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
-the Software, and to permit persons to whom the Software is furnished to do so,
-subject to the following conditions:
-
-The above copyright notice and this permission notice shall be included in all
-copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
-FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
-COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
-IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
-CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/nsqcli/README.md b/nsqcli/README.md
deleted file mode 100644
index 432d86d..0000000
--- a/nsqcli/README.md
+++ /dev/null
@@ -1,54 +0,0 @@
-# NSQPool
-
-NSQPool is a thread safe connection pool for nsq producer. It can be used to
-manage and reuse nsq producer connection.
-
-
-## Install and Usage
-
-Install the package with:
-
-```bash
-github.com/qgymje/nsqpool
-```
-
-Import it with:
-
-```go
-import (
-    "github.com/qgymje/nsqpool"
-    nsq "github.com/nsqio/go-nsq"
-)
-```
-
-and use `pool` as the package name inside the code.
-
-## Example
-
-```go
-// create a factory() to be used with channel based pool
-factory := func() (*nsq.Producer, error) { 
-    config := nsq.NewConfig()
-    return nsq.NewProducer(":4150", config)
-}
-
-nsqPool, err := pool.NewChannelPool(5, 30, factory)
-
-producer, err := nsqPool.Get()
-
-producer.Publish("topic", "some data")
-// do something with producer and put it back to the pool by closing the connection
-// (this doesn't close the underlying connection instead it's putting it back
-// to the pool).
-producer.Close()
-
-// close pool any time you want, this closes all the connections inside a pool
-nsqPool.Close()
-
-// currently available connections in the pool
-current := nsqPool.Len()
-```
-
-## License
-
-The MIT License (MIT) - see LICENSE for more details
diff --git a/nsqcli/channel.go b/nsqcli/channel.go
deleted file mode 100644
index e583fcd..0000000
--- a/nsqcli/channel.go
+++ /dev/null
@@ -1,134 +0,0 @@
-package nsqcli
-
-import (
-	"errors"
-	"fmt"
-	"sync"
-
-	nsq "github.com/nsqio/go-nsq"
-)
-
-// channelPool implements the Pool interface based on buffered channels.
-type channelPool struct {
-	// storage for our net.Conn connections
-	mu    sync.Mutex
-	conns chan *nsq.Producer
-
-	// net.Conn generator
-	factory Factory
-}
-
-// Factory is a function to create new connections.
-type Factory func() (*nsq.Producer, error)
-
-// NewChannelPool returns a new pool based on buffered channels with an initial
-// capacity and maximum capacity. Factory is used when initial capacity is
-// greater than zero to fill the pool. A zero initialCap doesn't fill the Pool
-// until a new Get() is called. During a Get(), If there is no new connection
-// available in the pool, a new connection will be created via the Factory()
-// method.
-func NewChannelPool(initialCap, maxCap int, factory Factory) (Pool, error) {
-	if initialCap < 0 || maxCap <= 0 || initialCap > maxCap {
-		return nil, errors.New("invalid capacity settings")
-	}
-
-	c := &channelPool{
-		conns:   make(chan *nsq.Producer, maxCap),
-		factory: factory,
-	}
-
-	// create initial connections, if something goes wrong,
-	// just close the pool error out.
-	for i := 0; i < initialCap; i++ {
-		conn, err := factory()
-		if err != nil {
-			c.Close()
-			return nil, fmt.Errorf("factory is not able to fill the pool: %s", err)
-		}
-		c.conns <- conn
-	}
-
-	return c, nil
-}
-
-func (c *channelPool) getConns() chan *nsq.Producer {
-	c.mu.Lock()
-	conns := c.conns
-	c.mu.Unlock()
-	return conns
-}
-
-// Get implements the Pool interfaces Get() method. If there is no new
-// connection available in the pool, a new connection will be created via the
-// Factory() method.
-func (c *channelPool) Get() (*PoolConn, error) {
-	conns := c.getConns()
-	if conns == nil {
-		return nil, ErrClosed
-	}
-
-	// wrap our connections with out custom net.Conn implementation (wrapConn
-	// method) that puts the connection back to the pool if it's closed.
-	select {
-	case conn := <-conns:
-		if conn == nil {
-			return nil, ErrClosed
-		}
-
-		return c.wrapConn(conn), nil
-	default:
-		conn, err := c.factory()
-		if err != nil {
-			return nil, err
-		}
-
-		return c.wrapConn(conn), nil
-	}
-}
-
-// put puts the connection back to the pool. If the pool is full or closed,
-// conn is simply closed. A nil conn will be rejected.
-func (c *channelPool) put(conn *nsq.Producer) error {
-	if conn == nil {
-		return errors.New("connection is nil. rejecting")
-	}
-
-	c.mu.Lock()
-	defer c.mu.Unlock()
-
-	if c.conns == nil {
-		// pool is closed, close passed connection
-		conn.Stop()
-		return nil
-	}
-
-	// put the resource back into the pool. If the pool is full, this will
-	// block and the default case will be executed.
-	select {
-	case c.conns <- conn:
-		return nil
-	default:
-		// pool is full, close passed connection
-		conn.Stop()
-		return nil
-	}
-}
-
-func (c *channelPool) Close() {
-	c.mu.Lock()
-	conns := c.conns
-	c.conns = nil
-	c.factory = nil
-	c.mu.Unlock()
-
-	if conns == nil {
-		return
-	}
-
-	close(conns)
-	for conn := range conns {
-		conn.Stop()
-	}
-}
-
-func (c *channelPool) Len() int { return len(c.getConns()) }
diff --git a/nsqcli/conn.go b/nsqcli/conn.go
deleted file mode 100644
index a66a10e..0000000
--- a/nsqcli/conn.go
+++ /dev/null
@@ -1,45 +0,0 @@
-package nsqcli
-
-import (
-	"sync"
-
-	nsq "github.com/nsqio/go-nsq"
-)
-
-// PoolConn is a wrapper around net.Conn to modify the the behavior of
-// net.Conn's Close() method.
-type PoolConn struct {
-	*nsq.Producer
-	mu       sync.RWMutex
-	c        *channelPool
-	unusable bool
-}
-
-// Close puts the given connects back to the pool instead of closing it.
-func (p *PoolConn) Close() error {
-	p.mu.RLock()
-	defer p.mu.RUnlock()
-
-	if p.unusable {
-		if p.Producer != nil {
-			p.Producer.Stop()
-			return nil
-		}
-		return nil
-	}
-	return p.c.put(p.Producer)
-}
-
-// MarkUnusable marks the connection not usable any more, to let the pool close it instead of returning it to pool.
-func (p *PoolConn) MarkUnusable() {
-	p.mu.Lock()
-	p.unusable = true
-	p.mu.Unlock()
-}
-
-// newConn wraps a standard net.Conn to a poolConn net.Conn.
-func (c *channelPool) wrapConn(conn *nsq.Producer) *PoolConn {
-	p := &PoolConn{c: c}
-	p.Producer = conn
-	return p
-}
diff --git a/nsqcli/consumer.go b/nsqcli/consumer.go
deleted file mode 100644
index 63f17f4..0000000
--- a/nsqcli/consumer.go
+++ /dev/null
@@ -1,99 +0,0 @@
-package nsqcli
-
-import (
-	"context"
-	"fmt"
-	"time"
-
-	nsq "github.com/nsqio/go-nsq"
-)
-
-type NsqConsumer struct {
-	consumer *nsq.Consumer
-	// handler   nsq.Handler
-	handler   func([]byte) error
-	ctx       context.Context
-	ctxCancel context.CancelFunc
-	topic     string
-	channel   string
-}
-
-func NewNsqConsumer(ctx context.Context, topic, channel string, options ...func(*nsq.Config)) (*NsqConsumer, error) {
-	conf := nsq.NewConfig()
-	conf.MaxAttempts = 0
-	conf.MsgTimeout = 10 * time.Minute         // 榛樿涓�涓秷鎭渶澶氳兘澶勭悊鍗佸垎閽燂紝鍚﹀垯灏变細閲嶆柊涓㈠叆闃熷垪
-	conf.LookupdPollInterval = 3 * time.Second // 璋冩暣consumer鐨勯噸杩為棿闅旀椂闂翠负3绉�
-	for _, option := range options {
-		option(conf)
-	}
-
-	consumer, err := nsq.NewConsumer(topic, channel, conf)
-	if err != nil {
-		return nil, err
-	}
-	return &NsqConsumer{
-		consumer: consumer,
-		ctx:      ctx,
-		topic:    topic,
-		channel:  channel,
-	}, nil
-}
-
-func DestroyNsqConsumer(c *NsqConsumer) {
-	if c != nil {
-		if c.ctxCancel != nil {
-			c.ctxCancel()
-		}
-	}
-}
-
-// func (n *NsqConsumer) AddHandler(handler nsq.Handler) {
-// 	n.handler = handler
-// }
-
-func (n *NsqConsumer) AddHandler(handler func([]byte) error) {
-	n.handler = handler
-}
-
-func (n *NsqConsumer) Run(qaddr string, concurrency int) error {
-	return n.RunDistributed([]string{qaddr}, nil, concurrency)
-}
-
-func (n *NsqConsumer) RunLookupd(lookupAddr string, concurrency int) error {
-	return n.RunDistributed(nil, []string{lookupAddr}, concurrency)
-}
-
-func (n *NsqConsumer) RunDistributed(qAddr, lAddr []string, concurrency int) error {
-	n.consumer.ChangeMaxInFlight(concurrency)
-	// n.consumer.AddConcurrentHandlers(n.handler, concurrency)
-	n.consumer.AddConcurrentHandlers(nsq.HandlerFunc(func(msg *nsq.Message) error {
-		return n.handler(msg.Body)
-		// return nil
-	}), concurrency)
-
-	var err error
-	if len(qAddr) > 0 {
-		err = n.consumer.ConnectToNSQDs(qAddr)
-	} else if len(lAddr) > 0 {
-		err = n.consumer.ConnectToNSQLookupds(lAddr)
-	} else {
-		err = fmt.Errorf("Addr Must NOT Empty")
-	}
-	if err != nil {
-		return err
-	}
-
-	if n.ctx == nil {
-		n.ctx, n.ctxCancel = context.WithCancel(context.Background())
-	}
-
-	for {
-		select {
-		case <-n.ctx.Done():
-			fmt.Println("[%s] %s,%s", "stop consumer", n.topic, n.channel)
-			n.consumer.Stop()
-			fmt.Println("[%s] %s,%s", "stop consumer success", n.topic, n.channel)
-			return nil
-		}
-	}
-}
diff --git a/nsqcli/pointer.go b/nsqcli/pointer.go
deleted file mode 100644
index 0f0bf38..0000000
--- a/nsqcli/pointer.go
+++ /dev/null
@@ -1,57 +0,0 @@
-package nsqcli
-
-// #include <stdlib.h>
-import "C"
-import (
-	"sync"
-	"unsafe"
-)
-
-var (
-	mutex sync.RWMutex
-	store = map[unsafe.Pointer]interface{}{}
-)
-
-func Save(v interface{}) unsafe.Pointer {
-	if v == nil {
-		return nil
-	}
-
-	// Generate real fake C pointer.
-	// This pointer will not store any data, but will bi used for indexing purposes.
-	// Since Go doest allow to cast dangling pointer to unsafe.Pointer, we do rally allocate one byte.
-	// Why we need indexing, because Go doest allow C code to store pointers to Go data.
-	var ptr unsafe.Pointer = C.malloc(C.size_t(1))
-	if ptr == nil {
-		panic("can't allocate 'cgo-pointer hack index pointer': ptr == nil")
-	}
-
-	mutex.Lock()
-	store[ptr] = v
-	mutex.Unlock()
-
-	return ptr
-}
-
-func Restore(ptr unsafe.Pointer) (v interface{}) {
-	if ptr == nil {
-		return nil
-	}
-
-	mutex.RLock()
-	v = store[ptr]
-	mutex.RUnlock()
-	return
-}
-
-func Unref(ptr unsafe.Pointer) {
-	if ptr == nil {
-		return
-	}
-
-	mutex.Lock()
-	delete(store, ptr)
-	mutex.Unlock()
-
-	C.free(ptr)
-}
diff --git a/nsqcli/pool.go b/nsqcli/pool.go
deleted file mode 100644
index 236beed..0000000
--- a/nsqcli/pool.go
+++ /dev/null
@@ -1,25 +0,0 @@
-// Package pool implements a pool of net.Conn interfaces to manage and reuse them.
-package nsqcli
-
-import "errors"
-
-var (
-	// ErrClosed is the error resulting if the pool is closed via pool.Close().
-	ErrClosed = errors.New("pool is closed")
-)
-
-// Pool interface describes a pool implementation. A pool should have maximum
-// capacity. An ideal pool is threadsafe and easy to use.
-type Pool interface {
-	// Get returns a new connection from the pool. Closing the connections puts
-	// it back to the Pool. Closing it when the pool is destroyed or full will
-	// be counted as an error.
-	Get() (*PoolConn, error)
-
-	// Close closes the pool and all its connections. After Close() the pool is
-	// no longer usable.
-	Close()
-
-	// Len returns the current number of connections of the pool.
-	Len() int
-}
diff --git a/nsqcli/producer.go b/nsqcli/producer.go
deleted file mode 100644
index 488435b..0000000
--- a/nsqcli/producer.go
+++ /dev/null
@@ -1,139 +0,0 @@
-package nsqcli
-
-import (
-	"fmt"
-	"time"
-
-	nsq "github.com/nsqio/go-nsq"
-)
-
-type Producer interface {
-	Publish(topic string, body []byte) error
-	MultiPublish(topic string, body [][]byte) error
-	DeferredPublish(topic string, delay time.Duration, body []byte) error
-}
-
-var _ Producer = (*producer)(nil)
-
-type producer struct {
-	pool Pool
-}
-
-var (
-	// 				   name   pool producer
-	nsqList = make(map[string]Pool)
-)
-
-type Config struct {
-	Addr     string `toml:"addr" json:"addr"`
-	InitSize int    `toml:"init_size" json:"init_size"`
-	MaxSize  int    `toml:"max_size" json:"max_size"`
-}
-
-func CreateProducerPool(configs map[string]Config) {
-	for name, conf := range configs {
-		n, err := newProducerPool(conf.Addr, conf.InitSize, conf.MaxSize)
-		if err == nil {
-			nsqList[name] = n
-			// 鏀寔ip:port瀵诲潃
-			nsqList[conf.Addr] = n
-		}
-	}
-}
-
-func DestroyProducerPool() {
-	for _, p := range nsqList {
-		p.Close()
-	}
-}
-
-func GetProducer(key ...string) (*producer, error) {
-	k := "default"
-	if len(key) > 0 {
-		k = key[0]
-	}
-	if n, ok := nsqList[k]; ok {
-		return &producer{n}, nil
-	}
-	return nil, fmt.Errorf("GetProducer can't get producer")
-}
-
-// CreateNSQProducer create nsq producer
-func newProducer(addr string, options ...func(*nsq.Config)) (*nsq.Producer, error) {
-	cfg := nsq.NewConfig()
-	for _, option := range options {
-		option(cfg)
-	}
-
-	producer, err := nsq.NewProducer(addr, cfg)
-	if err != nil {
-		return nil, err
-	}
-	// producer.SetLogger(log.New(os.Stderr, "", log.Flags()), nsq.LogLevelError)
-	return producer, nil
-}
-
-// CreateNSQProducerPool create a nwq producer pool
-func newProducerPool(addr string, initSize, maxSize int, options ...func(*nsq.Config)) (Pool, error) {
-	factory := func() (*nsq.Producer, error) {
-		// TODO 杩欓噷搴旇鎵цping鏂规硶鏉ョ‘瀹氳繛鎺ユ槸姝e父鐨勫惁鍒欎笉搴旇鍒涘缓conn
-		return newProducer(addr, options...)
-	}
-	nsqPool, err := NewChannelPool(initSize, maxSize, factory)
-	if err != nil {
-		return nil, err
-	}
-	return nsqPool, nil
-}
-
-func NewProducer(addr string) (*producer, error) {
-	CreateProducerPool(map[string]Config{"default": {addr, 1, 1}})
-	return GetProducer()
-}
-
-func retry(num int, fn func() error) error {
-	var err error
-	for i := 0; i < num; i++ {
-		err = fn()
-		if err == nil {
-			break
-		}
-	}
-	return err
-}
-
-func (p *producer) Publish(topic string, body []byte) error {
-	nsq, err := p.pool.Get()
-	if err != nil {
-		return err
-	}
-	defer nsq.Close()
-
-	return retry(2, func() error {
-		return nsq.Publish(topic, body)
-	})
-}
-
-func (p *producer) MultiPublish(topic string, body [][]byte) error {
-	nsq, err := p.pool.Get()
-	if err != nil {
-		return err
-	}
-	defer nsq.Close()
-
-	return retry(2, func() error {
-		return nsq.MultiPublish(topic, body)
-	})
-}
-
-func (p *producer) DeferredPublish(topic string, delay time.Duration, body []byte) error {
-	nsq, err := p.pool.Get()
-	if err != nil {
-		return err
-	}
-	defer nsq.Close()
-
-	return retry(2, func() error {
-		return nsq.DeferredPublish(topic, delay, body)
-	})
-}

--
Gitblit v1.8.0