zhangmeng
2023-05-16 ecb63331999a88c4980997aa8473ffed0dd31a3c
nsqclient producer consumer
16个文件已添加
1个文件已修改
1220 ■■■■■ 已修改文件
.gitignore 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
TST/ctest/ctest.cpp 95 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
TST/test/test.go 76 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
clib/libnsqclient.h 101 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.mod 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.sum 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go 144 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
make.sh 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsqclient/LICENSE 20 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsqclient/README.md 54 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsqclient/channel.go 134 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsqclient/channel_test.go 218 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsqclient/conn.go 45 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsqclient/consumer.go 90 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsqclient/pointer.go 57 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsqclient/pool.go 25 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsqclient/producer.go 139 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.gitignore
@@ -21,3 +21,6 @@
*.exe
*.test
nsqCli
cnsqcli
TST/ctest/ctest.cpp
New file
@@ -0,0 +1,95 @@
#include <stdio.h>
#include <string.h>
#include "clib/libnsqclient.h"
#include <string>
#include <thread>
#include <mutex>
using namespace std;
static void produce(int two){
    char ip[] = "192.168.20.108:4150";
    GoString addr = {ip, (ptrdiff_t)strlen(ip)};
    void* p = createProducer(addr);
    string msg("cnsqclient dynamic library");
    while(msg.size() < 32){
        msg += msg;
    }
    // printf("msg %s\n", msg.c_str());
    for(int i = 0; i < 1000000; i++){
        GoString topic = {"test", 4};
        string amsg = msg + "-x";
        GoSlice data{(void*)amsg.data(), (GoInt)amsg.size(), (GoInt)amsg.size()};
        if (!publish(p, topic, data)){
            printf("publish msg failed topic %s\n", topic.p);
            exit(0);
        }
        if (two){
            topic.p = "test2";
            topic.n = 5;
            amsg = msg + "-y";
            data.data = (void*)amsg.data();
            if (!publish(p, topic, data)){
                printf("publish msg failed topic %s\n", topic.p);
                exit(0);
            }
        }
    }
    destroyProducer(p);
}
static void consume(const char* topic, const char* channel){
    GoString t = {topic, (ptrdiff_t)strlen(topic)};
    GoString c = {channel, (ptrdiff_t)strlen(channel)};
    void* con = createConsumer(t, c);
    char ip[] = "192.168.20.108:4150";
    GoString addr = {ip, (ptrdiff_t)strlen(ip)};
    // thread
    thread([&con,&addr]{
        Run(con, addr);
    }).detach();
    auto start = chrono::steady_clock::now();
    int count = 0;
    while (true) {
        void* msg = NULL;
        size_t size = 0;
        GoUint8 ok = getMessage(con, &msg, &size);
        if (!ok){
            this_thread::sleep_for(chrono::milliseconds(100));
            continue;
        }
        count++;
        printf("======>> recv msg %s size %d\n", (char*)msg, count);
        relMessage(msg);
        if (count > 999000){
            printf("======>> use time %d\n",
                chrono::duration_cast<chrono::seconds>(chrono::steady_clock::now()-start).count());
        }
    }
    printf("======>> recv all msg size %d\n", count);
}
int main(int argc, char const *argv[])
{
    thread([]{
        produce(false);
    }).detach();
    // thread([]{
    //     consume("test2", "sensor01");
    // }).detach();
    consume("test", "sensor01");
    return 0;
}
TST/test/test.go
New file
@@ -0,0 +1,76 @@
package test
import (
    "context"
    "fmt"
    "log"
    "nsqCli/nsqclient"
    "time"
    "github.com/nsqio/go-nsq"
)
func produce(two bool) {
    p, _ := nsqclient.NewProducer("192.168.20.108:4150")
    var str string
    for len(str) < 32 {
        str += "cnsqclient dynamic library"
    }
    msgx := []byte(str + "--x")
    msgy := []byte(str + "--y")
    // count := 0
    for i := 0; i < 1000000; i++ {
        // if e := p.Publish("test", []byte("x")); e != nil {
        if e := p.Publish("test", msgx); e != nil {
            log.Fatal("Publish error:" + e.Error())
        }
        if two {
            // if e := p.Publish("test", []byte("y")); e != nil {
            if e := p.Publish("test2", msgy); e != nil {
                log.Fatal("Publish error:" + e.Error())
            }
        }
        // log.Println("send time ", count)
        // count++
    }
}
func consume(topic, channel string) {
    ctx, cancel := context.WithCancel(context.Background())
    if c, e := nsqclient.NewNsqConsumer(ctx, topic, channel); e != nil {
        fmt.Println("NewNsqConsumer failed", e)
        return
    } else {
        ch := make(chan struct{})
        count := 0
        c.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error {
            count++
            fmt.Println("recv msg ", string(msg.Body), " size", count)
            if count > 999000 {
                ch <- struct{}{}
            }
            return nil
        }))
        // go c.Run("192.168.20.108:4150", 2)
        go c.RunLookupd("192.168.20.108:4161", 2)
        t := time.Now()
        <-ch
        // fmt.Println("======>> use time ", time.Since(t))
        fmt.Println("======>> use time ", time.Now().Unix()-t.Unix())
        cancel()
    }
}
func Test() {
    go produce(false)
    // go consume("test2", "sensor01")
    consume("test", "sensor01")
}
clib/libnsqclient.h
New file
@@ -0,0 +1,101 @@
/* Code generated by cmd/cgo; DO NOT EDIT. */
/* package nsqCli */
#line 1 "cgo-builtin-export-prolog"
#include <stddef.h> /* for ptrdiff_t below */
#ifndef GO_CGO_EXPORT_PROLOGUE_H
#define GO_CGO_EXPORT_PROLOGUE_H
#ifndef GO_CGO_GOSTRING_TYPEDEF
typedef struct { const char *p; ptrdiff_t n; } _GoString_;
#endif
#endif
/* Start of preamble from import "C" comments.  */
#line 3 "main.go"
 #include <stdlib.h>
 #include <string.h>
