From 1d0c9c4391d8ae40fb981a23170b3dad0d11aad9 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期二, 16 五月 2023 16:52:56 +0800 Subject: [PATCH] for go get --- /dev/null | 139 ---------------------------------------------- 1 files changed, 0 insertions(+), 139 deletions(-) diff --git a/nsqcli/LICENSE b/nsqcli/LICENSE deleted file mode 100644 index 25fdaf6..0000000 --- a/nsqcli/LICENSE +++ /dev/null @@ -1,20 +0,0 @@ -The MIT License (MIT) - -Copyright (c) 2013 Fatih Arslan - -Permission is hereby granted, free of charge, to any person obtaining a copy of -this software and associated documentation files (the "Software"), to deal in -the Software without restriction, including without limitation the rights to -use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of -the Software, and to permit persons to whom the Software is furnished to do so, -subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS -FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR -COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER -IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN -CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/nsqcli/README.md b/nsqcli/README.md deleted file mode 100644 index 432d86d..0000000 --- a/nsqcli/README.md +++ /dev/null @@ -1,54 +0,0 @@ -# NSQPool - -NSQPool is a thread safe connection pool for nsq producer. It can be used to -manage and reuse nsq producer connection. - - -## Install and Usage - -Install the package with: - -```bash -github.com/qgymje/nsqpool -``` - -Import it with: - -```go -import ( - "github.com/qgymje/nsqpool" - nsq "github.com/nsqio/go-nsq" -) -``` - -and use `pool` as the package name inside the code. - -## Example - -```go -// create a factory() to be used with channel based pool -factory := func() (*nsq.Producer, error) { - config := nsq.NewConfig() - return nsq.NewProducer(":4150", config) -} - -nsqPool, err := pool.NewChannelPool(5, 30, factory) - -producer, err := nsqPool.Get() - -producer.Publish("topic", "some data") -// do something with producer and put it back to the pool by closing the connection -// (this doesn't close the underlying connection instead it's putting it back -// to the pool). -producer.Close() - -// close pool any time you want, this closes all the connections inside a pool -nsqPool.Close() - -// currently available connections in the pool -current := nsqPool.Len() -``` - -## License - -The MIT License (MIT) - see LICENSE for more details diff --git a/nsqcli/channel.go b/nsqcli/channel.go deleted file mode 100644 index e583fcd..0000000 --- a/nsqcli/channel.go +++ /dev/null @@ -1,134 +0,0 @@ -package nsqcli - -import ( - "errors" - "fmt" - "sync" - - nsq "github.com/nsqio/go-nsq" -) - -// channelPool implements the Pool interface based on buffered channels. -type channelPool struct { - // storage for our net.Conn connections - mu sync.Mutex - conns chan *nsq.Producer - - // net.Conn generator - factory Factory -} - -// Factory is a function to create new connections. -type Factory func() (*nsq.Producer, error) - -// NewChannelPool returns a new pool based on buffered channels with an initial -// capacity and maximum capacity. Factory is used when initial capacity is -// greater than zero to fill the pool. A zero initialCap doesn't fill the Pool -// until a new Get() is called. During a Get(), If there is no new connection -// available in the pool, a new connection will be created via the Factory() -// method. -func NewChannelPool(initialCap, maxCap int, factory Factory) (Pool, error) { - if initialCap < 0 || maxCap <= 0 || initialCap > maxCap { - return nil, errors.New("invalid capacity settings") - } - - c := &channelPool{ - conns: make(chan *nsq.Producer, maxCap), - factory: factory, - } - - // create initial connections, if something goes wrong, - // just close the pool error out. - for i := 0; i < initialCap; i++ { - conn, err := factory() - if err != nil { - c.Close() - return nil, fmt.Errorf("factory is not able to fill the pool: %s", err) - } - c.conns <- conn - } - - return c, nil -} - -func (c *channelPool) getConns() chan *nsq.Producer { - c.mu.Lock() - conns := c.conns - c.mu.Unlock() - return conns -} - -// Get implements the Pool interfaces Get() method. If there is no new -// connection available in the pool, a new connection will be created via the -// Factory() method. -func (c *channelPool) Get() (*PoolConn, error) { - conns := c.getConns() - if conns == nil { - return nil, ErrClosed - } - - // wrap our connections with out custom net.Conn implementation (wrapConn - // method) that puts the connection back to the pool if it's closed. - select { - case conn := <-conns: - if conn == nil { - return nil, ErrClosed - } - - return c.wrapConn(conn), nil - default: - conn, err := c.factory() - if err != nil { - return nil, err - } - - return c.wrapConn(conn), nil - } -} - -// put puts the connection back to the pool. If the pool is full or closed, -// conn is simply closed. A nil conn will be rejected. -func (c *channelPool) put(conn *nsq.Producer) error { - if conn == nil { - return errors.New("connection is nil. rejecting") - } - - c.mu.Lock() - defer c.mu.Unlock() - - if c.conns == nil { - // pool is closed, close passed connection - conn.Stop() - return nil - } - - // put the resource back into the pool. If the pool is full, this will - // block and the default case will be executed. - select { - case c.conns <- conn: - return nil - default: - // pool is full, close passed connection - conn.Stop() - return nil - } -} - -func (c *channelPool) Close() { - c.mu.Lock() - conns := c.conns - c.conns = nil - c.factory = nil - c.mu.Unlock() - - if conns == nil { - return - } - - close(conns) - for conn := range conns { - conn.Stop() - } -} - -func (c *channelPool) Len() int { return len(c.getConns()) } diff --git a/nsqcli/conn.go b/nsqcli/conn.go deleted file mode 100644 index a66a10e..0000000 --- a/nsqcli/conn.go +++ /dev/null @@ -1,45 +0,0 @@ -package nsqcli - -import ( - "sync" - - nsq "github.com/nsqio/go-nsq" -) - -// PoolConn is a wrapper around net.Conn to modify the the behavior of -// net.Conn's Close() method. -type PoolConn struct { - *nsq.Producer - mu sync.RWMutex - c *channelPool - unusable bool -} - -// Close puts the given connects back to the pool instead of closing it. -func (p *PoolConn) Close() error { - p.mu.RLock() - defer p.mu.RUnlock() - - if p.unusable { - if p.Producer != nil { - p.Producer.Stop() - return nil - } - return nil - } - return p.c.put(p.Producer) -} - -// MarkUnusable marks the connection not usable any more, to let the pool close it instead of returning it to pool. -func (p *PoolConn) MarkUnusable() { - p.mu.Lock() - p.unusable = true - p.mu.Unlock() -} - -// newConn wraps a standard net.Conn to a poolConn net.Conn. -func (c *channelPool) wrapConn(conn *nsq.Producer) *PoolConn { - p := &PoolConn{c: c} - p.Producer = conn - return p -} diff --git a/nsqcli/consumer.go b/nsqcli/consumer.go deleted file mode 100644 index 63f17f4..0000000 --- a/nsqcli/consumer.go +++ /dev/null @@ -1,99 +0,0 @@ -package nsqcli - -import ( - "context" - "fmt" - "time" - - nsq "github.com/nsqio/go-nsq" -) - -type NsqConsumer struct { - consumer *nsq.Consumer - // handler nsq.Handler - handler func([]byte) error - ctx context.Context - ctxCancel context.CancelFunc - topic string - channel string -} - -func NewNsqConsumer(ctx context.Context, topic, channel string, options ...func(*nsq.Config)) (*NsqConsumer, error) { - conf := nsq.NewConfig() - conf.MaxAttempts = 0 - conf.MsgTimeout = 10 * time.Minute // 榛樿涓�涓秷鎭渶澶氳兘澶勭悊鍗佸垎閽燂紝鍚﹀垯灏变細閲嶆柊涓㈠叆闃熷垪 - conf.LookupdPollInterval = 3 * time.Second // 璋冩暣consumer鐨勯噸杩為棿闅旀椂闂翠负3绉� - for _, option := range options { - option(conf) - } - - consumer, err := nsq.NewConsumer(topic, channel, conf) - if err != nil { - return nil, err - } - return &NsqConsumer{ - consumer: consumer, - ctx: ctx, - topic: topic, - channel: channel, - }, nil -} - -func DestroyNsqConsumer(c *NsqConsumer) { - if c != nil { - if c.ctxCancel != nil { - c.ctxCancel() - } - } -} - -// func (n *NsqConsumer) AddHandler(handler nsq.Handler) { -// n.handler = handler -// } - -func (n *NsqConsumer) AddHandler(handler func([]byte) error) { - n.handler = handler -} - -func (n *NsqConsumer) Run(qaddr string, concurrency int) error { - return n.RunDistributed([]string{qaddr}, nil, concurrency) -} - -func (n *NsqConsumer) RunLookupd(lookupAddr string, concurrency int) error { - return n.RunDistributed(nil, []string{lookupAddr}, concurrency) -} - -func (n *NsqConsumer) RunDistributed(qAddr, lAddr []string, concurrency int) error { - n.consumer.ChangeMaxInFlight(concurrency) - // n.consumer.AddConcurrentHandlers(n.handler, concurrency) - n.consumer.AddConcurrentHandlers(nsq.HandlerFunc(func(msg *nsq.Message) error { - return n.handler(msg.Body) - // return nil - }), concurrency) - - var err error - if len(qAddr) > 0 { - err = n.consumer.ConnectToNSQDs(qAddr) - } else if len(lAddr) > 0 { - err = n.consumer.ConnectToNSQLookupds(lAddr) - } else { - err = fmt.Errorf("Addr Must NOT Empty") - } - if err != nil { - return err - } - - if n.ctx == nil { - n.ctx, n.ctxCancel = context.WithCancel(context.Background()) - } - - for { - select { - case <-n.ctx.Done(): - fmt.Println("[%s] %s,%s", "stop consumer", n.topic, n.channel) - n.consumer.Stop() - fmt.Println("[%s] %s,%s", "stop consumer success", n.topic, n.channel) - return nil - } - } -} diff --git a/nsqcli/pointer.go b/nsqcli/pointer.go deleted file mode 100644 index 0f0bf38..0000000 --- a/nsqcli/pointer.go +++ /dev/null @@ -1,57 +0,0 @@ -package nsqcli - -// #include <stdlib.h> -import "C" -import ( - "sync" - "unsafe" -) - -var ( - mutex sync.RWMutex - store = map[unsafe.Pointer]interface{}{} -) - -func Save(v interface{}) unsafe.Pointer { - if v == nil { - return nil - } - - // Generate real fake C pointer. - // This pointer will not store any data, but will bi used for indexing purposes. - // Since Go doest allow to cast dangling pointer to unsafe.Pointer, we do rally allocate one byte. - // Why we need indexing, because Go doest allow C code to store pointers to Go data. - var ptr unsafe.Pointer = C.malloc(C.size_t(1)) - if ptr == nil { - panic("can't allocate 'cgo-pointer hack index pointer': ptr == nil") - } - - mutex.Lock() - store[ptr] = v - mutex.Unlock() - - return ptr -} - -func Restore(ptr unsafe.Pointer) (v interface{}) { - if ptr == nil { - return nil - } - - mutex.RLock() - v = store[ptr] - mutex.RUnlock() - return -} - -func Unref(ptr unsafe.Pointer) { - if ptr == nil { - return - } - - mutex.Lock() - delete(store, ptr) - mutex.Unlock() - - C.free(ptr) -} diff --git a/nsqcli/pool.go b/nsqcli/pool.go deleted file mode 100644 index 236beed..0000000 --- a/nsqcli/pool.go +++ /dev/null @@ -1,25 +0,0 @@ -// Package pool implements a pool of net.Conn interfaces to manage and reuse them. -package nsqcli - -import "errors" - -var ( - // ErrClosed is the error resulting if the pool is closed via pool.Close(). - ErrClosed = errors.New("pool is closed") -) - -// Pool interface describes a pool implementation. A pool should have maximum -// capacity. An ideal pool is threadsafe and easy to use. -type Pool interface { - // Get returns a new connection from the pool. Closing the connections puts - // it back to the Pool. Closing it when the pool is destroyed or full will - // be counted as an error. - Get() (*PoolConn, error) - - // Close closes the pool and all its connections. After Close() the pool is - // no longer usable. - Close() - - // Len returns the current number of connections of the pool. - Len() int -} diff --git a/nsqcli/producer.go b/nsqcli/producer.go deleted file mode 100644 index 488435b..0000000 --- a/nsqcli/producer.go +++ /dev/null @@ -1,139 +0,0 @@ -package nsqcli - -import ( - "fmt" - "time" - - nsq "github.com/nsqio/go-nsq" -) - -type Producer interface { - Publish(topic string, body []byte) error - MultiPublish(topic string, body [][]byte) error - DeferredPublish(topic string, delay time.Duration, body []byte) error -} - -var _ Producer = (*producer)(nil) - -type producer struct { - pool Pool -} - -var ( - // name pool producer - nsqList = make(map[string]Pool) -) - -type Config struct { - Addr string `toml:"addr" json:"addr"` - InitSize int `toml:"init_size" json:"init_size"` - MaxSize int `toml:"max_size" json:"max_size"` -} - -func CreateProducerPool(configs map[string]Config) { - for name, conf := range configs { - n, err := newProducerPool(conf.Addr, conf.InitSize, conf.MaxSize) - if err == nil { - nsqList[name] = n - // 鏀寔ip:port瀵诲潃 - nsqList[conf.Addr] = n - } - } -} - -func DestroyProducerPool() { - for _, p := range nsqList { - p.Close() - } -} - -func GetProducer(key ...string) (*producer, error) { - k := "default" - if len(key) > 0 { - k = key[0] - } - if n, ok := nsqList[k]; ok { - return &producer{n}, nil - } - return nil, fmt.Errorf("GetProducer can't get producer") -} - -// CreateNSQProducer create nsq producer -func newProducer(addr string, options ...func(*nsq.Config)) (*nsq.Producer, error) { - cfg := nsq.NewConfig() - for _, option := range options { - option(cfg) - } - - producer, err := nsq.NewProducer(addr, cfg) - if err != nil { - return nil, err - } - // producer.SetLogger(log.New(os.Stderr, "", log.Flags()), nsq.LogLevelError) - return producer, nil -} - -// CreateNSQProducerPool create a nwq producer pool -func newProducerPool(addr string, initSize, maxSize int, options ...func(*nsq.Config)) (Pool, error) { - factory := func() (*nsq.Producer, error) { - // TODO 杩欓噷搴旇鎵цping鏂规硶鏉ョ‘瀹氳繛鎺ユ槸姝e父鐨勫惁鍒欎笉搴旇鍒涘缓conn - return newProducer(addr, options...) - } - nsqPool, err := NewChannelPool(initSize, maxSize, factory) - if err != nil { - return nil, err - } - return nsqPool, nil -} - -func NewProducer(addr string) (*producer, error) { - CreateProducerPool(map[string]Config{"default": {addr, 1, 1}}) - return GetProducer() -} - -func retry(num int, fn func() error) error { - var err error - for i := 0; i < num; i++ { - err = fn() - if err == nil { - break - } - } - return err -} - -func (p *producer) Publish(topic string, body []byte) error { - nsq, err := p.pool.Get() - if err != nil { - return err - } - defer nsq.Close() - - return retry(2, func() error { - return nsq.Publish(topic, body) - }) -} - -func (p *producer) MultiPublish(topic string, body [][]byte) error { - nsq, err := p.pool.Get() - if err != nil { - return err - } - defer nsq.Close() - - return retry(2, func() error { - return nsq.MultiPublish(topic, body) - }) -} - -func (p *producer) DeferredPublish(topic string, delay time.Duration, body []byte) error { - nsq, err := p.pool.Get() - if err != nil { - return err - } - defer nsq.Close() - - return retry(2, func() error { - return nsq.DeferredPublish(topic, delay, body) - }) -} -- Gitblit v1.8.0