From ecb63331999a88c4980997aa8473ffed0dd31a3c Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期二, 16 五月 2023 15:42:56 +0800
Subject: [PATCH] nsqclient producer consumer
---
nsqclient/channel_test.go | 218 ++++++++++
.gitignore | 3
go.mod | 5
make.sh | 10
nsqclient/LICENSE | 20
nsqclient/conn.go | 45 ++
TST/ctest/ctest.cpp | 95 ++++
nsqclient/channel.go | 134 ++++++
nsqclient/consumer.go | 90 ++++
go.sum | 4
nsqclient/pointer.go | 57 ++
nsqclient/producer.go | 139 ++++++
nsqclient/README.md | 54 ++
nsqclient/pool.go | 25 +
TST/test/test.go | 76 +++
main.go | 144 ++++++
clib/libnsqclient.h | 101 ++++
17 files changed, 1,220 insertions(+), 0 deletions(-)
diff --git a/.gitignore b/.gitignore
index 8365624..c41f8dc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -21,3 +21,6 @@
*.exe
*.test
+
+nsqCli
+cnsqcli
diff --git a/TST/ctest/ctest.cpp b/TST/ctest/ctest.cpp
new file mode 100644
index 0000000..69316ee
--- /dev/null
+++ b/TST/ctest/ctest.cpp
@@ -0,0 +1,95 @@
+#include <stdio.h>
+#include <string.h>
+#include "clib/libnsqclient.h"
+
+#include <string>
+#include <thread>
+#include <mutex>
+
+using namespace std;
+
+static void produce(int two){
+ char ip[] = "192.168.20.108:4150";
+ GoString addr = {ip, (ptrdiff_t)strlen(ip)};
+ void* p = createProducer(addr);
+
+ string msg("cnsqclient dynamic library");
+ while(msg.size() < 32){
+ msg += msg;
+ }
+ // printf("msg %s\n", msg.c_str());
+
+ for(int i = 0; i < 1000000; i++){
+ GoString topic = {"test", 4};
+ string amsg = msg + "-x";
+ GoSlice data{(void*)amsg.data(), (GoInt)amsg.size(), (GoInt)amsg.size()};
+ if (!publish(p, topic, data)){
+ printf("publish msg failed topic %s\n", topic.p);
+ exit(0);
+ }
+
+ if (two){
+ topic.p = "test2";
+ topic.n = 5;
+ amsg = msg + "-y";
+ data.data = (void*)amsg.data();
+ if (!publish(p, topic, data)){
+ printf("publish msg failed topic %s\n", topic.p);
+ exit(0);
+ }
+ }
+
+ }
+ destroyProducer(p);
+}
+
+
+static void consume(const char* topic, const char* channel){
+ GoString t = {topic, (ptrdiff_t)strlen(topic)};
+ GoString c = {channel, (ptrdiff_t)strlen(channel)};
+
+ void* con = createConsumer(t, c);
+
+ char ip[] = "192.168.20.108:4150";
+ GoString addr = {ip, (ptrdiff_t)strlen(ip)};
+
+ // thread
+ thread([&con,&addr]{
+ Run(con, addr);
+ }).detach();
+
+ auto start = chrono::steady_clock::now();
+ int count = 0;
+ while (true) {
+ void* msg = NULL;
+ size_t size = 0;
+ GoUint8 ok = getMessage(con, &msg, &size);
+ if (!ok){
+ this_thread::sleep_for(chrono::milliseconds(100));
+ continue;
+ }
+ count++;
+ printf("======>> recv msg %s size %d\n", (char*)msg, count);
+ relMessage(msg);
+ if (count > 999000){
+ printf("======>> use time %d\n",
+ chrono::duration_cast<chrono::seconds>(chrono::steady_clock::now()-start).count());
+ }
+ }
+ printf("======>> recv all msg size %d\n", count);
+}
+
+int main(int argc, char const *argv[])
+{
+ thread([]{
+ produce(false);
+ }).detach();
+
+ // thread([]{
+ // consume("test2", "sensor01");
+ // }).detach();
+
+ consume("test", "sensor01");
+
+ return 0;
+}
\ No newline at end of file
diff --git a/TST/test/test.go b/TST/test/test.go
new file mode 100644
index 0000000..62d1fae
--- /dev/null
+++ b/TST/test/test.go
@@ -0,0 +1,76 @@
+package test
+
+import (
+ "context"
+ "fmt"
+ "log"
+ "nsqCli/nsqclient"
+ "time"
+
+ "github.com/nsqio/go-nsq"
+)
+
+func produce(two bool) {
+ p, _ := nsqclient.NewProducer("192.168.20.108:4150")
+
+ var str string
+ for len(str) < 32 {
+ str += "cnsqclient dynamic library"
+ }
+ msgx := []byte(str + "--x")
+ msgy := []byte(str + "--y")
+
+ // count := 0
+ for i := 0; i < 1000000; i++ {
+ // if e := p.Publish("test", []byte("x")); e != nil {
+ if e := p.Publish("test", msgx); e != nil {
+ log.Fatal("Publish error:" + e.Error())
+ }
+
+ if two {
+ // if e := p.Publish("test", []byte("y")); e != nil {
+ if e := p.Publish("test2", msgy); e != nil {
+ log.Fatal("Publish error:" + e.Error())
+ }
+ }
+
+ // log.Println("send time ", count)
+ // count++
+ }
+}
+
+func consume(topic, channel string) {
+ ctx, cancel := context.WithCancel(context.Background())
+
+ if c, e := nsqclient.NewNsqConsumer(ctx, topic, channel); e != nil {
+ fmt.Println("NewNsqConsumer failed", e)
+ return
+ } else {
+ ch := make(chan struct{})
+
+ count := 0
+ c.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error {
+ count++
+ fmt.Println("recv msg ", string(msg.Body), " size", count)
+ if count > 999000 {
+ ch <- struct{}{}
+ }
+ return nil
+ }))
+ // go c.Run("192.168.20.108:4150", 2)
+ go c.RunLookupd("192.168.20.108:4161", 2)
+
+ t := time.Now()
+ <-ch
+ // fmt.Println("======>> use time ", time.Since(t))
+ fmt.Println("======>> use time ", time.Now().Unix()-t.Unix())
+ cancel()
+ }
+}
+
+func Test() {
+ go produce(false)
+
+ // go consume("test2", "sensor01")
+ consume("test", "sensor01")
+}
diff --git a/clib/libnsqclient.h b/clib/libnsqclient.h
new file mode 100644
index 0000000..fab0277
--- /dev/null
+++ b/clib/libnsqclient.h
@@ -0,0 +1,101 @@
+/* Code generated by cmd/cgo; DO NOT EDIT. */
+
+/* package nsqCli */
+
+
+#line 1 "cgo-builtin-export-prolog"
+
+#include <stddef.h> /* for ptrdiff_t below */
+
+#ifndef GO_CGO_EXPORT_PROLOGUE_H
+#define GO_CGO_EXPORT_PROLOGUE_H
+
+#ifndef GO_CGO_GOSTRING_TYPEDEF
+typedef struct { const char *p; ptrdiff_t n; } _GoString_;
+#endif
+
+#endif
+
+/* Start of preamble from import "C" comments. */
+
+
+#line 3 "main.go"
+ #include <stdlib.h>
+ #include <string.h>
+
+#line 1 "cgo-generated-wrapper"
+
+
+/* End of preamble from import "C" comments. */
+
+
+/* Start of boilerplate cgo prologue. */
+#line 1 "cgo-gcc-export-header-prolog"
+
+#ifndef GO_CGO_PROLOGUE_H
+#define GO_CGO_PROLOGUE_H
+
+typedef signed char GoInt8;
+typedef unsigned char GoUint8;
+typedef short GoInt16;
+typedef unsigned short GoUint16;
+typedef int GoInt32;
+typedef unsigned int GoUint32;
+typedef long long GoInt64;
+typedef unsigned long long GoUint64;
+typedef GoInt64 GoInt;
+typedef GoUint64 GoUint;
+typedef __SIZE_TYPE__ GoUintptr;
+typedef float GoFloat32;
+typedef double GoFloat64;
+typedef float _Complex GoComplex64;
+typedef double _Complex GoComplex128;
+
+/*
+ static assertion to make sure the file is being used on architecture
+ at least with matching size of GoInt.
+*/
+typedef char _check_for_64_bit_pointer_matching_GoInt[sizeof(void*)==64/8 ? 1:-1];
+
+#ifndef GO_CGO_GOSTRING_TYPEDEF
+typedef _GoString_ GoString;
+#endif
+typedef void *GoMap;
+typedef void *GoChan;
+typedef struct { void *t; void *v; } GoInterface;
+typedef struct { void *data; GoInt len; GoInt cap; } GoSlice;
+
+#endif
+
+/* End of boilerplate cgo prologue. */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+
+extern void* createProducer(GoString p0);
+
+extern void destroyProducer(void* p0);
+
+extern GoUint8 publish(void* p0, GoString p1, GoSlice p2);
+
+extern GoUint8 multiPublish(void* p0, GoString p1, GoSlice p2);
+
+extern GoUint8 deferredPublish(void* p0, GoString p1, GoInt p2, GoSlice p3);
+
+extern void* createConsumer(GoString p0, GoString p1);
+
+extern void destroyConsumer(void* p0);
+
+extern void Run(void* p0, GoString p1);
+
+extern void RunLookupd(void* p0, GoString p1);
+
+extern GoUint8 getMessage(void* p0, void** p1, size_t* p2);
+
+extern void relMessage(void* p0);
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/go.mod b/go.mod
new file mode 100644
index 0000000..5a3990a
--- /dev/null
+++ b/go.mod
@@ -0,0 +1,5 @@
+module nsqCli
+
+go 1.14
+
+require github.com/nsqio/go-nsq v1.1.0
diff --git a/go.sum b/go.sum
new file mode 100644
index 0000000..457d1d9
--- /dev/null
+++ b/go.sum
@@ -0,0 +1,4 @@
+github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
+github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE=
+github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
diff --git a/main.go b/main.go
new file mode 100644
index 0000000..ededf6a
--- /dev/null
+++ b/main.go
@@ -0,0 +1,144 @@
+package main
+
+// #include <stdlib.h>
+// #include <string.h>
+import "C"
+
+import (
+ "nsqCli/TST/test"
+ "nsqCli/nsqclient"
+ "sync"
+ "time"
+ "unsafe"
+
+ "github.com/nsqio/go-nsq"
+)
+
+//export createProducer
+func createProducer(addr string) unsafe.Pointer {
+ n, _ := nsqclient.NewProducer(addr)
+ return nsqclient.Save(n)
+}
+
+//export destroyProducer
+func destroyProducer(ph unsafe.Pointer) {
+ nsqclient.Unref(ph)
+ nsqclient.DestroyProducerPool()
+}
+
+func pcvt(ph unsafe.Pointer) nsqclient.Producer {
+ return nsqclient.Restore(ph).(nsqclient.Producer)
+}
+
+//export publish
+func publish(ph unsafe.Pointer, topic string, msg []byte) bool {
+ p := pcvt(ph)
+ if err := p.Publish(topic, msg); err != nil {
+ return false
+ }
+ return true
+}
+
+//export multiPublish
+func multiPublish(ph unsafe.Pointer, topic string, msg [][]byte) bool {
+ p := pcvt(ph)
+ if err := p.MultiPublish(topic, msg); err != nil {
+ return false
+ }
+ return true
+}
+
+//export deferredPublish
+func deferredPublish(ph unsafe.Pointer, topic string, ms int, msg []byte) bool {
+ p := pcvt(ph)
+ if err := p.DeferredPublish(topic, time.Duration(ms)*time.Millisecond, msg); err != nil {
+ return false
+ }
+ return true
+}
+
+/////////////////////////////////////////////////////////////
+
+type consumer struct {
+ nsqcon *nsqclient.NsqConsumer
+ lck sync.Mutex
+ msgs []*nsq.Message
+}
+
+//export createConsumer
+func createConsumer(topic, channel string) unsafe.Pointer {
+ if c, err := nsqclient.NewNsqConsumer(nil, topic, channel); err == nil {
+ con := &consumer{
+ nsqcon: c,
+ }
+ return nsqclient.Save(con)
+ }
+ return nil
+}
+
+func ccvt(ch unsafe.Pointer) *consumer {
+ return nsqclient.Restore(ch).(*consumer)
+}
+
+//export destroyConsumer
+func destroyConsumer(ch unsafe.Pointer) {
+ nsqclient.DestroyNsqConsumer(ccvt(ch).nsqcon)
+ nsqclient.Unref(ch)
+}
+
+//export Run
+func Run(ch unsafe.Pointer, addr string) {
+ c := ccvt(ch)
+ c.nsqcon.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error {
+ c.lck.Lock()
+ defer c.lck.Unlock()
+ c.msgs = append(c.msgs, msg)
+ return nil
+ }))
+
+ c.nsqcon.Run(addr, 1)
+}
+
+//export RunLookupd
+func RunLookupd(ch unsafe.Pointer, lookAddr string) {
+ c := ccvt(ch)
+ c.nsqcon.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error {
+ c.lck.Lock()
+ defer c.lck.Unlock()
+ c.msgs = append(c.msgs, msg)
+ return nil
+ }))
+
+ c.nsqcon.RunLookupd(lookAddr, 1)
+}
+
+//export getMessage
+func getMessage(ch unsafe.Pointer, data *unsafe.Pointer, size *C.size_t) bool {
+ c := ccvt(ch)
+ c.lck.Lock()
+ defer c.lck.Unlock()
+ if len(c.msgs) == 0 {
+ return false
+ }
+
+ msg := c.msgs[0]
+ c.msgs = c.msgs[1:]
+
+ *size = C.size_t(len(msg.Body))
+ ptr := C.malloc(*size)
+ C.memcpy(ptr, unsafe.Pointer(&msg.Body[0]), *size)
+ *data = ptr
+
+ return true
+}
+
+//export relMessage
+func relMessage(msg unsafe.Pointer) {
+ if msg != nil {
+ C.free(msg)
+ }
+}
+
+func main() {
+ test.Test()
+}
diff --git a/make.sh b/make.sh
new file mode 100755
index 0000000..51d276d
--- /dev/null
+++ b/make.sh
@@ -0,0 +1,10 @@
+#!/bin/bash
+
+if [ ! -d "clib" ]; then
+ mkdir clib
+fi
+go tool cgo -exportheader clib/libnsqclient.h main.go &&
+go build -buildmode=c-shared -o ./clib/libnsqclient.so &&
+rm -fr _obj &&
+go build &&
+g++ -std=c++11 -g -O0 -o cnsqcli TST/ctest/ctest.cpp -I. -Lclib/ -lnsqclient -ldl -pthread
diff --git a/nsqclient/LICENSE b/nsqclient/LICENSE
new file mode 100644
index 0000000..25fdaf6
--- /dev/null
+++ b/nsqclient/LICENSE
@@ -0,0 +1,20 @@
+The MIT License (MIT)
+
+Copyright (c) 2013 Fatih Arslan
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of
+this software and associated documentation files (the "Software"), to deal in
+the Software without restriction, including without limitation the rights to
+use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software is furnished to do so,
+subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/nsqclient/README.md b/nsqclient/README.md
new file mode 100644
index 0000000..432d86d
--- /dev/null
+++ b/nsqclient/README.md
@@ -0,0 +1,54 @@
+# NSQPool
+
+NSQPool is a thread safe connection pool for nsq producer. It can be used to
+manage and reuse nsq producer connection.
+
+
+## Install and Usage
+
+Install the package with:
+
+```bash
+github.com/qgymje/nsqpool
+```
+
+Import it with:
+
+```go
+import (
+ "github.com/qgymje/nsqpool"
+ nsq "github.com/nsqio/go-nsq"
+)
+```
+
+and use `pool` as the package name inside the code.
+
+## Example
+
+```go
+// create a factory() to be used with channel based pool
+factory := func() (*nsq.Producer, error) {
+ config := nsq.NewConfig()
+ return nsq.NewProducer(":4150", config)
+}
+
+nsqPool, err := pool.NewChannelPool(5, 30, factory)
+
+producer, err := nsqPool.Get()
+
+producer.Publish("topic", "some data")
+// do something with producer and put it back to the pool by closing the connection
+// (this doesn't close the underlying connection instead it's putting it back
+// to the pool).
+producer.Close()
+
+// close pool any time you want, this closes all the connections inside a pool
+nsqPool.Close()
+
+// currently available connections in the pool
+current := nsqPool.Len()
+```
+
+## License
+
+The MIT License (MIT) - see LICENSE for more details
diff --git a/nsqclient/channel.go b/nsqclient/channel.go
new file mode 100644
index 0000000..1594dcc
--- /dev/null
+++ b/nsqclient/channel.go
@@ -0,0 +1,134 @@
+package nsqclient
+
+import (
+ "errors"
+ "fmt"
+ "sync"
+
+ nsq "github.com/nsqio/go-nsq"
+)
+
+// channelPool implements the Pool interface based on buffered channels.
+type channelPool struct {
+ // storage for our net.Conn connections
+ mu sync.Mutex
+ conns chan *nsq.Producer
+
+ // net.Conn generator
+ factory Factory
+}
+
+// Factory is a function to create new connections.
+type Factory func() (*nsq.Producer, error)
+
+// NewChannelPool returns a new pool based on buffered channels with an initial
+// capacity and maximum capacity. Factory is used when initial capacity is
+// greater than zero to fill the pool. A zero initialCap doesn't fill the Pool
+// until a new Get() is called. During a Get(), If there is no new connection
+// available in the pool, a new connection will be created via the Factory()
+// method.
+func NewChannelPool(initialCap, maxCap int, factory Factory) (Pool, error) {
+ if initialCap < 0 || maxCap <= 0 || initialCap > maxCap {
+ return nil, errors.New("invalid capacity settings")
+ }
+
+ c := &channelPool{
+ conns: make(chan *nsq.Producer, maxCap),
+ factory: factory,
+ }
+
+ // create initial connections, if something goes wrong,
+ // just close the pool error out.
+ for i := 0; i < initialCap; i++ {
+ conn, err := factory()
+ if err != nil {
+ c.Close()
+ return nil, fmt.Errorf("factory is not able to fill the pool: %s", err)
+ }
+ c.conns <- conn
+ }
+
+ return c, nil
+}
+
+func (c *channelPool) getConns() chan *nsq.Producer {
+ c.mu.Lock()
+ conns := c.conns
+ c.mu.Unlock()
+ return conns
+}
+
+// Get implements the Pool interfaces Get() method. If there is no new
+// connection available in the pool, a new connection will be created via the
+// Factory() method.
+func (c *channelPool) Get() (*PoolConn, error) {
+ conns := c.getConns()
+ if conns == nil {
+ return nil, ErrClosed
+ }
+
+ // wrap our connections with out custom net.Conn implementation (wrapConn
+ // method) that puts the connection back to the pool if it's closed.
+ select {
+ case conn := <-conns:
+ if conn == nil {
+ return nil, ErrClosed
+ }
+
+ return c.wrapConn(conn), nil
+ default:
+ conn, err := c.factory()
+ if err != nil {
+ return nil, err
+ }
+
+ return c.wrapConn(conn), nil
+ }
+}
+
+// put puts the connection back to the pool. If the pool is full or closed,
+// conn is simply closed. A nil conn will be rejected.
+func (c *channelPool) put(conn *nsq.Producer) error {
+ if conn == nil {
+ return errors.New("connection is nil. rejecting")
+ }
+
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ if c.conns == nil {
+ // pool is closed, close passed connection
+ conn.Stop()
+ return nil
+ }
+
+ // put the resource back into the pool. If the pool is full, this will
+ // block and the default case will be executed.
+ select {
+ case c.conns <- conn:
+ return nil
+ default:
+ // pool is full, close passed connection
+ conn.Stop()
+ return nil
+ }
+}
+
+func (c *channelPool) Close() {
+ c.mu.Lock()
+ conns := c.conns
+ c.conns = nil
+ c.factory = nil
+ c.mu.Unlock()
+
+ if conns == nil {
+ return
+ }
+
+ close(conns)
+ for conn := range conns {
+ conn.Stop()
+ }
+}
+
+func (c *channelPool) Len() int { return len(c.getConns()) }
diff --git a/nsqclient/channel_test.go b/nsqclient/channel_test.go
new file mode 100644
index 0000000..725db9b
--- /dev/null
+++ b/nsqclient/channel_test.go
@@ -0,0 +1,218 @@
+package nsqclient
+
+import (
+ "math/rand"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/nsqio/go-nsq"
+)
+
+var (
+ InitialCap = 2
+ MaximumCap = 10
+ factory = func() (*nsq.Producer, error) {
+ config := nsq.NewConfig()
+ return nsq.NewProducer(":4160", config)
+ }
+)
+
+func newChannelPool() (Pool, error) {
+ return NewChannelPool(InitialCap, MaximumCap, factory)
+}
+
+func TestNew(t *testing.T) {
+ _, err := newChannelPool()
+ if err != nil {
+ t.Errorf("New error: %s", err)
+ }
+}
+
+func TestPool_Get(t *testing.T) {
+ p, _ := newChannelPool()
+ defer p.Close()
+
+ _, err := p.Get()
+ if err != nil {
+ t.Errorf("Get error: %s", err)
+ }
+
+ // after one get, current capacity should be lowered by one.
+ if p.Len() != (InitialCap - 1) {
+ t.Errorf("Get error. Expecting %d, got %d",
+ (InitialCap - 1), p.Len())
+ }
+
+ // get them all
+ var wg sync.WaitGroup
+ for i := 0; i < (InitialCap - 1); i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ _, err := p.Get()
+ if err != nil {
+ t.Errorf("Get error: %s", err)
+ }
+ }()
+ }
+ wg.Wait()
+
+ if p.Len() != 0 {
+ t.Errorf("Get error. Expecting %d, got %d",
+ (InitialCap - 1), p.Len())
+ }
+
+ _, err = p.Get()
+ if err != nil {
+ t.Errorf("Get error: %s", err)
+ }
+}
+
+func TestPool_Put(t *testing.T) {
+ p, err := NewChannelPool(0, 30, factory)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer p.Close()
+
+ // get/create from the pool
+ conns := make([]*PoolConn, MaximumCap)
+ for i := 0; i < MaximumCap; i++ {
+ conn, _ := p.Get()
+ conns[i] = conn
+ }
+
+ // now put them all back
+ for _, conn := range conns {
+ conn.Close()
+ }
+
+ if p.Len() != MaximumCap {
+ t.Errorf("Put error len. Expecting %d, got %d",
+ 1, p.Len())
+ }
+
+ conn, _ := p.Get()
+ p.Close() // close pool
+
+ conn.Close() // try to put into a full pool
+ if p.Len() != 0 {
+ t.Errorf("Put error. Closed pool shouldn't allow to put connections.")
+ }
+}
+
+func TestPool_PutUnusableConn(t *testing.T) {
+ p, _ := newChannelPool()
+ defer p.Close()
+
+ // ensure pool is not empty
+ conn, _ := p.Get()
+ conn.Close()
+
+ poolSize := p.Len()
+ conn, _ = p.Get()
+ conn.Close()
+ if p.Len() != poolSize {
+ t.Errorf("Pool size is expected to be equal to initial size")
+ }
+
+ conn, _ = p.Get()
+ conn.MarkUnusable()
+
+ conn.Close()
+ if p.Len() != poolSize-1 {
+ t.Errorf("Pool size is expected to be initial_size - 1 [%d:%d]", p.Len(), poolSize-1)
+ }
+}
+
+func TestPool_UsedCapacity(t *testing.T) {
+ p, _ := newChannelPool()
+ defer p.Close()
+
+ if p.Len() != InitialCap {
+ t.Errorf("InitialCap error. Expecting %d, got %d",
+ InitialCap, p.Len())
+ }
+}
+
+func TestPool_Close(t *testing.T) {
+ p, _ := newChannelPool()
+
+ // now close it and test all cases we are expecting.
+ p.Close()
+
+ c := p.(*channelPool)
+
+ if c.conns != nil {
+ t.Errorf("Close error, conns channel should be nil")
+ }
+
+ if c.factory != nil {
+ t.Errorf("Close error, factory should be nil")
+ }
+
+ _, err := p.Get()
+ if err == nil {
+ t.Errorf("Close error, get conn should return an error")
+ }
+
+ if p.Len() != 0 {
+ t.Errorf("Close error used capacity. Expecting 0, got %d", p.Len())
+ }
+}
+
+func TestPoolConcurrent(t *testing.T) {
+ p, _ := newChannelPool()
+ pipe := make(chan *PoolConn, 0)
+
+ go func() {
+ p.Close()
+ }()
+
+ for i := 0; i < MaximumCap; i++ {
+ go func() {
+ conn, _ := p.Get()
+
+ pipe <- conn
+ }()
+
+ go func() {
+ conn := <-pipe
+ if conn == nil {
+ return
+ }
+ conn.Close()
+ }()
+ }
+}
+
+func TestPoolConcurrent2(t *testing.T) {
+ p, _ := NewChannelPool(0, 30, factory)
+
+ var wg sync.WaitGroup
+
+ go func() {
+ for i := 0; i < 10; i++ {
+ wg.Add(1)
+ go func(i int) {
+ conn, _ := p.Get()
+ time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
+ conn.Close()
+ wg.Done()
+ }(i)
+ }
+ }()
+
+ for i := 0; i < 10; i++ {
+ wg.Add(1)
+ go func(i int) {
+ conn, _ := p.Get()
+ time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
+ conn.Close()
+ wg.Done()
+ }(i)
+ }
+
+ wg.Wait()
+}
diff --git a/nsqclient/conn.go b/nsqclient/conn.go
new file mode 100644
index 0000000..5c680b5
--- /dev/null
+++ b/nsqclient/conn.go
@@ -0,0 +1,45 @@
+package nsqclient
+
+import (
+ "sync"
+
+ nsq "github.com/nsqio/go-nsq"
+)
+
+// PoolConn is a wrapper around net.Conn to modify the the behavior of
+// net.Conn's Close() method.
+type PoolConn struct {
+ *nsq.Producer
+ mu sync.RWMutex
+ c *channelPool
+ unusable bool
+}
+
+// Close puts the given connects back to the pool instead of closing it.
+func (p *PoolConn) Close() error {
+ p.mu.RLock()
+ defer p.mu.RUnlock()
+
+ if p.unusable {
+ if p.Producer != nil {
+ p.Producer.Stop()
+ return nil
+ }
+ return nil
+ }
+ return p.c.put(p.Producer)
+}
+
+// MarkUnusable marks the connection not usable any more, to let the pool close it instead of returning it to pool.
+func (p *PoolConn) MarkUnusable() {
+ p.mu.Lock()
+ p.unusable = true
+ p.mu.Unlock()
+}
+
+// newConn wraps a standard net.Conn to a poolConn net.Conn.
+func (c *channelPool) wrapConn(conn *nsq.Producer) *PoolConn {
+ p := &PoolConn{c: c}
+ p.Producer = conn
+ return p
+}
diff --git a/nsqclient/consumer.go b/nsqclient/consumer.go
new file mode 100644
index 0000000..152989d
--- /dev/null
+++ b/nsqclient/consumer.go
@@ -0,0 +1,90 @@
+package nsqclient
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ nsq "github.com/nsqio/go-nsq"
+)
+
+type NsqConsumer struct {
+ consumer *nsq.Consumer
+ handler nsq.Handler
+ ctx context.Context
+ ctxCancel context.CancelFunc
+ topic string
+ channel string
+}
+
+func NewNsqConsumer(ctx context.Context, topic, channel string, options ...func(*nsq.Config)) (*NsqConsumer, error) {
+ conf := nsq.NewConfig()
+ conf.MaxAttempts = 0
+ conf.MsgTimeout = 10 * time.Minute // 榛樿涓�涓秷鎭渶澶氳兘澶勭悊鍗佸垎閽燂紝鍚﹀垯灏变細閲嶆柊涓㈠叆闃熷垪
+ conf.LookupdPollInterval = 3 * time.Second // 璋冩暣consumer鐨勯噸杩為棿闅旀椂闂翠负3绉�
+ for _, option := range options {
+ option(conf)
+ }
+
+ consumer, err := nsq.NewConsumer(topic, channel, conf)
+ if err != nil {
+ return nil, err
+ }
+ return &NsqConsumer{
+ consumer: consumer,
+ ctx: ctx,
+ topic: topic,
+ channel: channel,
+ }, nil
+}
+
+func DestroyNsqConsumer(c *NsqConsumer) {
+ if c != nil {
+ if c.ctxCancel != nil {
+ c.ctxCancel()
+ }
+ }
+}
+
+func (n *NsqConsumer) AddHandler(handler nsq.Handler) {
+ n.handler = handler
+}
+
+func (n *NsqConsumer) Run(qaddr string, concurrency int) error {
+ return n.RunDistributed([]string{qaddr}, nil, concurrency)
+}
+
+func (n *NsqConsumer) RunLookupd(lookupAddr string, concurrency int) error {
+ return n.RunDistributed(nil, []string{lookupAddr}, concurrency)
+}
+
+func (n *NsqConsumer) RunDistributed(qAddr, lAddr []string, concurrency int) error {
+ n.consumer.ChangeMaxInFlight(concurrency)
+ n.consumer.AddConcurrentHandlers(n.handler, concurrency)
+
+ var err error
+ if len(qAddr) > 0 {
+ err = n.consumer.ConnectToNSQDs(qAddr)
+ } else if len(lAddr) > 0 {
+ err = n.consumer.ConnectToNSQLookupds(lAddr)
+ } else {
+ err = fmt.Errorf("Addr Must NOT Empty")
+ }
+ if err != nil {
+ return err
+ }
+
+ if n.ctx == nil {
+ n.ctx, n.ctxCancel = context.WithCancel(context.Background())
+ }
+
+ for {
+ select {
+ case <-n.ctx.Done():
+ fmt.Println("[%s] %s,%s", "stop consumer", n.topic, n.channel)
+ n.consumer.Stop()
+ fmt.Println("[%s] %s,%s", "stop consumer success", n.topic, n.channel)
+ return nil
+ }
+ }
+}
diff --git a/nsqclient/pointer.go b/nsqclient/pointer.go
new file mode 100644
index 0000000..1cba795
--- /dev/null
+++ b/nsqclient/pointer.go
@@ -0,0 +1,57 @@
+package nsqclient
+
+// #include <stdlib.h>
+import "C"
+import (
+ "sync"
+ "unsafe"
+)
+
+var (
+ mutex sync.RWMutex
+ store = map[unsafe.Pointer]interface{}{}
+)
+
+func Save(v interface{}) unsafe.Pointer {
+ if v == nil {
+ return nil
+ }
+
+ // Generate real fake C pointer.
+ // This pointer will not store any data, but will bi used for indexing purposes.
+ // Since Go doest allow to cast dangling pointer to unsafe.Pointer, we do rally allocate one byte.
+ // Why we need indexing, because Go doest allow C code to store pointers to Go data.
+ var ptr unsafe.Pointer = C.malloc(C.size_t(1))
+ if ptr == nil {
+ panic("can't allocate 'cgo-pointer hack index pointer': ptr == nil")
+ }
+
+ mutex.Lock()
+ store[ptr] = v
+ mutex.Unlock()
+
+ return ptr
+}
+
+func Restore(ptr unsafe.Pointer) (v interface{}) {
+ if ptr == nil {
+ return nil
+ }
+
+ mutex.RLock()
+ v = store[ptr]
+ mutex.RUnlock()
+ return
+}
+
+func Unref(ptr unsafe.Pointer) {
+ if ptr == nil {
+ return
+ }
+
+ mutex.Lock()
+ delete(store, ptr)
+ mutex.Unlock()
+
+ C.free(ptr)
+}
diff --git a/nsqclient/pool.go b/nsqclient/pool.go
new file mode 100644
index 0000000..b773490
--- /dev/null
+++ b/nsqclient/pool.go
@@ -0,0 +1,25 @@
+// Package pool implements a pool of net.Conn interfaces to manage and reuse them.
+package nsqclient
+
+import "errors"
+
+var (
+ // ErrClosed is the error resulting if the pool is closed via pool.Close().
+ ErrClosed = errors.New("pool is closed")
+)
+
+// Pool interface describes a pool implementation. A pool should have maximum
+// capacity. An ideal pool is threadsafe and easy to use.
+type Pool interface {
+ // Get returns a new connection from the pool. Closing the connections puts
+ // it back to the Pool. Closing it when the pool is destroyed or full will
+ // be counted as an error.
+ Get() (*PoolConn, error)
+
+ // Close closes the pool and all its connections. After Close() the pool is
+ // no longer usable.
+ Close()
+
+ // Len returns the current number of connections of the pool.
+ Len() int
+}
diff --git a/nsqclient/producer.go b/nsqclient/producer.go
new file mode 100644
index 0000000..717c7a1
--- /dev/null
+++ b/nsqclient/producer.go
@@ -0,0 +1,139 @@
+package nsqclient
+
+import (
+ "fmt"
+ "time"
+
+ nsq "github.com/nsqio/go-nsq"
+)
+
+type Producer interface {
+ Publish(topic string, body []byte) error
+ MultiPublish(topic string, body [][]byte) error
+ DeferredPublish(topic string, delay time.Duration, body []byte) error
+}
+
+var _ Producer = (*producer)(nil)
+
+type producer struct {
+ pool Pool
+}
+
+var (
+ // name pool producer
+ nsqList = make(map[string]Pool)
+)
+
+type Config struct {
+ Addr string `toml:"addr" json:"addr"`
+ InitSize int `toml:"init_size" json:"init_size"`
+ MaxSize int `toml:"max_size" json:"max_size"`
+}
+
+func CreateProducerPool(configs map[string]Config) {
+ for name, conf := range configs {
+ n, err := newProducerPool(conf.Addr, conf.InitSize, conf.MaxSize)
+ if err == nil {
+ nsqList[name] = n
+ // 鏀寔ip:port瀵诲潃
+ nsqList[conf.Addr] = n
+ }
+ }
+}
+
+func DestroyProducerPool() {
+ for _, p := range nsqList {
+ p.Close()
+ }
+}
+
+func GetProducer(key ...string) (*producer, error) {
+ k := "default"
+ if len(key) > 0 {
+ k = key[0]
+ }
+ if n, ok := nsqList[k]; ok {
+ return &producer{n}, nil
+ }
+ return nil, fmt.Errorf("GetProducer can't get producer")
+}
+
+// CreateNSQProducer create nsq producer
+func newProducer(addr string, options ...func(*nsq.Config)) (*nsq.Producer, error) {
+ cfg := nsq.NewConfig()
+ for _, option := range options {
+ option(cfg)
+ }
+
+ producer, err := nsq.NewProducer(addr, cfg)
+ if err != nil {
+ return nil, err
+ }
+ // producer.SetLogger(log.New(os.Stderr, "", log.Flags()), nsq.LogLevelError)
+ return producer, nil
+}
+
+// CreateNSQProducerPool create a nwq producer pool
+func newProducerPool(addr string, initSize, maxSize int, options ...func(*nsq.Config)) (Pool, error) {
+ factory := func() (*nsq.Producer, error) {
+ // TODO 杩欓噷搴旇鎵цping鏂规硶鏉ョ‘瀹氳繛鎺ユ槸姝e父鐨勫惁鍒欎笉搴旇鍒涘缓conn
+ return newProducer(addr, options...)
+ }
+ nsqPool, err := NewChannelPool(initSize, maxSize, factory)
+ if err != nil {
+ return nil, err
+ }
+ return nsqPool, nil
+}
+
+func NewProducer(addr string) (*producer, error) {
+ CreateProducerPool(map[string]Config{"default": {addr, 1, 1}})
+ return GetProducer()
+}
+
+func retry(num int, fn func() error) error {
+ var err error
+ for i := 0; i < num; i++ {
+ err = fn()
+ if err == nil {
+ break
+ }
+ }
+ return err
+}
+
+func (p *producer) Publish(topic string, body []byte) error {
+ nsq, err := p.pool.Get()
+ if err != nil {
+ return err
+ }
+ defer nsq.Close()
+
+ return retry(2, func() error {
+ return nsq.Publish(topic, body)
+ })
+}
+
+func (p *producer) MultiPublish(topic string, body [][]byte) error {
+ nsq, err := p.pool.Get()
+ if err != nil {
+ return err
+ }
+ defer nsq.Close()
+
+ return retry(2, func() error {
+ return nsq.MultiPublish(topic, body)
+ })
+}
+
+func (p *producer) DeferredPublish(topic string, delay time.Duration, body []byte) error {
+ nsq, err := p.pool.Get()
+ if err != nil {
+ return err
+ }
+ defer nsq.Close()
+
+ return retry(2, func() error {
+ return nsq.DeferredPublish(topic, delay, body)
+ })
+}
--
Gitblit v1.8.0