#line 1 "cgo-generated-wrapper"
/* End of preamble from import "C" comments.  */
/* Start of boilerplate cgo prologue.  */
#line 1 "cgo-gcc-export-header-prolog"
#ifndef GO_CGO_PROLOGUE_H
#define GO_CGO_PROLOGUE_H
typedef signed char GoInt8;
typedef unsigned char GoUint8;
typedef short GoInt16;
typedef unsigned short GoUint16;
typedef int GoInt32;
typedef unsigned int GoUint32;
typedef long long GoInt64;
typedef unsigned long long GoUint64;
typedef GoInt64 GoInt;
typedef GoUint64 GoUint;
typedef __SIZE_TYPE__ GoUintptr;
typedef float GoFloat32;
typedef double GoFloat64;
typedef float _Complex GoComplex64;
typedef double _Complex GoComplex128;
/*
  static assertion to make sure the file is being used on architecture
  at least with matching size of GoInt.
*/
typedef char _check_for_64_bit_pointer_matching_GoInt[sizeof(void*)==64/8 ? 1:-1];
#ifndef GO_CGO_GOSTRING_TYPEDEF
typedef _GoString_ GoString;
#endif
typedef void *GoMap;
typedef void *GoChan;
typedef struct { void *t; void *v; } GoInterface;
typedef struct { void *data; GoInt len; GoInt cap; } GoSlice;
#endif
/* End of boilerplate cgo prologue.  */
#ifdef __cplusplus
extern "C" {
#endif
extern void* createProducer(GoString p0);
extern void destroyProducer(void* p0);
extern GoUint8 publish(void* p0, GoString p1, GoSlice p2);
extern GoUint8 multiPublish(void* p0, GoString p1, GoSlice p2);
extern GoUint8 deferredPublish(void* p0, GoString p1, GoInt p2, GoSlice p3);
extern void* createConsumer(GoString p0, GoString p1);
extern void destroyConsumer(void* p0);
extern void Run(void* p0, GoString p1);
extern void RunLookupd(void* p0, GoString p1);
extern GoUint8 getMessage(void* p0, void** p1, size_t* p2);
extern void relMessage(void* p0);
#ifdef __cplusplus
}
#endif
go.mod
New file
@@ -0,0 +1,5 @@
module nsqCli
go 1.14
require github.com/nsqio/go-nsq v1.1.0
go.sum
New file
@@ -0,0 +1,4 @@
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE=
github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
main.go
New file
@@ -0,0 +1,144 @@
package main
// #include <stdlib.h>
// #include <string.h>
import "C"
import (
    "nsqCli/TST/test"
    "nsqCli/nsqclient"
    "sync"
    "time"
    "unsafe"
    "github.com/nsqio/go-nsq"
)
//export createProducer
func createProducer(addr string) unsafe.Pointer {
    n, _ := nsqclient.NewProducer(addr)
    return nsqclient.Save(n)
}
//export destroyProducer
func destroyProducer(ph unsafe.Pointer) {
    nsqclient.Unref(ph)
    nsqclient.DestroyProducerPool()
}
func pcvt(ph unsafe.Pointer) nsqclient.Producer {
    return nsqclient.Restore(ph).(nsqclient.Producer)
}
//export publish
func publish(ph unsafe.Pointer, topic string, msg []byte) bool {
    p := pcvt(ph)
    if err := p.Publish(topic, msg); err != nil {
        return false
    }
    return true
}
//export multiPublish
func multiPublish(ph unsafe.Pointer, topic string, msg [][]byte) bool {
    p := pcvt(ph)
    if err := p.MultiPublish(topic, msg); err != nil {
        return false
    }
    return true
}
//export deferredPublish
func deferredPublish(ph unsafe.Pointer, topic string, ms int, msg []byte) bool {
    p := pcvt(ph)
    if err := p.DeferredPublish(topic, time.Duration(ms)*time.Millisecond, msg); err != nil {
        return false
    }
    return true
}
/////////////////////////////////////////////////////////////
type consumer struct {
    nsqcon *nsqclient.NsqConsumer
    lck    sync.Mutex
    msgs   []*nsq.Message
}
//export createConsumer
func createConsumer(topic, channel string) unsafe.Pointer {
    if c, err := nsqclient.NewNsqConsumer(nil, topic, channel); err == nil {
        con := &consumer{
            nsqcon: c,
        }
        return nsqclient.Save(con)
    }
    return nil
}
func ccvt(ch unsafe.Pointer) *consumer {
    return nsqclient.Restore(ch).(*consumer)
}
//export destroyConsumer
func destroyConsumer(ch unsafe.Pointer) {
    nsqclient.DestroyNsqConsumer(ccvt(ch).nsqcon)
    nsqclient.Unref(ch)
}
//export Run
func Run(ch unsafe.Pointer, addr string) {
    c := ccvt(ch)
    c.nsqcon.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error {
        c.lck.Lock()
        defer c.lck.Unlock()
        c.msgs = append(c.msgs, msg)
        return nil
    }))
    c.nsqcon.Run(addr, 1)
}
//export RunLookupd
func RunLookupd(ch unsafe.Pointer, lookAddr string) {
    c := ccvt(ch)
    c.nsqcon.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error {
        c.lck.Lock()
        defer c.lck.Unlock()
        c.msgs = append(c.msgs, msg)
        return nil
    }))
    c.nsqcon.RunLookupd(lookAddr, 1)
}
//export getMessage
func getMessage(ch unsafe.Pointer, data *unsafe.Pointer, size *C.size_t) bool {
    c := ccvt(ch)
    c.lck.Lock()
    defer c.lck.Unlock()
    if len(c.msgs) == 0 {
        return false
    }
    msg := c.msgs[0]
    c.msgs = c.msgs[1:]
    *size = C.size_t(len(msg.Body))
    ptr := C.malloc(*size)
    C.memcpy(ptr, unsafe.Pointer(&msg.Body[0]), *size)
    *data = ptr
    return true
}
//export relMessage
func relMessage(msg unsafe.Pointer) {
    if msg != nil {
        C.free(msg)
    }
}
func main() {
    test.Test()
}
make.sh
New file
@@ -0,0 +1,10 @@
#!/bin/bash
if [ ! -d "clib" ]; then
    mkdir clib
