From 1d0c9c4391d8ae40fb981a23170b3dad0d11aad9 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期二, 16 五月 2023 16:52:56 +0800
Subject: [PATCH] for go get
---
/dev/null | 139 ----------------------------------------------
1 files changed, 0 insertions(+), 139 deletions(-)
diff --git a/nsqcli/LICENSE b/nsqcli/LICENSE
deleted file mode 100644
index 25fdaf6..0000000
--- a/nsqcli/LICENSE
+++ /dev/null
@@ -1,20 +0,0 @@
-The MIT License (MIT)
-
-Copyright (c) 2013 Fatih Arslan
-
-Permission is hereby granted, free of charge, to any person obtaining a copy of
-this software and associated documentation files (the "Software"), to deal in
-the Software without restriction, including without limitation the rights to
-use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
-the Software, and to permit persons to whom the Software is furnished to do so,
-subject to the following conditions:
-
-The above copyright notice and this permission notice shall be included in all
-copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
-FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
-COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
-IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
-CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/nsqcli/README.md b/nsqcli/README.md
deleted file mode 100644
index 432d86d..0000000
--- a/nsqcli/README.md
+++ /dev/null
@@ -1,54 +0,0 @@
-# NSQPool
-
-NSQPool is a thread safe connection pool for nsq producer. It can be used to
-manage and reuse nsq producer connection.
-
-
-## Install and Usage
-
-Install the package with:
-
-```bash
-github.com/qgymje/nsqpool
-```
-
-Import it with:
-
-```go
-import (
- "github.com/qgymje/nsqpool"
- nsq "github.com/nsqio/go-nsq"
-)
-```
-
-and use `pool` as the package name inside the code.
-
-## Example
-
-```go
-// create a factory() to be used with channel based pool
-factory := func() (*nsq.Producer, error) {
- config := nsq.NewConfig()
- return nsq.NewProducer(":4150", config)
-}
-
-nsqPool, err := pool.NewChannelPool(5, 30, factory)
-
-producer, err := nsqPool.Get()
-
-producer.Publish("topic", "some data")
-// do something with producer and put it back to the pool by closing the connection
-// (this doesn't close the underlying connection instead it's putting it back
-// to the pool).
-producer.Close()
-
-// close pool any time you want, this closes all the connections inside a pool
-nsqPool.Close()
-
-// currently available connections in the pool
-current := nsqPool.Len()
-```
-
-## License
-
-The MIT License (MIT) - see LICENSE for more details
diff --git a/nsqcli/channel.go b/nsqcli/channel.go
deleted file mode 100644
index e583fcd..0000000
--- a/nsqcli/channel.go
+++ /dev/null
@@ -1,134 +0,0 @@
-package nsqcli
-
-import (
- "errors"
- "fmt"
- "sync"
-
- nsq "github.com/nsqio/go-nsq"
-)
-
-// channelPool implements the Pool interface based on buffered channels.
-type channelPool struct {
- // storage for our net.Conn connections
- mu sync.Mutex
- conns chan *nsq.Producer
-
- // net.Conn generator
- factory Factory
-}
-
-// Factory is a function to create new connections.
-type Factory func() (*nsq.Producer, error)
-
-// NewChannelPool returns a new pool based on buffered channels with an initial
-// capacity and maximum capacity. Factory is used when initial capacity is
-// greater than zero to fill the pool. A zero initialCap doesn't fill the Pool
-// until a new Get() is called. During a Get(), If there is no new connection
-// available in the pool, a new connection will be created via the Factory()
-// method.
-func NewChannelPool(initialCap, maxCap int, factory Factory) (Pool, error) {
- if initialCap < 0 || maxCap <= 0 || initialCap > maxCap {
- return nil, errors.New("invalid capacity settings")
- }
-
- c := &channelPool{
- conns: make(chan *nsq.Producer, maxCap),
- factory: factory,
- }
-
- // create initial connections, if something goes wrong,
- // just close the pool error out.
- for i := 0; i < initialCap; i++ {
- conn, err := factory()
- if err != nil {
- c.Close()
- return nil, fmt.Errorf("factory is not able to fill the pool: %s", err)
- }
- c.conns <- conn
- }
-
- return c, nil
-}
-
-func (c *channelPool) getConns() chan *nsq.Producer {
- c.mu.Lock()
- conns := c.conns
- c.mu.Unlock()
- return conns
-}
-
-// Get implements the Pool interfaces Get() method. If there is no new
-// connection available in the pool, a new connection will be created via the
-// Factory() method.
-func (c *channelPool) Get() (*PoolConn, error) {
- conns := c.getConns()
- if conns == nil {
- return nil, ErrClosed
- }
-
- // wrap our connections with out custom net.Conn implementation (wrapConn
- // method) that puts the connection back to the pool if it's closed.
- select {
- case conn := <-conns:
- if conn == nil {
- return nil, ErrClosed
- }
-
- return c.wrapConn(conn), nil
- default:
- conn, err := c.factory()
- if err != nil {
- return nil, err
- }
-
- return c.wrapConn(conn), nil
- }
-}
-
-// put puts the connection back to the pool. If the pool is full or closed,
-// conn is simply closed. A nil conn will be rejected.
-func (c *channelPool) put(conn *nsq.Producer) error {
- if conn == nil {
- return errors.New("connection is nil. rejecting")
- }
-
- c.mu.Lock()
- defer c.mu.Unlock()
-
- if c.conns == nil {
- // pool is closed, close passed connection
- conn.Stop()
- return nil
- }
-
- // put the resource back into the pool. If the pool is full, this will
- // block and the default case will be executed.
- select {
- case c.conns <- conn:
- return nil
- default:
- // pool is full, close passed connection
- conn.Stop()
- return nil
- }
-}
-
-func (c *channelPool) Close() {
- c.mu.Lock()
- conns := c.conns
- c.conns = nil
- c.factory = nil
- c.mu.Unlock()
-
- if conns == nil {
- return
- }
-
- close(conns)
- for conn := range conns {
- conn.Stop()
- }
-}
-
-func (c *channelPool) Len() int { return len(c.getConns()) }
diff --git a/nsqcli/conn.go b/nsqcli/conn.go
deleted file mode 100644
index a66a10e..0000000
--- a/nsqcli/conn.go
+++ /dev/null
@@ -1,45 +0,0 @@
-package nsqcli
-
-import (
- "sync"
-
- nsq "github.com/nsqio/go-nsq"
-)
-
-// PoolConn is a wrapper around net.Conn to modify the the behavior of
-// net.Conn's Close() method.
-type PoolConn struct {
- *nsq.Producer
- mu sync.RWMutex
- c *channelPool
- unusable bool
-}
-
-// Close puts the given connects back to the pool instead of closing it.
-func (p *PoolConn) Close() error {
- p.mu.RLock()
- defer p.mu.RUnlock()
-
- if p.unusable {
- if p.Producer != nil {
- p.Producer.Stop()
- return nil
- }
- return nil
- }
- return p.c.put(p.Producer)
-}
-
-// MarkUnusable marks the connection not usable any more, to let the pool close it instead of returning it to pool.
-func (p *PoolConn) MarkUnusable() {
- p.mu.Lock()
- p.unusable = true
- p.mu.Unlock()
-}
-
-// newConn wraps a standard net.Conn to a poolConn net.Conn.
-func (c *channelPool) wrapConn(conn *nsq.Producer) *PoolConn {
- p := &PoolConn{c: c}
- p.Producer = conn
- return p
-}
diff --git a/nsqcli/consumer.go b/nsqcli/consumer.go
deleted file mode 100644
index 63f17f4..0000000
--- a/nsqcli/consumer.go
+++ /dev/null
@@ -1,99 +0,0 @@
-package nsqcli
-
-import (
- "context"
- "fmt"
- "time"
-
- nsq "github.com/nsqio/go-nsq"
-)
-
-type NsqConsumer struct {
- consumer *nsq.Consumer
- // handler nsq.Handler
- handler func([]byte) error
- ctx context.Context
- ctxCancel context.CancelFunc
- topic string
- channel string
-}
-
-func NewNsqConsumer(ctx context.Context, topic, channel string, options ...func(*nsq.Config)) (*NsqConsumer, error) {
- conf := nsq.NewConfig()
- conf.MaxAttempts = 0
- conf.MsgTimeout = 10 * time.Minute // 榛樿涓�涓秷鎭渶澶氳兘澶勭悊鍗佸垎閽燂紝鍚﹀垯灏变細閲嶆柊涓㈠叆闃熷垪
- conf.LookupdPollInterval = 3 * time.Second // 璋冩暣consumer鐨勯噸杩為棿闅旀椂闂翠负3绉�
- for _, option := range options {
- option(conf)
- }
-
- consumer, err := nsq.NewConsumer(topic, channel, conf)
- if err != nil {
- return nil, err
- }
- return &NsqConsumer{
- consumer: consumer,
- ctx: ctx,
- topic: topic,
- channel: channel,
- }, nil
-}
-
-func DestroyNsqConsumer(c *NsqConsumer) {
- if c != nil {
- if c.ctxCancel != nil {
- c.ctxCancel()
- }
- }
-}
-
-// func (n *NsqConsumer) AddHandler(handler nsq.Handler) {
-// n.handler = handler
-// }
-
-func (n *NsqConsumer) AddHandler(handler func([]byte) error) {
- n.handler = handler
-}
-
-func (n *NsqConsumer) Run(qaddr string, concurrency int) error {
- return n.RunDistributed([]string{qaddr}, nil, concurrency)
-}
-
-func (n *NsqConsumer) RunLookupd(lookupAddr string, concurrency int) error {
- return n.RunDistributed(nil, []string{lookupAddr}, concurrency)
-}
-
-func (n *NsqConsumer) RunDistributed(qAddr, lAddr []string, concurrency int) error {
- n.consumer.ChangeMaxInFlight(concurrency)
- // n.consumer.AddConcurrentHandlers(n.handler, concurrency)
- n.consumer.AddConcurrentHandlers(nsq.HandlerFunc(func(msg *nsq.Message) error {
- return n.handler(msg.Body)
- // return nil
- }), concurrency)
-
- var err error
- if len(qAddr) > 0 {
- err = n.consumer.ConnectToNSQDs(qAddr)
- } else if len(lAddr) > 0 {
- err = n.consumer.ConnectToNSQLookupds(lAddr)
- } else {
- err = fmt.Errorf("Addr Must NOT Empty")
- }
- if err != nil {
- return err
- }
-
- if n.ctx == nil {
- n.ctx, n.ctxCancel = context.WithCancel(context.Background())
- }
-
- for {
- select {
- case <-n.ctx.Done():
- fmt.Println("[%s] %s,%s", "stop consumer", n.topic, n.channel)
- n.consumer.Stop()
- fmt.Println("[%s] %s,%s", "stop consumer success", n.topic, n.channel)
- return nil
- }
- }
-}
diff --git a/nsqcli/pointer.go b/nsqcli/pointer.go
deleted file mode 100644
index 0f0bf38..0000000
--- a/nsqcli/pointer.go
+++ /dev/null
@@ -1,57 +0,0 @@
-package nsqcli
-
-// #include <stdlib.h>
-import "C"
-import (
- "sync"
- "unsafe"
-)
-
-var (
- mutex sync.RWMutex
- store = map[unsafe.Pointer]interface{}{}
-)
-
-func Save(v interface{}) unsafe.Pointer {
- if v == nil {
- return nil
- }
-
- // Generate real fake C pointer.
- // This pointer will not store any data, but will bi used for indexing purposes.
- // Since Go doest allow to cast dangling pointer to unsafe.Pointer, we do rally allocate one byte.
- // Why we need indexing, because Go doest allow C code to store pointers to Go data.
- var ptr unsafe.Pointer = C.malloc(C.size_t(1))
- if ptr == nil {
- panic("can't allocate 'cgo-pointer hack index pointer': ptr == nil")
- }
-
- mutex.Lock()
- store[ptr] = v
- mutex.Unlock()
-
- return ptr
-}
-
-func Restore(ptr unsafe.Pointer) (v interface{}) {
- if ptr == nil {
- return nil
- }
-
- mutex.RLock()
- v = store[ptr]
- mutex.RUnlock()
- return
-}
-
-func Unref(ptr unsafe.Pointer) {
- if ptr == nil {
- return
- }
-
- mutex.Lock()
- delete(store, ptr)
- mutex.Unlock()
-
- C.free(ptr)
-}
diff --git a/nsqcli/pool.go b/nsqcli/pool.go
deleted file mode 100644
index 236beed..0000000
--- a/nsqcli/pool.go
+++ /dev/null
@@ -1,25 +0,0 @@
-// Package pool implements a pool of net.Conn interfaces to manage and reuse them.
-package nsqcli
-
-import "errors"
-
-var (
- // ErrClosed is the error resulting if the pool is closed via pool.Close().
- ErrClosed = errors.New("pool is closed")
-)
-
-// Pool interface describes a pool implementation. A pool should have maximum
-// capacity. An ideal pool is threadsafe and easy to use.
-type Pool interface {
- // Get returns a new connection from the pool. Closing the connections puts
- // it back to the Pool. Closing it when the pool is destroyed or full will
- // be counted as an error.
- Get() (*PoolConn, error)
-
- // Close closes the pool and all its connections. After Close() the pool is
- // no longer usable.
- Close()
-
- // Len returns the current number of connections of the pool.
- Len() int
-}
diff --git a/nsqcli/producer.go b/nsqcli/producer.go
deleted file mode 100644
index 488435b..0000000
--- a/nsqcli/producer.go
+++ /dev/null
@@ -1,139 +0,0 @@
-package nsqcli
-
-import (
- "fmt"
- "time"
-
- nsq "github.com/nsqio/go-nsq"
-)
-
-type Producer interface {
- Publish(topic string, body []byte) error
- MultiPublish(topic string, body [][]byte) error
- DeferredPublish(topic string, delay time.Duration, body []byte) error
-}
-
-var _ Producer = (*producer)(nil)
-
-type producer struct {
- pool Pool
-}
-
-var (
- // name pool producer
- nsqList = make(map[string]Pool)
-)
-
-type Config struct {
- Addr string `toml:"addr" json:"addr"`
- InitSize int `toml:"init_size" json:"init_size"`
- MaxSize int `toml:"max_size" json:"max_size"`
-}
-
-func CreateProducerPool(configs map[string]Config) {
- for name, conf := range configs {
- n, err := newProducerPool(conf.Addr, conf.InitSize, conf.MaxSize)
- if err == nil {
- nsqList[name] = n
- // 鏀寔ip:port瀵诲潃
- nsqList[conf.Addr] = n
- }
- }
-}
-
-func DestroyProducerPool() {
- for _, p := range nsqList {
- p.Close()
- }
-}
-
-func GetProducer(key ...string) (*producer, error) {
- k := "default"
- if len(key) > 0 {
- k = key[0]
- }
- if n, ok := nsqList[k]; ok {
- return &producer{n}, nil
- }
- return nil, fmt.Errorf("GetProducer can't get producer")
-}
-
-// CreateNSQProducer create nsq producer
-func newProducer(addr string, options ...func(*nsq.Config)) (*nsq.Producer, error) {
- cfg := nsq.NewConfig()
- for _, option := range options {
- option(cfg)
- }
-
- producer, err := nsq.NewProducer(addr, cfg)
- if err != nil {
- return nil, err
- }
- // producer.SetLogger(log.New(os.Stderr, "", log.Flags()), nsq.LogLevelError)
- return producer, nil
-}
-
-// CreateNSQProducerPool create a nwq producer pool
-func newProducerPool(addr string, initSize, maxSize int, options ...func(*nsq.Config)) (Pool, error) {
- factory := func() (*nsq.Producer, error) {
- // TODO 杩欓噷搴旇鎵цping鏂规硶鏉ョ‘瀹氳繛鎺ユ槸姝e父鐨勫惁鍒欎笉搴旇鍒涘缓conn
- return newProducer(addr, options...)
- }
- nsqPool, err := NewChannelPool(initSize, maxSize, factory)
- if err != nil {
- return nil, err
- }
- return nsqPool, nil
-}
-
-func NewProducer(addr string) (*producer, error) {
- CreateProducerPool(map[string]Config{"default": {addr, 1, 1}})
- return GetProducer()
-}
-
-func retry(num int, fn func() error) error {
- var err error
- for i := 0; i < num; i++ {
- err = fn()
- if err == nil {
- break
- }
- }
- return err
-}
-
-func (p *producer) Publish(topic string, body []byte) error {
- nsq, err := p.pool.Get()
- if err != nil {
- return err
- }
- defer nsq.Close()
-
- return retry(2, func() error {
- return nsq.Publish(topic, body)
- })
-}
-
-func (p *producer) MultiPublish(topic string, body [][]byte) error {
- nsq, err := p.pool.Get()
- if err != nil {
- return err
- }
- defer nsq.Close()
-
- return retry(2, func() error {
- return nsq.MultiPublish(topic, body)
- })
-}
-
-func (p *producer) DeferredPublish(topic string, delay time.Duration, body []byte) error {
- nsq, err := p.pool.Get()
- if err != nil {
- return err
- }
- defer nsq.Close()
-
- return retry(2, func() error {
- return nsq.DeferredPublish(topic, delay, body)
- })
-}
--
Gitblit v1.8.0