From ecb63331999a88c4980997aa8473ffed0dd31a3c Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期二, 16 五月 2023 15:42:56 +0800
Subject: [PATCH] nsqclient producer consumer

---
 nsqclient/channel_test.go |  218 ++++++++++
 .gitignore                |    3 
 go.mod                    |    5 
 make.sh                   |   10 
 nsqclient/LICENSE         |   20 
 nsqclient/conn.go         |   45 ++
 TST/ctest/ctest.cpp       |   95 ++++
 nsqclient/channel.go      |  134 ++++++
 nsqclient/consumer.go     |   90 ++++
 go.sum                    |    4 
 nsqclient/pointer.go      |   57 ++
 nsqclient/producer.go     |  139 ++++++
 nsqclient/README.md       |   54 ++
 nsqclient/pool.go         |   25 +
 TST/test/test.go          |   76 +++
 main.go                   |  144 ++++++
 clib/libnsqclient.h       |  101 ++++
 17 files changed, 1,220 insertions(+), 0 deletions(-)

diff --git a/.gitignore b/.gitignore
index 8365624..c41f8dc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -21,3 +21,6 @@
 
 *.exe
 *.test
+
+nsqCli
+cnsqcli
diff --git a/TST/ctest/ctest.cpp b/TST/ctest/ctest.cpp
new file mode 100644
index 0000000..69316ee
--- /dev/null
+++ b/TST/ctest/ctest.cpp
@@ -0,0 +1,95 @@
+#include <stdio.h>
+#include <string.h>
+#include "clib/libnsqclient.h"
+
+#include <string>
+#include <thread>
+#include <mutex>
+
+using namespace std;
+
+static void produce(int two){
+    char ip[] = "192.168.20.108:4150";
+    GoString addr = {ip, (ptrdiff_t)strlen(ip)};
+    void* p = createProducer(addr);
+
+    string msg("cnsqclient dynamic library");
+    while(msg.size() < 32){
+        msg += msg;
+    }
+    // printf("msg %s\n", msg.c_str());
+
+    for(int i = 0; i < 1000000; i++){
+        GoString topic = {"test", 4};
+        string amsg = msg + "-x";
+        GoSlice data{(void*)amsg.data(), (GoInt)amsg.size(), (GoInt)amsg.size()};
+        if (!publish(p, topic, data)){
+            printf("publish msg failed topic %s\n", topic.p);
+            exit(0);
+        }
+
+        if (two){
+            topic.p = "test2";
+            topic.n = 5;
+            amsg = msg + "-y";
+            data.data = (void*)amsg.data();
+            if (!publish(p, topic, data)){
+                printf("publish msg failed topic %s\n", topic.p);
+                exit(0);
+            }
+        }
+
+    }
+    destroyProducer(p);
+}
+
+
+static void consume(const char* topic, const char* channel){
+    GoString t = {topic, (ptrdiff_t)strlen(topic)};
+    GoString c = {channel, (ptrdiff_t)strlen(channel)};
+
+    void* con = createConsumer(t, c);
+
+    char ip[] = "192.168.20.108:4150";
+    GoString addr = {ip, (ptrdiff_t)strlen(ip)};
+
+    // thread
+    thread([&con,&addr]{
+        Run(con, addr);
+    }).detach();
+
+    auto start = chrono::steady_clock::now();
+    int count = 0;
+    while (true) {
+        void* msg = NULL;
+        size_t size = 0;
+        GoUint8 ok = getMessage(con, &msg, &size);
+        if (!ok){
+            this_thread::sleep_for(chrono::milliseconds(100));
+            continue;
+        }
+        count++;
+        printf("======>> recv msg %s size %d\n", (char*)msg, count);
+        relMessage(msg);
+        if (count > 999000){
+            printf("======>> use time %d\n",
+                chrono::duration_cast<chrono::seconds>(chrono::steady_clock::now()-start).count());
+        }
+    }
+    printf("======>> recv all msg size %d\n", count);
+}
+
+int main(int argc, char const *argv[])
+{
+    thread([]{
+        produce(false);
+    }).detach();
+
+    // thread([]{
+    //     consume("test2", "sensor01");
+    // }).detach();
+
+    consume("test", "sensor01");
+
+    return 0;
+}
\ No newline at end of file
diff --git a/TST/test/test.go b/TST/test/test.go
new file mode 100644
index 0000000..62d1fae
--- /dev/null
+++ b/TST/test/test.go
@@ -0,0 +1,76 @@
+package test
+
+import (
+	"context"
+	"fmt"
+	"log"
+	"nsqCli/nsqclient"
+	"time"
+
+	"github.com/nsqio/go-nsq"
+)
+
+func produce(two bool) {
+	p, _ := nsqclient.NewProducer("192.168.20.108:4150")
+
+	var str string
+	for len(str) < 32 {
+		str += "cnsqclient dynamic library"
+	}
+	msgx := []byte(str + "--x")
+	msgy := []byte(str + "--y")
+
+	// count := 0
+	for i := 0; i < 1000000; i++ {
+		// if e := p.Publish("test", []byte("x")); e != nil {
+		if e := p.Publish("test", msgx); e != nil {
+			log.Fatal("Publish error:" + e.Error())
+		}
+
+		if two {
+			// if e := p.Publish("test", []byte("y")); e != nil {
+			if e := p.Publish("test2", msgy); e != nil {
+				log.Fatal("Publish error:" + e.Error())
+			}
+		}
+
+		// log.Println("send time ", count)
+		// count++
+	}
+}
+
+func consume(topic, channel string) {
+	ctx, cancel := context.WithCancel(context.Background())
+
+	if c, e := nsqclient.NewNsqConsumer(ctx, topic, channel); e != nil {
+		fmt.Println("NewNsqConsumer failed", e)
+		return
+	} else {
+		ch := make(chan struct{})
+
+		count := 0
+		c.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error {
+			count++
+			fmt.Println("recv msg ", string(msg.Body), " size", count)
+			if count > 999000 {
+				ch <- struct{}{}
+			}
+			return nil
+		}))
+		// go c.Run("192.168.20.108:4150", 2)
+		go c.RunLookupd("192.168.20.108:4161", 2)
+
+		t := time.Now()
+		<-ch
+		// fmt.Println("======>> use time ", time.Since(t))
+		fmt.Println("======>> use time ", time.Now().Unix()-t.Unix())
+		cancel()
+	}
+}
+
+func Test() {
+	go produce(false)
+
+	// go consume("test2", "sensor01")
+	consume("test", "sensor01")
+}
diff --git a/clib/libnsqclient.h b/clib/libnsqclient.h
new file mode 100644
index 0000000..fab0277
--- /dev/null
+++ b/clib/libnsqclient.h
@@ -0,0 +1,101 @@
+/* Code generated by cmd/cgo; DO NOT EDIT. */
+
+/* package nsqCli */
+
+
+#line 1 "cgo-builtin-export-prolog"
+
+#include <stddef.h> /* for ptrdiff_t below */
+
+#ifndef GO_CGO_EXPORT_PROLOGUE_H
+#define GO_CGO_EXPORT_PROLOGUE_H
+
+#ifndef GO_CGO_GOSTRING_TYPEDEF
+typedef struct { const char *p; ptrdiff_t n; } _GoString_;
+#endif
+
+#endif
+
+/* Start of preamble from import "C" comments.  */
+
+
+#line 3 "main.go"
+ #include <stdlib.h>
+ #include <string.h>
+
+#line 1 "cgo-generated-wrapper"
+
+
+/* End of preamble from import "C" comments.  */
+
+
+/* Start of boilerplate cgo prologue.  */
+#line 1 "cgo-gcc-export-header-prolog"
+
+#ifndef GO_CGO_PROLOGUE_H
+#define GO_CGO_PROLOGUE_H
+
+typedef signed char GoInt8;
+typedef unsigned char GoUint8;
+typedef short GoInt16;
+typedef unsigned short GoUint16;
+typedef int GoInt32;
+typedef unsigned int GoUint32;
+typedef long long GoInt64;
+typedef unsigned long long GoUint64;
+typedef GoInt64 GoInt;
+typedef GoUint64 GoUint;
+typedef __SIZE_TYPE__ GoUintptr;
+typedef float GoFloat32;
+typedef double GoFloat64;
+typedef float _Complex GoComplex64;
+typedef double _Complex GoComplex128;
+
+/*
+  static assertion to make sure the file is being used on architecture
+  at least with matching size of GoInt.
+*/
+typedef char _check_for_64_bit_pointer_matching_GoInt[sizeof(void*)==64/8 ? 1:-1];
+
+#ifndef GO_CGO_GOSTRING_TYPEDEF
+typedef _GoString_ GoString;
+#endif
+typedef void *GoMap;
+typedef void *GoChan;
+typedef struct { void *t; void *v; } GoInterface;
+typedef struct { void *data; GoInt len; GoInt cap; } GoSlice;
+
+#endif
+
+/* End of boilerplate cgo prologue.  */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+
+extern void* createProducer(GoString p0);
+
+extern void destroyProducer(void* p0);
+
+extern GoUint8 publish(void* p0, GoString p1, GoSlice p2);
+
+extern GoUint8 multiPublish(void* p0, GoString p1, GoSlice p2);
+
+extern GoUint8 deferredPublish(void* p0, GoString p1, GoInt p2, GoSlice p3);
+
+extern void* createConsumer(GoString p0, GoString p1);
+
+extern void destroyConsumer(void* p0);
+
+extern void Run(void* p0, GoString p1);
+
+extern void RunLookupd(void* p0, GoString p1);
+
+extern GoUint8 getMessage(void* p0, void** p1, size_t* p2);
+
+extern void relMessage(void* p0);
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/go.mod b/go.mod
new file mode 100644
index 0000000..5a3990a
--- /dev/null
+++ b/go.mod
@@ -0,0 +1,5 @@
+module nsqCli
+
+go 1.14
+
+require github.com/nsqio/go-nsq v1.1.0
diff --git a/go.sum b/go.sum
new file mode 100644
index 0000000..457d1d9
--- /dev/null
+++ b/go.sum
@@ -0,0 +1,4 @@
+github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
+github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE=
+github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
diff --git a/main.go b/main.go
new file mode 100644
index 0000000..ededf6a
--- /dev/null
+++ b/main.go
@@ -0,0 +1,144 @@
+package main
+
+// #include <stdlib.h>
+// #include <string.h>
+import "C"
+
+import (
+	"nsqCli/TST/test"
+	"nsqCli/nsqclient"
+	"sync"
+	"time"
+	"unsafe"
+
+	"github.com/nsqio/go-nsq"
+)
+
+//export createProducer
+func createProducer(addr string) unsafe.Pointer {
+	n, _ := nsqclient.NewProducer(addr)
+	return nsqclient.Save(n)
+}
+
+//export destroyProducer
+func destroyProducer(ph unsafe.Pointer) {
+	nsqclient.Unref(ph)
+	nsqclient.DestroyProducerPool()
+}
+
+func pcvt(ph unsafe.Pointer) nsqclient.Producer {
+	return nsqclient.Restore(ph).(nsqclient.Producer)
+}
+
+//export publish
+func publish(ph unsafe.Pointer, topic string, msg []byte) bool {
+	p := pcvt(ph)
+	if err := p.Publish(topic, msg); err != nil {
+		return false
+	}
+	return true
+}
+
+//export multiPublish
+func multiPublish(ph unsafe.Pointer, topic string, msg [][]byte) bool {
+	p := pcvt(ph)
+	if err := p.MultiPublish(topic, msg); err != nil {
+		return false
+	}
+	return true
+}
+
+//export deferredPublish
+func deferredPublish(ph unsafe.Pointer, topic string, ms int, msg []byte) bool {
+	p := pcvt(ph)
+	if err := p.DeferredPublish(topic, time.Duration(ms)*time.Millisecond, msg); err != nil {
+		return false
+	}
+	return true
+}
+
+/////////////////////////////////////////////////////////////
+
+type consumer struct {
+	nsqcon *nsqclient.NsqConsumer
+	lck    sync.Mutex
+	msgs   []*nsq.Message
+}
+
+//export createConsumer
+func createConsumer(topic, channel string) unsafe.Pointer {
+	if c, err := nsqclient.NewNsqConsumer(nil, topic, channel); err == nil {
+		con := &consumer{
+			nsqcon: c,
+		}
+		return nsqclient.Save(con)
+	}
+	return nil
+}
+
+func ccvt(ch unsafe.Pointer) *consumer {
+	return nsqclient.Restore(ch).(*consumer)
+}
+
+//export destroyConsumer
+func destroyConsumer(ch unsafe.Pointer) {
+	nsqclient.DestroyNsqConsumer(ccvt(ch).nsqcon)
+	nsqclient.Unref(ch)
+}
+
+//export Run
+func Run(ch unsafe.Pointer, addr string) {
+	c := ccvt(ch)
+	c.nsqcon.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error {
+		c.lck.Lock()
+		defer c.lck.Unlock()
+		c.msgs = append(c.msgs, msg)
+		return nil
+	}))
+
+	c.nsqcon.Run(addr, 1)
+}
+
+//export RunLookupd
+func RunLookupd(ch unsafe.Pointer, lookAddr string) {
+	c := ccvt(ch)
+	c.nsqcon.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error {
+		c.lck.Lock()
+		defer c.lck.Unlock()
+		c.msgs = append(c.msgs, msg)
+		return nil
+	}))
+
+	c.nsqcon.RunLookupd(lookAddr, 1)
+}
+
+//export getMessage
+func getMessage(ch unsafe.Pointer, data *unsafe.Pointer, size *C.size_t) bool {
+	c := ccvt(ch)
+	c.lck.Lock()
+	defer c.lck.Unlock()
+	if len(c.msgs) == 0 {
+		return false
+	}
+
+	msg := c.msgs[0]
+	c.msgs = c.msgs[1:]
+
+	*size = C.size_t(len(msg.Body))
+	ptr := C.malloc(*size)
+	C.memcpy(ptr, unsafe.Pointer(&msg.Body[0]), *size)
+	*data = ptr
+
+	return true
+}
+
+//export relMessage
+func relMessage(msg unsafe.Pointer) {
+	if msg != nil {
+		C.free(msg)
+	}
+}
+
+func main() {
+	test.Test()
+}
diff --git a/make.sh b/make.sh
new file mode 100755
index 0000000..51d276d
--- /dev/null
+++ b/make.sh
@@ -0,0 +1,10 @@
+#!/bin/bash
+
+if [ ! -d "clib" ]; then
+    mkdir clib
+fi
+go tool cgo -exportheader clib/libnsqclient.h main.go &&
+go build -buildmode=c-shared -o ./clib/libnsqclient.so &&
+rm -fr _obj &&
+go build &&
+g++ -std=c++11 -g -O0 -o cnsqcli TST/ctest/ctest.cpp -I. -Lclib/ -lnsqclient -ldl -pthread
diff --git a/nsqclient/LICENSE b/nsqclient/LICENSE
new file mode 100644
index 0000000..25fdaf6
--- /dev/null
+++ b/nsqclient/LICENSE
@@ -0,0 +1,20 @@
+The MIT License (MIT)
+
+Copyright (c) 2013 Fatih Arslan
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of
+this software and associated documentation files (the "Software"), to deal in
+the Software without restriction, including without limitation the rights to
+use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software is furnished to do so,
+subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/nsqclient/README.md b/nsqclient/README.md
new file mode 100644
index 0000000..432d86d
--- /dev/null
+++ b/nsqclient/README.md
@@ -0,0 +1,54 @@
+# NSQPool
+
+NSQPool is a thread safe connection pool for nsq producer. It can be used to
+manage and reuse nsq producer connection.
+
+
+## Install and Usage
+
+Install the package with:
+
+```bash
+github.com/qgymje/nsqpool
+```
+
+Import it with:
+
+```go
+import (
+    "github.com/qgymje/nsqpool"
+    nsq "github.com/nsqio/go-nsq"
+)
+```
+
+and use `pool` as the package name inside the code.
+
+## Example
+
+```go
+// create a factory() to be used with channel based pool
+factory := func() (*nsq.Producer, error) { 
+    config := nsq.NewConfig()
+    return nsq.NewProducer(":4150", config)
+}
+
+nsqPool, err := pool.NewChannelPool(5, 30, factory)
+
+producer, err := nsqPool.Get()
+
+producer.Publish("topic", "some data")
+// do something with producer and put it back to the pool by closing the connection
+// (this doesn't close the underlying connection instead it's putting it back
+// to the pool).
+producer.Close()
+
+// close pool any time you want, this closes all the connections inside a pool
+nsqPool.Close()
+
+// currently available connections in the pool
+current := nsqPool.Len()
+```
+
+## License
+
+The MIT License (MIT) - see LICENSE for more details
diff --git a/nsqclient/channel.go b/nsqclient/channel.go
new file mode 100644
index 0000000..1594dcc
--- /dev/null
+++ b/nsqclient/channel.go
@@ -0,0 +1,134 @@
+package nsqclient
+
+import (
+	"errors"
+	"fmt"
+	"sync"
+
+	nsq "github.com/nsqio/go-nsq"
+)
+
+// channelPool implements the Pool interface based on buffered channels.
+type channelPool struct {
+	// storage for our net.Conn connections
+	mu    sync.Mutex
+	conns chan *nsq.Producer
+
+	// net.Conn generator
+	factory Factory
+}
+
+// Factory is a function to create new connections.
+type Factory func() (*nsq.Producer, error)
+
+// NewChannelPool returns a new pool based on buffered channels with an initial
+// capacity and maximum capacity. Factory is used when initial capacity is
+// greater than zero to fill the pool. A zero initialCap doesn't fill the Pool
+// until a new Get() is called. During a Get(), If there is no new connection
+// available in the pool, a new connection will be created via the Factory()
+// method.
+func NewChannelPool(initialCap, maxCap int, factory Factory) (Pool, error) {
+	if initialCap < 0 || maxCap <= 0 || initialCap > maxCap {
+		return nil, errors.New("invalid capacity settings")
+	}
+
+	c := &channelPool{
+		conns:   make(chan *nsq.Producer, maxCap),
+		factory: factory,
+	}
+
+	// create initial connections, if something goes wrong,
+	// just close the pool error out.
+	for i := 0; i < initialCap; i++ {
+		conn, err := factory()
+		if err != nil {
+			c.Close()
+			return nil, fmt.Errorf("factory is not able to fill the pool: %s", err)
+		}
+		c.conns <- conn
+	}
+
+	return c, nil
+}
+
+func (c *channelPool) getConns() chan *nsq.Producer {
+	c.mu.Lock()
+	conns := c.conns
+	c.mu.Unlock()
+	return conns
+}
+
+// Get implements the Pool interfaces Get() method. If there is no new
+// connection available in the pool, a new connection will be created via the
+// Factory() method.
+func (c *channelPool) Get() (*PoolConn, error) {
+	conns := c.getConns()
+	if conns == nil {
+		return nil, ErrClosed
+	}
+
+	// wrap our connections with out custom net.Conn implementation (wrapConn
+	// method) that puts the connection back to the pool if it's closed.
+	select {
+	case conn := <-conns:
+		if conn == nil {
+			return nil, ErrClosed
+		}
+
+		return c.wrapConn(conn), nil
+	default:
+		conn, err := c.factory()
+		if err != nil {
+			return nil, err
+		}
+
+		return c.wrapConn(conn), nil
+	}
+}
+
+// put puts the connection back to the pool. If the pool is full or closed,
+// conn is simply closed. A nil conn will be rejected.
+func (c *channelPool) put(conn *nsq.Producer) error {
+	if conn == nil {
+		return errors.New("connection is nil. rejecting")
+	}
+
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if c.conns == nil {
+		// pool is closed, close passed connection
+		conn.Stop()
+		return nil
+	}
+
+	// put the resource back into the pool. If the pool is full, this will
+	// block and the default case will be executed.
+	select {
+	case c.conns <- conn:
+		return nil
+	default:
+		// pool is full, close passed connection
+		conn.Stop()
+		return nil
+	}
+}
+
+func (c *channelPool) Close() {
+	c.mu.Lock()
+	conns := c.conns
+	c.conns = nil
+	c.factory = nil
+	c.mu.Unlock()
+
+	if conns == nil {
+		return
+	}
+
+	close(conns)
+	for conn := range conns {
+		conn.Stop()
+	}
+}
+
+func (c *channelPool) Len() int { return len(c.getConns()) }
diff --git a/nsqclient/channel_test.go b/nsqclient/channel_test.go
new file mode 100644
index 0000000..725db9b
--- /dev/null
+++ b/nsqclient/channel_test.go
@@ -0,0 +1,218 @@
+package nsqclient
+
+import (
+	"math/rand"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/nsqio/go-nsq"
+)
+
+var (
+	InitialCap = 2
+	MaximumCap = 10
+	factory    = func() (*nsq.Producer, error) {
+		config := nsq.NewConfig()
+		return nsq.NewProducer(":4160", config)
+	}
+)
+
+func newChannelPool() (Pool, error) {
+	return NewChannelPool(InitialCap, MaximumCap, factory)
+}
+
+func TestNew(t *testing.T) {
+	_, err := newChannelPool()
+	if err != nil {
+		t.Errorf("New error: %s", err)
+	}
+}
+
+func TestPool_Get(t *testing.T) {
+	p, _ := newChannelPool()
+	defer p.Close()
+
+	_, err := p.Get()
+	if err != nil {
+		t.Errorf("Get error: %s", err)
+	}
+
+	// after one get, current capacity should be lowered by one.
+	if p.Len() != (InitialCap - 1) {
+		t.Errorf("Get error. Expecting %d, got %d",
+			(InitialCap - 1), p.Len())
+	}
+
+	// get them all
+	var wg sync.WaitGroup
+	for i := 0; i < (InitialCap - 1); i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			_, err := p.Get()
+			if err != nil {
+				t.Errorf("Get error: %s", err)
+			}
+		}()
+	}
+	wg.Wait()
+
+	if p.Len() != 0 {
+		t.Errorf("Get error. Expecting %d, got %d",
+			(InitialCap - 1), p.Len())
+	}
+
+	_, err = p.Get()
+	if err != nil {
+		t.Errorf("Get error: %s", err)
+	}
+}
+
+func TestPool_Put(t *testing.T) {
+	p, err := NewChannelPool(0, 30, factory)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer p.Close()
+
+	// get/create from the pool
+	conns := make([]*PoolConn, MaximumCap)
+	for i := 0; i < MaximumCap; i++ {
+		conn, _ := p.Get()
+		conns[i] = conn
+	}
+
+	// now put them all back
+	for _, conn := range conns {
+		conn.Close()
+	}
+
+	if p.Len() != MaximumCap {
+		t.Errorf("Put error len. Expecting %d, got %d",
+			1, p.Len())
+	}
+
+	conn, _ := p.Get()
+	p.Close() // close pool
+
+	conn.Close() // try to put into a full pool
+	if p.Len() != 0 {
+		t.Errorf("Put error. Closed pool shouldn't allow to put connections.")
+	}
+}
+
+func TestPool_PutUnusableConn(t *testing.T) {
+	p, _ := newChannelPool()
+	defer p.Close()
+
+	// ensure pool is not empty
+	conn, _ := p.Get()
+	conn.Close()
+
+	poolSize := p.Len()
+	conn, _ = p.Get()
+	conn.Close()
+	if p.Len() != poolSize {
+		t.Errorf("Pool size is expected to be equal to initial size")
+	}
+
+	conn, _ = p.Get()
+	conn.MarkUnusable()
+
+	conn.Close()
+	if p.Len() != poolSize-1 {
+		t.Errorf("Pool size is expected to be initial_size - 1 [%d:%d]", p.Len(), poolSize-1)
+	}
+}
+
+func TestPool_UsedCapacity(t *testing.T) {
+	p, _ := newChannelPool()
+	defer p.Close()
+
+	if p.Len() != InitialCap {
+		t.Errorf("InitialCap error. Expecting %d, got %d",
+			InitialCap, p.Len())
+	}
+}
+
+func TestPool_Close(t *testing.T) {
+	p, _ := newChannelPool()
+
+	// now close it and test all cases we are expecting.
+	p.Close()
+
+	c := p.(*channelPool)
+
+	if c.conns != nil {
+		t.Errorf("Close error, conns channel should be nil")
+	}
+
+	if c.factory != nil {
+		t.Errorf("Close error, factory should be nil")
+	}
+
+	_, err := p.Get()
+	if err == nil {
+		t.Errorf("Close error, get conn should return an error")
+	}
+
+	if p.Len() != 0 {
+		t.Errorf("Close error used capacity. Expecting 0, got %d", p.Len())
+	}
+}
+
+func TestPoolConcurrent(t *testing.T) {
+	p, _ := newChannelPool()
+	pipe := make(chan *PoolConn, 0)
+
+	go func() {
+		p.Close()
+	}()
+
+	for i := 0; i < MaximumCap; i++ {
+		go func() {
+			conn, _ := p.Get()
+
+			pipe <- conn
+		}()
+
+		go func() {
+			conn := <-pipe
+			if conn == nil {
+				return
+			}
+			conn.Close()
+		}()
+	}
+}
+
+func TestPoolConcurrent2(t *testing.T) {
+	p, _ := NewChannelPool(0, 30, factory)
+
+	var wg sync.WaitGroup
+
+	go func() {
+		for i := 0; i < 10; i++ {
+			wg.Add(1)
+			go func(i int) {
+				conn, _ := p.Get()
+				time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
+				conn.Close()
+				wg.Done()
+			}(i)
+		}
+	}()
+
+	for i := 0; i < 10; i++ {
+		wg.Add(1)
+		go func(i int) {
+			conn, _ := p.Get()
+			time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
+			conn.Close()
+			wg.Done()
+		}(i)
+	}
+
+	wg.Wait()
+}
diff --git a/nsqclient/conn.go b/nsqclient/conn.go
new file mode 100644
index 0000000..5c680b5
--- /dev/null
+++ b/nsqclient/conn.go
@@ -0,0 +1,45 @@
+package nsqclient
+
+import (
+	"sync"
+
+	nsq "github.com/nsqio/go-nsq"
+)
+
+// PoolConn is a wrapper around net.Conn to modify the the behavior of
+// net.Conn's Close() method.
+type PoolConn struct {
+	*nsq.Producer
+	mu       sync.RWMutex
+	c        *channelPool
+	unusable bool
+}
+
+// Close puts the given connects back to the pool instead of closing it.
+func (p *PoolConn) Close() error {
+	p.mu.RLock()
+	defer p.mu.RUnlock()
+
+	if p.unusable {
+		if p.Producer != nil {
+			p.Producer.Stop()
+			return nil
+		}
+		return nil
+	}
+	return p.c.put(p.Producer)
+}
+
+// MarkUnusable marks the connection not usable any more, to let the pool close it instead of returning it to pool.
+func (p *PoolConn) MarkUnusable() {
+	p.mu.Lock()
+	p.unusable = true
+	p.mu.Unlock()
+}
+
+// newConn wraps a standard net.Conn to a poolConn net.Conn.
+func (c *channelPool) wrapConn(conn *nsq.Producer) *PoolConn {
+	p := &PoolConn{c: c}
+	p.Producer = conn
+	return p
+}
diff --git a/nsqclient/consumer.go b/nsqclient/consumer.go
new file mode 100644
index 0000000..152989d
--- /dev/null
+++ b/nsqclient/consumer.go
@@ -0,0 +1,90 @@
+package nsqclient
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	nsq "github.com/nsqio/go-nsq"
+)
+
+type NsqConsumer struct {
+	consumer  *nsq.Consumer
+	handler   nsq.Handler
+	ctx       context.Context
+	ctxCancel context.CancelFunc
+	topic     string
+	channel   string
+}
+
+func NewNsqConsumer(ctx context.Context, topic, channel string, options ...func(*nsq.Config)) (*NsqConsumer, error) {
+	conf := nsq.NewConfig()
+	conf.MaxAttempts = 0
+	conf.MsgTimeout = 10 * time.Minute         // 榛樿涓�涓秷鎭渶澶氳兘澶勭悊鍗佸垎閽燂紝鍚﹀垯灏变細閲嶆柊涓㈠叆闃熷垪
+	conf.LookupdPollInterval = 3 * time.Second // 璋冩暣consumer鐨勯噸杩為棿闅旀椂闂翠负3绉�
+	for _, option := range options {
+		option(conf)
+	}
+
+	consumer, err := nsq.NewConsumer(topic, channel, conf)
+	if err != nil {
+		return nil, err
+	}
+	return &NsqConsumer{
+		consumer: consumer,
+		ctx:      ctx,
+		topic:    topic,
+		channel:  channel,
+	}, nil
+}
+
+func DestroyNsqConsumer(c *NsqConsumer) {
+	if c != nil {
+		if c.ctxCancel != nil {
+			c.ctxCancel()
+		}
+	}
+}
+
+func (n *NsqConsumer) AddHandler(handler nsq.Handler) {
+	n.handler = handler
+}
+
+func (n *NsqConsumer) Run(qaddr string, concurrency int) error {
+	return n.RunDistributed([]string{qaddr}, nil, concurrency)
+}
+
+func (n *NsqConsumer) RunLookupd(lookupAddr string, concurrency int) error {
+	return n.RunDistributed(nil, []string{lookupAddr}, concurrency)
+}
+
+func (n *NsqConsumer) RunDistributed(qAddr, lAddr []string, concurrency int) error {
+	n.consumer.ChangeMaxInFlight(concurrency)
+	n.consumer.AddConcurrentHandlers(n.handler, concurrency)
+
+	var err error
+	if len(qAddr) > 0 {
+		err = n.consumer.ConnectToNSQDs(qAddr)
+	} else if len(lAddr) > 0 {
+		err = n.consumer.ConnectToNSQLookupds(lAddr)
+	} else {
+		err = fmt.Errorf("Addr Must NOT Empty")
+	}
+	if err != nil {
+		return err
+	}
+
+	if n.ctx == nil {
+		n.ctx, n.ctxCancel = context.WithCancel(context.Background())
+	}
+
+	for {
+		select {
+		case <-n.ctx.Done():
+			fmt.Println("[%s] %s,%s", "stop consumer", n.topic, n.channel)
+			n.consumer.Stop()
+			fmt.Println("[%s] %s,%s", "stop consumer success", n.topic, n.channel)
+			return nil
+		}
+	}
+}
diff --git a/nsqclient/pointer.go b/nsqclient/pointer.go
new file mode 100644
index 0000000..1cba795
--- /dev/null
+++ b/nsqclient/pointer.go
@@ -0,0 +1,57 @@
+package nsqclient
+
+// #include <stdlib.h>
+import "C"
+import (
+	"sync"
+	"unsafe"
+)
+
+var (
+	mutex sync.RWMutex
+	store = map[unsafe.Pointer]interface{}{}
+)
+
+func Save(v interface{}) unsafe.Pointer {
+	if v == nil {
+		return nil
+	}
+
+	// Generate real fake C pointer.
+	// This pointer will not store any data, but will bi used for indexing purposes.
+	// Since Go doest allow to cast dangling pointer to unsafe.Pointer, we do rally allocate one byte.
+	// Why we need indexing, because Go doest allow C code to store pointers to Go data.
+	var ptr unsafe.Pointer = C.malloc(C.size_t(1))
+	if ptr == nil {
+		panic("can't allocate 'cgo-pointer hack index pointer': ptr == nil")
+	}
+
+	mutex.Lock()
+	store[ptr] = v
+	mutex.Unlock()
+
+	return ptr
+}
+
+func Restore(ptr unsafe.Pointer) (v interface{}) {
+	if ptr == nil {
+		return nil
+	}
+
+	mutex.RLock()
+	v = store[ptr]
+	mutex.RUnlock()
+	return
+}
+
+func Unref(ptr unsafe.Pointer) {
+	if ptr == nil {
+		return
+	}
+
+	mutex.Lock()
+	delete(store, ptr)
+	mutex.Unlock()
+
+	C.free(ptr)
+}
diff --git a/nsqclient/pool.go b/nsqclient/pool.go
new file mode 100644
index 0000000..b773490
--- /dev/null
+++ b/nsqclient/pool.go
@@ -0,0 +1,25 @@
+// Package pool implements a pool of net.Conn interfaces to manage and reuse them.
+package nsqclient
+
+import "errors"
+
+var (
+	// ErrClosed is the error resulting if the pool is closed via pool.Close().
+	ErrClosed = errors.New("pool is closed")
+)
+
+// Pool interface describes a pool implementation. A pool should have maximum
+// capacity. An ideal pool is threadsafe and easy to use.
+type Pool interface {
+	// Get returns a new connection from the pool. Closing the connections puts
+	// it back to the Pool. Closing it when the pool is destroyed or full will
+	// be counted as an error.
+	Get() (*PoolConn, error)
+
+	// Close closes the pool and all its connections. After Close() the pool is
+	// no longer usable.
+	Close()
+
+	// Len returns the current number of connections of the pool.
+	Len() int
+}
diff --git a/nsqclient/producer.go b/nsqclient/producer.go
new file mode 100644
index 0000000..717c7a1
--- /dev/null
+++ b/nsqclient/producer.go
@@ -0,0 +1,139 @@
+package nsqclient
+
+import (
+	"fmt"
+	"time"
+
+	nsq "github.com/nsqio/go-nsq"
+)
+
+type Producer interface {
+	Publish(topic string, body []byte) error
+	MultiPublish(topic string, body [][]byte) error
+	DeferredPublish(topic string, delay time.Duration, body []byte) error
+}
+
+var _ Producer = (*producer)(nil)
+
+type producer struct {
+	pool Pool
+}
+
+var (
+	// 				   name   pool producer
+	nsqList = make(map[string]Pool)
+)
+
+type Config struct {
+	Addr     string `toml:"addr" json:"addr"`
+	InitSize int    `toml:"init_size" json:"init_size"`
+	MaxSize  int    `toml:"max_size" json:"max_size"`
+}
+
+func CreateProducerPool(configs map[string]Config) {
+	for name, conf := range configs {
+		n, err := newProducerPool(conf.Addr, conf.InitSize, conf.MaxSize)
+		if err == nil {
+			nsqList[name] = n
+			// 鏀寔ip:port瀵诲潃
+			nsqList[conf.Addr] = n
+		}
+	}
+}
+
+func DestroyProducerPool() {
+	for _, p := range nsqList {
+		p.Close()
+	}
+}
+
+func GetProducer(key ...string) (*producer, error) {
+	k := "default"
+	if len(key) > 0 {
+		k = key[0]
+	}
+	if n, ok := nsqList[k]; ok {
+		return &producer{n}, nil
+	}
+	return nil, fmt.Errorf("GetProducer can't get producer")
+}
+
+// CreateNSQProducer create nsq producer
+func newProducer(addr string, options ...func(*nsq.Config)) (*nsq.Producer, error) {
+	cfg := nsq.NewConfig()
+	for _, option := range options {
+		option(cfg)
+	}
+
+	producer, err := nsq.NewProducer(addr, cfg)
+	if err != nil {
+		return nil, err
+	}
+	// producer.SetLogger(log.New(os.Stderr, "", log.Flags()), nsq.LogLevelError)
+	return producer, nil
+}
+
+// CreateNSQProducerPool create a nwq producer pool
+func newProducerPool(addr string, initSize, maxSize int, options ...func(*nsq.Config)) (Pool, error) {
+	factory := func() (*nsq.Producer, error) {
+		// TODO 杩欓噷搴旇鎵цping鏂规硶鏉ョ‘瀹氳繛鎺ユ槸姝e父鐨勫惁鍒欎笉搴旇鍒涘缓conn
+		return newProducer(addr, options...)
+	}
+	nsqPool, err := NewChannelPool(initSize, maxSize, factory)
+	if err != nil {
+		return nil, err
+	}
+	return nsqPool, nil
+}
+
+func NewProducer(addr string) (*producer, error) {
+	CreateProducerPool(map[string]Config{"default": {addr, 1, 1}})
+	return GetProducer()
+}
+
+func retry(num int, fn func() error) error {
+	var err error
+	for i := 0; i < num; i++ {
+		err = fn()
+		if err == nil {
+			break
+		}
+	}
+	return err
+}
+
+func (p *producer) Publish(topic string, body []byte) error {
+	nsq, err := p.pool.Get()
+	if err != nil {
+		return err
+	}
+	defer nsq.Close()
+
+	return retry(2, func() error {
+		return nsq.Publish(topic, body)
+	})
+}
+
+func (p *producer) MultiPublish(topic string, body [][]byte) error {
+	nsq, err := p.pool.Get()
+	if err != nil {
+		return err
+	}
+	defer nsq.Close()
+
+	return retry(2, func() error {
+		return nsq.MultiPublish(topic, body)
+	})
+}
+
+func (p *producer) DeferredPublish(topic string, delay time.Duration, body []byte) error {
+	nsq, err := p.pool.Get()
+	if err != nil {
+		return err
+	}
+	defer nsq.Close()
+
+	return retry(2, func() error {
+		return nsq.DeferredPublish(topic, delay, body)
+	})
+}

--
Gitblit v1.8.0