fi
go tool cgo -exportheader clib/libnsqclient.h main.go &&
go build -buildmode=c-shared -o ./clib/libnsqclient.so &&
rm -fr _obj &&
go build &&
g++ -std=c++11 -g -O0 -o cnsqcli TST/ctest/ctest.cpp -I. -Lclib/ -lnsqclient -ldl -pthread
nsqclient/LICENSE
New file
@@ -0,0 +1,20 @@
The MIT License (MIT)
Copyright (c) 2013 Fatih Arslan
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
nsqclient/README.md
New file
@@ -0,0 +1,54 @@
# NSQPool
NSQPool is a thread safe connection pool for nsq producer. It can be used to
manage and reuse nsq producer connection.
## Install and Usage
Install the package with:
```bash
github.com/qgymje/nsqpool
```
Import it with:
```go
import (
    "github.com/qgymje/nsqpool"
    nsq "github.com/nsqio/go-nsq"
)
```
and use `pool` as the package name inside the code.
## Example
```go
// create a factory() to be used with channel based pool
factory := func() (*nsq.Producer, error) {
    config := nsq.NewConfig()
    return nsq.NewProducer(":4150", config)
}
nsqPool, err := pool.NewChannelPool(5, 30, factory)
producer, err := nsqPool.Get()
producer.Publish("topic", "some data")
// do something with producer and put it back to the pool by closing the connection
// (this doesn't close the underlying connection instead it's putting it back
// to the pool).
producer.Close()
// close pool any time you want, this closes all the connections inside a pool
nsqPool.Close()
// currently available connections in the pool
current := nsqPool.Len()
```
## License
The MIT License (MIT) - see LICENSE for more details
nsqclient/channel.go
New file
@@ -0,0 +1,134 @@
package nsqclient
import (
    "errors"
    "fmt"
    "sync"
    nsq "github.com/nsqio/go-nsq"
)
// channelPool implements the Pool interface based on buffered channels.
type channelPool struct {
    // storage for our net.Conn connections
    mu    sync.Mutex
    conns chan *nsq.Producer
    // net.Conn generator
    factory Factory
}
// Factory is a function to create new connections.
type Factory func() (*nsq.Producer, error)
// NewChannelPool returns a new pool based on buffered channels with an initial
// capacity and maximum capacity. Factory is used when initial capacity is
// greater than zero to fill the pool. A zero initialCap doesn't fill the Pool
// until a new Get() is called. During a Get(), If there is no new connection
// available in the pool, a new connection will be created via the Factory()
// method.
func NewChannelPool(initialCap, maxCap int, factory Factory) (Pool, error) {
    if initialCap < 0 || maxCap <= 0 || initialCap > maxCap {
        return nil, errors.New("invalid capacity settings")
    }
    c := &channelPool{
        conns:   make(chan *nsq.Producer, maxCap),
        factory: factory,
    }
    // create initial connections, if something goes wrong,
    // just close the pool error out.
    for i := 0; i < initialCap; i++ {
        conn, err := factory()
        if err != nil {
            c.Close()
            return nil, fmt.Errorf("factory is not able to fill the pool: %s", err)
        }
        c.conns <- conn
    }
    return c, nil
}
func (c *channelPool) getConns() chan *nsq.Producer {
    c.mu.Lock()
    conns := c.conns
    c.mu.Unlock()
    return conns
}
// Get implements the Pool interfaces Get() method. If there is no new
// connection available in the pool, a new connection will be created via the
// Factory() method.
func (c *channelPool) Get() (*PoolConn, error) {
    conns := c.getConns()
    if conns == nil {
        return nil, ErrClosed
    }
    // wrap our connections with out custom net.Conn implementation (wrapConn
    // method) that puts the connection back to the pool if it's closed.
    select {
    case conn := <-conns:
        if conn == nil {
            return nil, ErrClosed
        }
        return c.wrapConn(conn), nil
    default:
        conn, err := c.factory()
        if err != nil {
            return nil, err
        }
        return c.wrapConn(conn), nil
    }
}
// put puts the connection back to the pool. If the pool is full or closed,
// conn is simply closed. A nil conn will be rejected.
func (c *channelPool) put(conn *nsq.Producer) error {
    if conn == nil {
        return errors.New("connection is nil. rejecting")
    }
    c.mu.Lock()
    defer c.mu.Unlock()
    if c.conns == nil {
        // pool is closed, close passed connection
        conn.Stop()
        return nil
    }
    // put the resource back into the pool. If the pool is full, this will
    // block and the default case will be executed.
    select {
    case c.conns <- conn:
        return nil
    default:
        // pool is full, close passed connection
        conn.Stop()
        return nil
    }
}
func (c *channelPool) Close() {
    c.mu.Lock()
    conns := c.conns
    c.conns = nil
    c.factory = nil
    c.mu.Unlock()
    if conns == nil {
        return
    }
    close(conns)
    for conn := range conns {
        conn.Stop()
    }
}
func (c *channelPool) Len() int { return len(c.getConns()) }
nsqclient/channel_test.go
New file
@@ -0,0 +1,218 @@
package nsqclient
import (
    "math/rand"
    "sync"
    "testing"
    "time"
    "github.com/nsqio/go-nsq"
)
var (
    InitialCap = 2
    MaximumCap = 10
    factory    = func() (*nsq.Producer, error) {
        config := nsq.NewConfig()
        return nsq.NewProducer(":4160", config)
    }
)
func newChannelPool() (Pool, error) {
    return NewChannelPool(InitialCap, MaximumCap, factory)
}
func TestNew(t *testing.T) {
    _, err := newChannelPool()
    if err != nil {
        t.Errorf("New error: %s", err)
    }
}
func TestPool_Get(t *testing.T) {
    p, _ := newChannelPool()
    defer p.Close()
    _, err := p.Get()
    if err != nil {
        t.Errorf("Get error: %s", err)
    }
    // after one get, current capacity should be lowered by one.
    if p.Len() != (InitialCap - 1) {
        t.Errorf("Get error. Expecting %d, got %d",
            (InitialCap - 1), p.Len())
    }
    // get them all
    var wg sync.WaitGroup
    for i := 0; i < (InitialCap - 1); i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            _, err := p.Get()
            if err != nil {
                t.Errorf("Get error: %s", err)
            }
        }()
    }
    wg.Wait()
    if p.Len() != 0 {
        t.Errorf("Get error. Expecting %d, got %d",
            (InitialCap - 1), p.Len())
    }
    _, err = p.Get()
    if err != nil {
        t.Errorf("Get error: %s", err)
    }
}
func TestPool_Put(t *testing.T) {
    p, err := NewChannelPool(0, 30, factory)
    if err != nil {
        t.Fatal(err)
    }
    defer p.Close()
    // get/create from the pool
    conns := make([]*PoolConn, MaximumCap)
    for i := 0; i < MaximumCap; i++ {
        conn, _ := p.Get()
        conns[i] = conn
    }
    // now put them all back
    for _, conn := range conns {
        conn.Close()
    }
    if p.Len() != MaximumCap {
        t.Errorf("Put error len. Expecting %d, got %d",
            1, p.Len())
    }
    conn, _ := p.Get()
    p.Close() // close pool
    conn.Close() // try to put into a full pool
    if p.Len() != 0 {
        t.Errorf("Put error. Closed pool shouldn't allow to put connections.")
    }
}
func TestPool_PutUnusableConn(t *testing.T) {
    p, _ := newChannelPool()
    defer p.Close()
    // ensure pool is not empty
    conn, _ := p.Get()
    conn.Close()
    poolSize := p.Len()
    conn, _ = p.Get()
    conn.Close()
    if p.Len() != poolSize {
        t.Errorf("Pool size is expected to be equal to initial size")
    }
    conn, _ = p.Get()
    conn.MarkUnusable()
    conn.Close()
    if p.Len() != poolSize-1 {
        t.Errorf("Pool size is expected to be initial_size - 1 [%d:%d]", p.Len(), poolSize-1)
    }
}
func TestPool_UsedCapacity(t *testing.T) {
    p, _ := newChannelPool()
    defer p.Close()
    if p.Len() != InitialCap {
        t.Errorf("InitialCap error. Expecting %d, got %d",
            InitialCap, p.Len())
    }
}
func TestPool_Close(t *testing.T) {
    p, _ := newChannelPool()
    // now close it and test all cases we are expecting.
    p.Close()
    c := p.(*channelPool)
    if c.conns != nil {
        t.Errorf("Close error, conns channel should be nil")
    }
    if c.factory != nil {
        t.Errorf("Close error, factory should be nil")
    }
    _, err := p.Get()
    if err == nil {
        t.Errorf("Close error, get conn should return an error")
    }
    if p.Len() != 0 {
        t.Errorf("Close error used capacity. Expecting 0, got %d", p.Len())
    }
}
func TestPoolConcurrent(t *testing.T) {
    p, _ := newChannelPool()
    pipe := make(chan *PoolConn, 0)
    go func() {
        p.Close()
    }()
    for i := 0; i < MaximumCap; i++ {
        go func() {
            conn, _ := p.Get()
            pipe <- conn
        }()
        go func() {
            conn := <-pipe
            if conn == nil {
                return
            }
            conn.Close()
        }()
    }
}
func TestPoolConcurrent2(t *testing.T) {
    p, _ := NewChannelPool(0, 30, factory)
    var wg sync.WaitGroup
    go func() {
        for i := 0; i < 10; i++ {
            wg.Add(1)
            go func(i int) {
                conn, _ := p.Get()
                time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
                conn.Close()
                wg.Done()
            }(i)
        }
    }()
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(i int) {
            conn, _ := p.Get()
            time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
            conn.Close()
            wg.Done()
        }(i)
    }
    wg.Wait()
}
nsqclient/conn.go
New file
@@ -0,0 +1,45 @@
package nsqclient
import (
    "sync"
    nsq "github.com/nsqio/go-nsq"
)
// PoolConn is a wrapper around net.Conn to modify the the behavior of
// net.Conn's Close() method.
type PoolConn struct {
    *nsq.Producer
    mu       sync.RWMutex
    c        *channelPool
    unusable bool
}
// Close puts the given connects back to the pool instead of closing it.
func (p *PoolConn) Close() error {
    p.mu.RLock()
    defer p.mu.RUnlock()
    if p.unusable {
        if p.Producer != nil {
            p.Producer.Stop()
            return nil
        }
        return nil
    }
    return p.c.put(p.Producer)
}
// MarkUnusable marks the connection not usable any more, to let the pool close it instead of returning it to pool.
func (p *PoolConn) MarkUnusable() {
    p.mu.Lock()
    p.unusable = true
    p.mu.Unlock()
}
// newConn wraps a standard net.Conn to a poolConn net.Conn.
func (c *channelPool) wrapConn(conn *nsq.Producer) *PoolConn {
    p := &PoolConn{c: c}
    p.Producer = conn
    return p
}
nsqclient/consumer.go
New file
@@ -0,0 +1,90 @@
package nsqclient
import (
    "context"
    "fmt"
    "time"
    nsq "github.com/nsqio/go-nsq"
)
type NsqConsumer struct {
    consumer  *nsq.Consumer
    handler   nsq.Handler
    ctx       context.Context
    ctxCancel context.CancelFunc
    topic     string
    channel   string
}
func NewNsqConsumer(ctx context.Context, topic, channel string, options ...func(*nsq.Config)) (*NsqConsumer, error) {
    conf := nsq.NewConfig()
    conf.MaxAttempts = 0
    conf.MsgTimeout = 10 * time.Minute         // 默认一个消息最多能处理十分钟,否则就会重新丢入队列
    conf.LookupdPollInterval = 3 * time.Second // 调整consumer的重连间隔时间为3秒
    for _, option := range options {
        option(conf)
    }
    consumer, err := nsq.NewConsumer(topic, channel, conf)
    if err != nil {
        return nil, err
    }
    return &NsqConsumer{
        consumer: consumer,
        ctx:      ctx,
        topic:    topic,
        channel:  channel,
    }, nil
}
func DestroyNsqConsumer(c *NsqConsumer) {
    if c != nil {
        if c.ctxCancel != nil {
            c.ctxCancel()
        }
    }
}
func (n *NsqConsumer) AddHandler(handler nsq.Handler) {
    n.handler = handler
}
func (n *NsqConsumer) Run(qaddr string, concurrency int) error {
    return n.RunDistributed([]string{qaddr}, nil, concurrency)
}
func (n *NsqConsumer) RunLookupd(lookupAddr string, concurrency int) error {
    return n.RunDistributed(nil, []string{lookupAddr}, concurrency)
}
func (n *NsqConsumer) RunDistributed(qAddr, lAddr []string, concurrency int) error {
    n.consumer.ChangeMaxInFlight(concurrency)
    n.consumer.AddConcurrentHandlers(n.handler, concurrency)
    var err error
    if len(qAddr) > 0 {
        err = n.consumer.ConnectToNSQDs(qAddr)
    } else if len(lAddr) > 0 {
        err = n.consumer.ConnectToNSQLookupds(lAddr)
    } else {
        err = fmt.Errorf("Addr Must NOT Empty")
    }
    if err != nil {
        return err
    }
    if n.ctx == nil {
        n.ctx, n.ctxCancel = context.WithCancel(context.Background())
    }
    for {
        select {
        case <-n.ctx.Done():
            fmt.Println("[%s] %s,%s", "stop consumer", n.topic, n.channel)
            n.consumer.Stop()
            fmt.Println("[%s] %s,%s", "stop consumer success", n.topic, n.channel)
            return nil
        }
    }
}
nsqclient/pointer.go
New file
@@ -0,0 +1,57 @@
package nsqclient
// #include <stdlib.h>
import "C"
import (
    "sync"
    "unsafe"
)
var (
    mutex sync.RWMutex
    store = map[unsafe.Pointer]interface{}{}
)
func Save(v interface{}) unsafe.Pointer {
    if v == nil {
        return nil
    }
    // Generate real fake C pointer.
    // This pointer will not store any data, but will bi used for indexing purposes.
    // Since Go doest allow to cast dangling pointer to unsafe.Pointer, we do rally allocate one byte.
    // Why we need indexing, because Go doest allow C code to store pointers to Go data.
    var ptr unsafe.Pointer = C.malloc(C.size_t(1))
    if ptr == nil {
        panic("can't allocate 'cgo-pointer hack index pointer': ptr == nil")
    }
    mutex.Lock()
    store[ptr] = v
    mutex.Unlock()
    return ptr
}
func Restore(ptr unsafe.Pointer) (v interface{}) {
    if ptr == nil {
        return nil
    }
    mutex.RLock()
    v = store[ptr]
    mutex.RUnlock()
    return
}
func Unref(ptr unsafe.Pointer) {
    if ptr == nil {
        return
    }
    mutex.Lock()
    delete(store, ptr)
    mutex.Unlock()
    C.free(ptr)
}
nsqclient/pool.go
New file
@@ -0,0 +1,25 @@
// Package pool implements a pool of net.Conn interfaces to manage and reuse them.
package nsqclient
import "errors"
var (
    // ErrClosed is the error resulting if the pool is closed via pool.Close().
    ErrClosed = errors.New("pool is closed")
)
// Pool interface describes a pool implementation. A pool should have maximum
// capacity. An ideal pool is threadsafe and easy to use.
type Pool interface {
    // Get returns a new connection from the pool. Closing the connections puts
    // it back to the Pool. Closing it when the pool is destroyed or full will
    // be counted as an error.
    Get() (*PoolConn, error)
    // Close closes the pool and all its connections. After Close() the pool is
    // no longer usable.
    Close()
    // Len returns the current number of connections of the pool.
    Len() int
}
nsqclient/producer.go
New file
@@ -0,0 +1,139 @@
package nsqclient
import (
    "fmt"
    "time"
    nsq "github.com/nsqio/go-nsq"
)
type Producer interface {
    Publish(topic string, body []byte) error
    MultiPublish(topic string, body [][]byte) error
    DeferredPublish(topic string, delay time.Duration, body []byte) error
}
var _ Producer = (*producer)(nil)
type producer struct {
    pool Pool
}
var (
    //                    name   pool producer
    nsqList = make(map[string]Pool)
)
type Config struct {
    Addr     string `toml:"addr" json:"addr"`
    InitSize int    `toml:"init_size" json:"init_size"`
    MaxSize  int    `toml:"max_size" json:"max_size"`
}
func CreateProducerPool(configs map[string]Config) {
    for name, conf := range configs {
        n, err := newProducerPool(conf.Addr, conf.InitSize, conf.MaxSize)
        if err == nil {
            nsqList[name] = n
            // 支持ip:port寻址
            nsqList[conf.Addr] = n
        }
    }
}
func DestroyProducerPool() {
    for _, p := range nsqList {
        p.Close()
    }
}
func GetProducer(key ...string) (*producer, error) {
    k := "default"
    if len(key) > 0 {
        k = key[0]
    }
    if n, ok := nsqList[k]; ok {
        return &producer{n}, nil
    }
    return nil, fmt.Errorf("GetProducer can't get producer")
}
// CreateNSQProducer create nsq producer
func newProducer(addr string, options ...func(*nsq.Config)) (*nsq.Producer, error) {
    cfg := nsq.NewConfig()
    for _, option := range options {
        option(cfg)
    }
    producer, err := nsq.NewProducer(addr, cfg)
    if err != nil {
        return nil, err
    }
    // producer.SetLogger(log.New(os.Stderr, "", log.Flags()), nsq.LogLevelError)
    return producer, nil
}
// CreateNSQProducerPool create a nwq producer pool
func newProducerPool(addr string, initSize, maxSize int, options ...func(*nsq.Config)) (Pool, error) {
    factory := func() (*nsq.Producer, error) {
        // TODO 这里应该执行ping方法来确定连接是正常的否则不应该创建conn
        return newProducer(addr, options...)
    }
    nsqPool, err := NewChannelPool(initSize, maxSize, factory)
    if err != nil {
        return nil, err
    }
    return nsqPool, nil
}
func NewProducer(addr string) (*producer, error) {
    CreateProducerPool(map[string]Config{"default": {addr, 1, 1}})
    return GetProducer()
}
func retry(num int, fn func() error) error {
    var err error
    for i := 0; i < num; i++ {
        err = fn()
        if err == nil {
            break
        }
    }
    return err
}
func (p *producer) Publish(topic string, body []byte) error {
    nsq, err := p.pool.Get()
    if err != nil {
        return err
    }
    defer nsq.Close()
    return retry(2, func() error {
        return nsq.Publish(topic, body)
    })
}
func (p *producer) MultiPublish(topic string, body [][]byte) error {
    nsq, err := p.pool.Get()
    if err != nil {
        return err
    }
    defer nsq.Close()
    return retry(2, func() error {
        return nsq.MultiPublish(topic, body)
    })
}
func (p *producer) DeferredPublish(topic string, delay time.Duration, body []byte) error {
    nsq, err := p.pool.Get()
    if err != nil {
        return err
    }
    defer nsq.Close()
    return retry(2, func() error {
        return nsq.DeferredPublish(topic, delay, body)
    })
}