nsqclient producer consumer
| | |
| | | |
| | | *.exe |
| | | *.test |
| | | |
| | | nsqCli |
| | | cnsqcli |
New file |
| | |
| | | #include <stdio.h> |
| | | #include <string.h> |
| | | #include "clib/libnsqclient.h" |
| | | |
| | | #include <string> |
| | | #include <thread> |
| | | #include <mutex> |
| | | |
| | | using namespace std; |
| | | |
| | | static void produce(int two){ |
| | | char ip[] = "192.168.20.108:4150"; |
| | | GoString addr = {ip, (ptrdiff_t)strlen(ip)}; |
| | | void* p = createProducer(addr); |
| | | |
| | | string msg("cnsqclient dynamic library"); |
| | | while(msg.size() < 32){ |
| | | msg += msg; |
| | | } |
| | | // printf("msg %s\n", msg.c_str()); |
| | | |
| | | for(int i = 0; i < 1000000; i++){ |
| | | GoString topic = {"test", 4}; |
| | | string amsg = msg + "-x"; |
| | | GoSlice data{(void*)amsg.data(), (GoInt)amsg.size(), (GoInt)amsg.size()}; |
| | | if (!publish(p, topic, data)){ |
| | | printf("publish msg failed topic %s\n", topic.p); |
| | | exit(0); |
| | | } |
| | | |
| | | if (two){ |
| | | topic.p = "test2"; |
| | | topic.n = 5; |
| | | amsg = msg + "-y"; |
| | | data.data = (void*)amsg.data(); |
| | | if (!publish(p, topic, data)){ |
| | | printf("publish msg failed topic %s\n", topic.p); |
| | | exit(0); |
| | | } |
| | | } |
| | | |
| | | } |
| | | destroyProducer(p); |
| | | } |
| | | |
| | | |
| | | static void consume(const char* topic, const char* channel){ |
| | | GoString t = {topic, (ptrdiff_t)strlen(topic)}; |
| | | GoString c = {channel, (ptrdiff_t)strlen(channel)}; |
| | | |
| | | void* con = createConsumer(t, c); |
| | | |
| | | char ip[] = "192.168.20.108:4150"; |
| | | GoString addr = {ip, (ptrdiff_t)strlen(ip)}; |
| | | |
| | | // thread |
| | | thread([&con,&addr]{ |
| | | Run(con, addr); |
| | | }).detach(); |
| | | |
| | | auto start = chrono::steady_clock::now(); |
| | | int count = 0; |
| | | while (true) { |
| | | void* msg = NULL; |
| | | size_t size = 0; |
| | | GoUint8 ok = getMessage(con, &msg, &size); |
| | | if (!ok){ |
| | | this_thread::sleep_for(chrono::milliseconds(100)); |
| | | continue; |
| | | } |
| | | count++; |
| | | printf("======>> recv msg %s size %d\n", (char*)msg, count); |
| | | relMessage(msg); |
| | | if (count > 999000){ |
| | | printf("======>> use time %d\n", |
| | | chrono::duration_cast<chrono::seconds>(chrono::steady_clock::now()-start).count()); |
| | | } |
| | | } |
| | | printf("======>> recv all msg size %d\n", count); |
| | | } |
| | | |
| | | int main(int argc, char const *argv[]) |
| | | { |
| | | thread([]{ |
| | | produce(false); |
| | | }).detach(); |
| | | |
| | | // thread([]{ |
| | | // consume("test2", "sensor01"); |
| | | // }).detach(); |
| | | |
| | | consume("test", "sensor01"); |
| | | |
| | | return 0; |
| | | } |
New file |
| | |
| | | package test |
| | | |
| | | import ( |
| | | "context" |
| | | "fmt" |
| | | "log" |
| | | "nsqCli/nsqclient" |
| | | "time" |
| | | |
| | | "github.com/nsqio/go-nsq" |
| | | ) |
| | | |
| | | func produce(two bool) { |
| | | p, _ := nsqclient.NewProducer("192.168.20.108:4150") |
| | | |
| | | var str string |
| | | for len(str) < 32 { |
| | | str += "cnsqclient dynamic library" |
| | | } |
| | | msgx := []byte(str + "--x") |
| | | msgy := []byte(str + "--y") |
| | | |
| | | // count := 0 |
| | | for i := 0; i < 1000000; i++ { |
| | | // if e := p.Publish("test", []byte("x")); e != nil { |
| | | if e := p.Publish("test", msgx); e != nil { |
| | | log.Fatal("Publish error:" + e.Error()) |
| | | } |
| | | |
| | | if two { |
| | | // if e := p.Publish("test", []byte("y")); e != nil { |
| | | if e := p.Publish("test2", msgy); e != nil { |
| | | log.Fatal("Publish error:" + e.Error()) |
| | | } |
| | | } |
| | | |
| | | // log.Println("send time ", count) |
| | | // count++ |
| | | } |
| | | } |
| | | |
| | | func consume(topic, channel string) { |
| | | ctx, cancel := context.WithCancel(context.Background()) |
| | | |
| | | if c, e := nsqclient.NewNsqConsumer(ctx, topic, channel); e != nil { |
| | | fmt.Println("NewNsqConsumer failed", e) |
| | | return |
| | | } else { |
| | | ch := make(chan struct{}) |
| | | |
| | | count := 0 |
| | | c.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error { |
| | | count++ |
| | | fmt.Println("recv msg ", string(msg.Body), " size", count) |
| | | if count > 999000 { |
| | | ch <- struct{}{} |
| | | } |
| | | return nil |
| | | })) |
| | | // go c.Run("192.168.20.108:4150", 2) |
| | | go c.RunLookupd("192.168.20.108:4161", 2) |
| | | |
| | | t := time.Now() |
| | | <-ch |
| | | // fmt.Println("======>> use time ", time.Since(t)) |
| | | fmt.Println("======>> use time ", time.Now().Unix()-t.Unix()) |
| | | cancel() |
| | | } |
| | | } |
| | | |
| | | func Test() { |
| | | go produce(false) |
| | | |
| | | // go consume("test2", "sensor01") |
| | | consume("test", "sensor01") |
| | | } |
New file |
| | |
| | | /* Code generated by cmd/cgo; DO NOT EDIT. */ |
| | | |
| | | /* package nsqCli */ |
| | | |
| | | |
| | | #line 1 "cgo-builtin-export-prolog" |
| | | |
| | | #include <stddef.h> /* for ptrdiff_t below */ |
| | | |
| | | #ifndef GO_CGO_EXPORT_PROLOGUE_H |
| | | #define GO_CGO_EXPORT_PROLOGUE_H |
| | | |
| | | #ifndef GO_CGO_GOSTRING_TYPEDEF |
| | | typedef struct { const char *p; ptrdiff_t n; } _GoString_; |
| | | #endif |
| | | |
| | | #endif |
| | | |
| | | /* Start of preamble from import "C" comments. */ |
| | | |
| | | |
| | | #line 3 "main.go" |
| | | #include <stdlib.h> |
| | | #include <string.h> |
| | | |
| | | #line 1 "cgo-generated-wrapper" |
| | | |
| | | |
| | | /* End of preamble from import "C" comments. */ |
| | | |
| | | |
| | | /* Start of boilerplate cgo prologue. */ |
| | | #line 1 "cgo-gcc-export-header-prolog" |
| | | |
| | | #ifndef GO_CGO_PROLOGUE_H |
| | | #define GO_CGO_PROLOGUE_H |
| | | |
| | | typedef signed char GoInt8; |
| | | typedef unsigned char GoUint8; |
| | | typedef short GoInt16; |
| | | typedef unsigned short GoUint16; |
| | | typedef int GoInt32; |
| | | typedef unsigned int GoUint32; |
| | | typedef long long GoInt64; |
| | | typedef unsigned long long GoUint64; |
| | | typedef GoInt64 GoInt; |
| | | typedef GoUint64 GoUint; |
| | | typedef __SIZE_TYPE__ GoUintptr; |
| | | typedef float GoFloat32; |
| | | typedef double GoFloat64; |
| | | typedef float _Complex GoComplex64; |
| | | typedef double _Complex GoComplex128; |
| | | |
| | | /* |
| | | static assertion to make sure the file is being used on architecture |
| | | at least with matching size of GoInt. |
| | | */ |
| | | typedef char _check_for_64_bit_pointer_matching_GoInt[sizeof(void*)==64/8 ? 1:-1]; |
| | | |
| | | #ifndef GO_CGO_GOSTRING_TYPEDEF |
| | | typedef _GoString_ GoString; |
| | | #endif |
| | | typedef void *GoMap; |
| | | typedef void *GoChan; |
| | | typedef struct { void *t; void *v; } GoInterface; |
| | | typedef struct { void *data; GoInt len; GoInt cap; } GoSlice; |
| | | |
| | | #endif |
| | | |
| | | /* End of boilerplate cgo prologue. */ |
| | | |
| | | #ifdef __cplusplus |
| | | extern "C" { |
| | | #endif |
| | | |
| | | |
| | | extern void* createProducer(GoString p0); |
| | | |
| | | extern void destroyProducer(void* p0); |
| | | |
| | | extern GoUint8 publish(void* p0, GoString p1, GoSlice p2); |
| | | |
| | | extern GoUint8 multiPublish(void* p0, GoString p1, GoSlice p2); |
| | | |
| | | extern GoUint8 deferredPublish(void* p0, GoString p1, GoInt p2, GoSlice p3); |
| | | |
| | | extern void* createConsumer(GoString p0, GoString p1); |
| | | |
| | | extern void destroyConsumer(void* p0); |
| | | |
| | | extern void Run(void* p0, GoString p1); |
| | | |
| | | extern void RunLookupd(void* p0, GoString p1); |
| | | |
| | | extern GoUint8 getMessage(void* p0, void** p1, size_t* p2); |
| | | |
| | | extern void relMessage(void* p0); |
| | | |
| | | #ifdef __cplusplus |
| | | } |
| | | #endif |
New file |
| | |
| | | module nsqCli |
| | | |
| | | go 1.14 |
| | | |
| | | require github.com/nsqio/go-nsq v1.1.0 |
New file |
| | |
| | | github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= |
| | | github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= |
| | | github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE= |
| | | github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY= |
New file |
| | |
| | | package main |
| | | |
| | | // #include <stdlib.h> |
| | | // #include <string.h> |
| | | import "C" |
| | | |
| | | import ( |
| | | "nsqCli/TST/test" |
| | | "nsqCli/nsqclient" |
| | | "sync" |
| | | "time" |
| | | "unsafe" |
| | | |
| | | "github.com/nsqio/go-nsq" |
| | | ) |
| | | |
| | | //export createProducer |
| | | func createProducer(addr string) unsafe.Pointer { |
| | | n, _ := nsqclient.NewProducer(addr) |
| | | return nsqclient.Save(n) |
| | | } |
| | | |
| | | //export destroyProducer |
| | | func destroyProducer(ph unsafe.Pointer) { |
| | | nsqclient.Unref(ph) |
| | | nsqclient.DestroyProducerPool() |
| | | } |
| | | |
| | | func pcvt(ph unsafe.Pointer) nsqclient.Producer { |
| | | return nsqclient.Restore(ph).(nsqclient.Producer) |
| | | } |
| | | |
| | | //export publish |
| | | func publish(ph unsafe.Pointer, topic string, msg []byte) bool { |
| | | p := pcvt(ph) |
| | | if err := p.Publish(topic, msg); err != nil { |
| | | return false |
| | | } |
| | | return true |
| | | } |
| | | |
| | | //export multiPublish |
| | | func multiPublish(ph unsafe.Pointer, topic string, msg [][]byte) bool { |
| | | p := pcvt(ph) |
| | | if err := p.MultiPublish(topic, msg); err != nil { |
| | | return false |
| | | } |
| | | return true |
| | | } |
| | | |
| | | //export deferredPublish |
| | | func deferredPublish(ph unsafe.Pointer, topic string, ms int, msg []byte) bool { |
| | | p := pcvt(ph) |
| | | if err := p.DeferredPublish(topic, time.Duration(ms)*time.Millisecond, msg); err != nil { |
| | | return false |
| | | } |
| | | return true |
| | | } |
| | | |
| | | ///////////////////////////////////////////////////////////// |
| | | |
| | | type consumer struct { |
| | | nsqcon *nsqclient.NsqConsumer |
| | | lck sync.Mutex |
| | | msgs []*nsq.Message |
| | | } |
| | | |
| | | //export createConsumer |
| | | func createConsumer(topic, channel string) unsafe.Pointer { |
| | | if c, err := nsqclient.NewNsqConsumer(nil, topic, channel); err == nil { |
| | | con := &consumer{ |
| | | nsqcon: c, |
| | | } |
| | | return nsqclient.Save(con) |
| | | } |
| | | return nil |
| | | } |
| | | |
| | | func ccvt(ch unsafe.Pointer) *consumer { |
| | | return nsqclient.Restore(ch).(*consumer) |
| | | } |
| | | |
| | | //export destroyConsumer |
| | | func destroyConsumer(ch unsafe.Pointer) { |
| | | nsqclient.DestroyNsqConsumer(ccvt(ch).nsqcon) |
| | | nsqclient.Unref(ch) |
| | | } |
| | | |
| | | //export Run |
| | | func Run(ch unsafe.Pointer, addr string) { |
| | | c := ccvt(ch) |
| | | c.nsqcon.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error { |
| | | c.lck.Lock() |
| | | defer c.lck.Unlock() |
| | | c.msgs = append(c.msgs, msg) |
| | | return nil |
| | | })) |
| | | |
| | | c.nsqcon.Run(addr, 1) |
| | | } |
| | | |
| | | //export RunLookupd |
| | | func RunLookupd(ch unsafe.Pointer, lookAddr string) { |
| | | c := ccvt(ch) |
| | | c.nsqcon.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error { |
| | | c.lck.Lock() |
| | | defer c.lck.Unlock() |
| | | c.msgs = append(c.msgs, msg) |
| | | return nil |
| | | })) |
| | | |
| | | c.nsqcon.RunLookupd(lookAddr, 1) |
| | | } |
| | | |
| | | //export getMessage |
| | | func getMessage(ch unsafe.Pointer, data *unsafe.Pointer, size *C.size_t) bool { |
| | | c := ccvt(ch) |
| | | c.lck.Lock() |
| | | defer c.lck.Unlock() |
| | | if len(c.msgs) == 0 { |
| | | return false |
| | | } |
| | | |
| | | msg := c.msgs[0] |
| | | c.msgs = c.msgs[1:] |
| | | |
| | | *size = C.size_t(len(msg.Body)) |
| | | ptr := C.malloc(*size) |
| | | C.memcpy(ptr, unsafe.Pointer(&msg.Body[0]), *size) |
| | | *data = ptr |
| | | |
| | | return true |
| | | } |
| | | |
| | | //export relMessage |
| | | func relMessage(msg unsafe.Pointer) { |
| | | if msg != nil { |
| | | C.free(msg) |
| | | } |
| | | } |
| | | |
| | | func main() { |
| | | test.Test() |
| | | } |
New file |
| | |
| | | #!/bin/bash |
| | | |
| | | if [ ! -d "clib" ]; then |
| | | mkdir clib |
| | | fi |
| | | go tool cgo -exportheader clib/libnsqclient.h main.go && |
| | | go build -buildmode=c-shared -o ./clib/libnsqclient.so && |
| | | rm -fr _obj && |
| | | go build && |
| | | g++ -std=c++11 -g -O0 -o cnsqcli TST/ctest/ctest.cpp -I. -Lclib/ -lnsqclient -ldl -pthread |
New file |
| | |
| | | The MIT License (MIT) |
| | | |
| | | Copyright (c) 2013 Fatih Arslan |
| | | |
| | | Permission is hereby granted, free of charge, to any person obtaining a copy of |
| | | this software and associated documentation files (the "Software"), to deal in |
| | | the Software without restriction, including without limitation the rights to |
| | | use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of |
| | | the Software, and to permit persons to whom the Software is furnished to do so, |
| | | subject to the following conditions: |
| | | |
| | | The above copyright notice and this permission notice shall be included in all |
| | | copies or substantial portions of the Software. |
| | | |
| | | THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
| | | IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS |
| | | FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR |
| | | COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER |
| | | IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN |
| | | CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. |
New file |
| | |
| | | # NSQPool |
| | | |
| | | NSQPool is a thread safe connection pool for nsq producer. It can be used to |
| | | manage and reuse nsq producer connection. |
| | | |
| | | |
| | | ## Install and Usage |
| | | |
| | | Install the package with: |
| | | |
| | | ```bash |
| | | github.com/qgymje/nsqpool |
| | | ``` |
| | | |
| | | Import it with: |
| | | |
| | | ```go |
| | | import ( |
| | | "github.com/qgymje/nsqpool" |
| | | nsq "github.com/nsqio/go-nsq" |
| | | ) |
| | | ``` |
| | | |
| | | and use `pool` as the package name inside the code. |
| | | |
| | | ## Example |
| | | |
| | | ```go |
| | | // create a factory() to be used with channel based pool |
| | | factory := func() (*nsq.Producer, error) { |
| | | config := nsq.NewConfig() |
| | | return nsq.NewProducer(":4150", config) |
| | | } |
| | | |
| | | nsqPool, err := pool.NewChannelPool(5, 30, factory) |
| | | |
| | | producer, err := nsqPool.Get() |
| | | |
| | | producer.Publish("topic", "some data") |
| | | // do something with producer and put it back to the pool by closing the connection |
| | | // (this doesn't close the underlying connection instead it's putting it back |
| | | // to the pool). |
| | | producer.Close() |
| | | |
| | | // close pool any time you want, this closes all the connections inside a pool |
| | | nsqPool.Close() |
| | | |
| | | // currently available connections in the pool |
| | | current := nsqPool.Len() |
| | | ``` |
| | | |
| | | ## License |
| | | |
| | | The MIT License (MIT) - see LICENSE for more details |
New file |
| | |
| | | package nsqclient |
| | | |
| | | import ( |
| | | "errors" |
| | | "fmt" |
| | | "sync" |
| | | |
| | | nsq "github.com/nsqio/go-nsq" |
| | | ) |
| | | |
| | | // channelPool implements the Pool interface based on buffered channels. |
| | | type channelPool struct { |
| | | // storage for our net.Conn connections |
| | | mu sync.Mutex |
| | | conns chan *nsq.Producer |
| | | |
| | | // net.Conn generator |
| | | factory Factory |
| | | } |
| | | |
| | | // Factory is a function to create new connections. |
| | | type Factory func() (*nsq.Producer, error) |
| | | |
| | | // NewChannelPool returns a new pool based on buffered channels with an initial |
| | | // capacity and maximum capacity. Factory is used when initial capacity is |
| | | // greater than zero to fill the pool. A zero initialCap doesn't fill the Pool |
| | | // until a new Get() is called. During a Get(), If there is no new connection |
| | | // available in the pool, a new connection will be created via the Factory() |
| | | // method. |
| | | func NewChannelPool(initialCap, maxCap int, factory Factory) (Pool, error) { |
| | | if initialCap < 0 || maxCap <= 0 || initialCap > maxCap { |
| | | return nil, errors.New("invalid capacity settings") |
| | | } |
| | | |
| | | c := &channelPool{ |
| | | conns: make(chan *nsq.Producer, maxCap), |
| | | factory: factory, |
| | | } |
| | | |
| | | // create initial connections, if something goes wrong, |
| | | // just close the pool error out. |
| | | for i := 0; i < initialCap; i++ { |
| | | conn, err := factory() |
| | | if err != nil { |
| | | c.Close() |
| | | return nil, fmt.Errorf("factory is not able to fill the pool: %s", err) |
| | | } |
| | | c.conns <- conn |
| | | } |
| | | |
| | | return c, nil |
| | | } |
| | | |
| | | func (c *channelPool) getConns() chan *nsq.Producer { |
| | | c.mu.Lock() |
| | | conns := c.conns |
| | | c.mu.Unlock() |
| | | return conns |
| | | } |
| | | |
| | | // Get implements the Pool interfaces Get() method. If there is no new |
| | | // connection available in the pool, a new connection will be created via the |
| | | // Factory() method. |
| | | func (c *channelPool) Get() (*PoolConn, error) { |
| | | conns := c.getConns() |
| | | if conns == nil { |
| | | return nil, ErrClosed |
| | | } |
| | | |
| | | // wrap our connections with out custom net.Conn implementation (wrapConn |
| | | // method) that puts the connection back to the pool if it's closed. |
| | | select { |
| | | case conn := <-conns: |
| | | if conn == nil { |
| | | return nil, ErrClosed |
| | | } |
| | | |
| | | return c.wrapConn(conn), nil |
| | | default: |
| | | conn, err := c.factory() |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | |
| | | return c.wrapConn(conn), nil |
| | | } |
| | | } |
| | | |
| | | // put puts the connection back to the pool. If the pool is full or closed, |
| | | // conn is simply closed. A nil conn will be rejected. |
| | | func (c *channelPool) put(conn *nsq.Producer) error { |
| | | if conn == nil { |
| | | return errors.New("connection is nil. rejecting") |
| | | } |
| | | |
| | | c.mu.Lock() |
| | | defer c.mu.Unlock() |
| | | |
| | | if c.conns == nil { |
| | | // pool is closed, close passed connection |
| | | conn.Stop() |
| | | return nil |
| | | } |
| | | |
| | | // put the resource back into the pool. If the pool is full, this will |
| | | // block and the default case will be executed. |
| | | select { |
| | | case c.conns <- conn: |
| | | return nil |
| | | default: |
| | | // pool is full, close passed connection |
| | | conn.Stop() |
| | | return nil |
| | | } |
| | | } |
| | | |
| | | func (c *channelPool) Close() { |
| | | c.mu.Lock() |
| | | conns := c.conns |
| | | c.conns = nil |
| | | c.factory = nil |
| | | c.mu.Unlock() |
| | | |
| | | if conns == nil { |
| | | return |
| | | } |
| | | |
| | | close(conns) |
| | | for conn := range conns { |
| | | conn.Stop() |
| | | } |
| | | } |
| | | |
| | | func (c *channelPool) Len() int { return len(c.getConns()) } |
New file |
| | |
| | | package nsqclient |
| | | |
| | | import ( |
| | | "math/rand" |
| | | "sync" |
| | | "testing" |
| | | "time" |
| | | |
| | | "github.com/nsqio/go-nsq" |
| | | ) |
| | | |
| | | var ( |
| | | InitialCap = 2 |
| | | MaximumCap = 10 |
| | | factory = func() (*nsq.Producer, error) { |
| | | config := nsq.NewConfig() |
| | | return nsq.NewProducer(":4160", config) |
| | | } |
| | | ) |
| | | |
| | | func newChannelPool() (Pool, error) { |
| | | return NewChannelPool(InitialCap, MaximumCap, factory) |
| | | } |
| | | |
| | | func TestNew(t *testing.T) { |
| | | _, err := newChannelPool() |
| | | if err != nil { |
| | | t.Errorf("New error: %s", err) |
| | | } |
| | | } |
| | | |
| | | func TestPool_Get(t *testing.T) { |
| | | p, _ := newChannelPool() |
| | | defer p.Close() |
| | | |
| | | _, err := p.Get() |
| | | if err != nil { |
| | | t.Errorf("Get error: %s", err) |
| | | } |
| | | |
| | | // after one get, current capacity should be lowered by one. |
| | | if p.Len() != (InitialCap - 1) { |
| | | t.Errorf("Get error. Expecting %d, got %d", |
| | | (InitialCap - 1), p.Len()) |
| | | } |
| | | |
| | | // get them all |
| | | var wg sync.WaitGroup |
| | | for i := 0; i < (InitialCap - 1); i++ { |
| | | wg.Add(1) |
| | | go func() { |
| | | defer wg.Done() |
| | | _, err := p.Get() |
| | | if err != nil { |
| | | t.Errorf("Get error: %s", err) |
| | | } |
| | | }() |
| | | } |
| | | wg.Wait() |
| | | |
| | | if p.Len() != 0 { |
| | | t.Errorf("Get error. Expecting %d, got %d", |
| | | (InitialCap - 1), p.Len()) |
| | | } |
| | | |
| | | _, err = p.Get() |
| | | if err != nil { |
| | | t.Errorf("Get error: %s", err) |
| | | } |
| | | } |
| | | |
| | | func TestPool_Put(t *testing.T) { |
| | | p, err := NewChannelPool(0, 30, factory) |
| | | if err != nil { |
| | | t.Fatal(err) |
| | | } |
| | | defer p.Close() |
| | | |
| | | // get/create from the pool |
| | | conns := make([]*PoolConn, MaximumCap) |
| | | for i := 0; i < MaximumCap; i++ { |
| | | conn, _ := p.Get() |
| | | conns[i] = conn |
| | | } |
| | | |
| | | // now put them all back |
| | | for _, conn := range conns { |
| | | conn.Close() |
| | | } |
| | | |
| | | if p.Len() != MaximumCap { |
| | | t.Errorf("Put error len. Expecting %d, got %d", |
| | | 1, p.Len()) |
| | | } |
| | | |
| | | conn, _ := p.Get() |
| | | p.Close() // close pool |
| | | |
| | | conn.Close() // try to put into a full pool |
| | | if p.Len() != 0 { |
| | | t.Errorf("Put error. Closed pool shouldn't allow to put connections.") |
| | | } |
| | | } |
| | | |
| | | func TestPool_PutUnusableConn(t *testing.T) { |
| | | p, _ := newChannelPool() |
| | | defer p.Close() |
| | | |
| | | // ensure pool is not empty |
| | | conn, _ := p.Get() |
| | | conn.Close() |
| | | |
| | | poolSize := p.Len() |
| | | conn, _ = p.Get() |
| | | conn.Close() |
| | | if p.Len() != poolSize { |
| | | t.Errorf("Pool size is expected to be equal to initial size") |
| | | } |
| | | |
| | | conn, _ = p.Get() |
| | | conn.MarkUnusable() |
| | | |
| | | conn.Close() |
| | | if p.Len() != poolSize-1 { |
| | | t.Errorf("Pool size is expected to be initial_size - 1 [%d:%d]", p.Len(), poolSize-1) |
| | | } |
| | | } |
| | | |
| | | func TestPool_UsedCapacity(t *testing.T) { |
| | | p, _ := newChannelPool() |
| | | defer p.Close() |
| | | |
| | | if p.Len() != InitialCap { |
| | | t.Errorf("InitialCap error. Expecting %d, got %d", |
| | | InitialCap, p.Len()) |
| | | } |
| | | } |
| | | |
| | | func TestPool_Close(t *testing.T) { |
| | | p, _ := newChannelPool() |
| | | |
| | | // now close it and test all cases we are expecting. |
| | | p.Close() |
| | | |
| | | c := p.(*channelPool) |
| | | |
| | | if c.conns != nil { |
| | | t.Errorf("Close error, conns channel should be nil") |
| | | } |
| | | |
| | | if c.factory != nil { |
| | | t.Errorf("Close error, factory should be nil") |
| | | } |
| | | |
| | | _, err := p.Get() |
| | | if err == nil { |
| | | t.Errorf("Close error, get conn should return an error") |
| | | } |
| | | |
| | | if p.Len() != 0 { |
| | | t.Errorf("Close error used capacity. Expecting 0, got %d", p.Len()) |
| | | } |
| | | } |
| | | |
| | | func TestPoolConcurrent(t *testing.T) { |
| | | p, _ := newChannelPool() |
| | | pipe := make(chan *PoolConn, 0) |
| | | |
| | | go func() { |
| | | p.Close() |
| | | }() |
| | | |
| | | for i := 0; i < MaximumCap; i++ { |
| | | go func() { |
| | | conn, _ := p.Get() |
| | | |
| | | pipe <- conn |
| | | }() |
| | | |
| | | go func() { |
| | | conn := <-pipe |
| | | if conn == nil { |
| | | return |
| | | } |
| | | conn.Close() |
| | | }() |
| | | } |
| | | } |
| | | |
| | | func TestPoolConcurrent2(t *testing.T) { |
| | | p, _ := NewChannelPool(0, 30, factory) |
| | | |
| | | var wg sync.WaitGroup |
| | | |
| | | go func() { |
| | | for i := 0; i < 10; i++ { |
| | | wg.Add(1) |
| | | go func(i int) { |
| | | conn, _ := p.Get() |
| | | time.Sleep(time.Millisecond * time.Duration(rand.Intn(100))) |
| | | conn.Close() |
| | | wg.Done() |
| | | }(i) |
| | | } |
| | | }() |
| | | |
| | | for i := 0; i < 10; i++ { |
| | | wg.Add(1) |
| | | go func(i int) { |
| | | conn, _ := p.Get() |
| | | time.Sleep(time.Millisecond * time.Duration(rand.Intn(100))) |
| | | conn.Close() |
| | | wg.Done() |
| | | }(i) |
| | | } |
| | | |
| | | wg.Wait() |
| | | } |
New file |
| | |
| | | package nsqclient |
| | | |
| | | import ( |
| | | "sync" |
| | | |
| | | nsq "github.com/nsqio/go-nsq" |
| | | ) |
| | | |
| | | // PoolConn is a wrapper around net.Conn to modify the the behavior of |
| | | // net.Conn's Close() method. |
| | | type PoolConn struct { |
| | | *nsq.Producer |
| | | mu sync.RWMutex |
| | | c *channelPool |
| | | unusable bool |
| | | } |
| | | |
| | | // Close puts the given connects back to the pool instead of closing it. |
| | | func (p *PoolConn) Close() error { |
| | | p.mu.RLock() |
| | | defer p.mu.RUnlock() |
| | | |
| | | if p.unusable { |
| | | if p.Producer != nil { |
| | | p.Producer.Stop() |
| | | return nil |
| | | } |
| | | return nil |
| | | } |
| | | return p.c.put(p.Producer) |
| | | } |
| | | |
| | | // MarkUnusable marks the connection not usable any more, to let the pool close it instead of returning it to pool. |
| | | func (p *PoolConn) MarkUnusable() { |
| | | p.mu.Lock() |
| | | p.unusable = true |
| | | p.mu.Unlock() |
| | | } |
| | | |
| | | // newConn wraps a standard net.Conn to a poolConn net.Conn. |
| | | func (c *channelPool) wrapConn(conn *nsq.Producer) *PoolConn { |
| | | p := &PoolConn{c: c} |
| | | p.Producer = conn |
| | | return p |
| | | } |
New file |
| | |
| | | package nsqclient |
| | | |
| | | import ( |
| | | "context" |
| | | "fmt" |
| | | "time" |
| | | |
| | | nsq "github.com/nsqio/go-nsq" |
| | | ) |
| | | |
| | | type NsqConsumer struct { |
| | | consumer *nsq.Consumer |
| | | handler nsq.Handler |
| | | ctx context.Context |
| | | ctxCancel context.CancelFunc |
| | | topic string |
| | | channel string |
| | | } |
| | | |
| | | func NewNsqConsumer(ctx context.Context, topic, channel string, options ...func(*nsq.Config)) (*NsqConsumer, error) { |
| | | conf := nsq.NewConfig() |
| | | conf.MaxAttempts = 0 |
| | | conf.MsgTimeout = 10 * time.Minute // 默认一个消息最多能处理十分钟,否则就会重新丢入队列 |
| | | conf.LookupdPollInterval = 3 * time.Second // 调整consumer的重连间隔时间为3秒 |
| | | for _, option := range options { |
| | | option(conf) |
| | | } |
| | | |
| | | consumer, err := nsq.NewConsumer(topic, channel, conf) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | return &NsqConsumer{ |
| | | consumer: consumer, |
| | | ctx: ctx, |
| | | topic: topic, |
| | | channel: channel, |
| | | }, nil |
| | | } |
| | | |
| | | func DestroyNsqConsumer(c *NsqConsumer) { |
| | | if c != nil { |
| | | if c.ctxCancel != nil { |
| | | c.ctxCancel() |
| | | } |
| | | } |
| | | } |
| | | |
| | | func (n *NsqConsumer) AddHandler(handler nsq.Handler) { |
| | | n.handler = handler |
| | | } |
| | | |
| | | func (n *NsqConsumer) Run(qaddr string, concurrency int) error { |
| | | return n.RunDistributed([]string{qaddr}, nil, concurrency) |
| | | } |
| | | |
| | | func (n *NsqConsumer) RunLookupd(lookupAddr string, concurrency int) error { |
| | | return n.RunDistributed(nil, []string{lookupAddr}, concurrency) |
| | | } |
| | | |
| | | func (n *NsqConsumer) RunDistributed(qAddr, lAddr []string, concurrency int) error { |
| | | n.consumer.ChangeMaxInFlight(concurrency) |
| | | n.consumer.AddConcurrentHandlers(n.handler, concurrency) |
| | | |
| | | var err error |
| | | if len(qAddr) > 0 { |
| | | err = n.consumer.ConnectToNSQDs(qAddr) |
| | | } else if len(lAddr) > 0 { |
| | | err = n.consumer.ConnectToNSQLookupds(lAddr) |
| | | } else { |
| | | err = fmt.Errorf("Addr Must NOT Empty") |
| | | } |
| | | if err != nil { |
| | | return err |
| | | } |
| | | |
| | | if n.ctx == nil { |
| | | n.ctx, n.ctxCancel = context.WithCancel(context.Background()) |
| | | } |
| | | |
| | | for { |
| | | select { |
| | | case <-n.ctx.Done(): |
| | | fmt.Println("[%s] %s,%s", "stop consumer", n.topic, n.channel) |
| | | n.consumer.Stop() |
| | | fmt.Println("[%s] %s,%s", "stop consumer success", n.topic, n.channel) |
| | | return nil |
| | | } |
| | | } |
| | | } |
New file |
| | |
| | | package nsqclient |
| | | |
| | | // #include <stdlib.h> |
| | | import "C" |
| | | import ( |
| | | "sync" |
| | | "unsafe" |
| | | ) |
| | | |
| | | var ( |
| | | mutex sync.RWMutex |
| | | store = map[unsafe.Pointer]interface{}{} |
| | | ) |
| | | |
| | | func Save(v interface{}) unsafe.Pointer { |
| | | if v == nil { |
| | | return nil |
| | | } |
| | | |
| | | // Generate real fake C pointer. |
| | | // This pointer will not store any data, but will bi used for indexing purposes. |
| | | // Since Go doest allow to cast dangling pointer to unsafe.Pointer, we do rally allocate one byte. |
| | | // Why we need indexing, because Go doest allow C code to store pointers to Go data. |
| | | var ptr unsafe.Pointer = C.malloc(C.size_t(1)) |
| | | if ptr == nil { |
| | | panic("can't allocate 'cgo-pointer hack index pointer': ptr == nil") |
| | | } |
| | | |
| | | mutex.Lock() |
| | | store[ptr] = v |
| | | mutex.Unlock() |
| | | |
| | | return ptr |
| | | } |
| | | |
| | | func Restore(ptr unsafe.Pointer) (v interface{}) { |
| | | if ptr == nil { |
| | | return nil |
| | | } |
| | | |
| | | mutex.RLock() |
| | | v = store[ptr] |
| | | mutex.RUnlock() |
| | | return |
| | | } |
| | | |
| | | func Unref(ptr unsafe.Pointer) { |
| | | if ptr == nil { |
| | | return |
| | | } |
| | | |
| | | mutex.Lock() |
| | | delete(store, ptr) |
| | | mutex.Unlock() |
| | | |
| | | C.free(ptr) |
| | | } |
New file |
| | |
| | | // Package pool implements a pool of net.Conn interfaces to manage and reuse them. |
| | | package nsqclient |
| | | |
| | | import "errors" |
| | | |
| | | var ( |
| | | // ErrClosed is the error resulting if the pool is closed via pool.Close(). |
| | | ErrClosed = errors.New("pool is closed") |
| | | ) |
| | | |
| | | // Pool interface describes a pool implementation. A pool should have maximum |
| | | // capacity. An ideal pool is threadsafe and easy to use. |
| | | type Pool interface { |
| | | // Get returns a new connection from the pool. Closing the connections puts |
| | | // it back to the Pool. Closing it when the pool is destroyed or full will |
| | | // be counted as an error. |
| | | Get() (*PoolConn, error) |
| | | |
| | | // Close closes the pool and all its connections. After Close() the pool is |
| | | // no longer usable. |
| | | Close() |
| | | |
| | | // Len returns the current number of connections of the pool. |
| | | Len() int |
| | | } |
New file |
| | |
| | | package nsqclient |
| | | |
| | | import ( |
| | | "fmt" |
| | | "time" |
| | | |
| | | nsq "github.com/nsqio/go-nsq" |
| | | ) |
| | | |
| | | type Producer interface { |
| | | Publish(topic string, body []byte) error |
| | | MultiPublish(topic string, body [][]byte) error |
| | | DeferredPublish(topic string, delay time.Duration, body []byte) error |
| | | } |
| | | |
| | | var _ Producer = (*producer)(nil) |
| | | |
| | | type producer struct { |
| | | pool Pool |
| | | } |
| | | |
| | | var ( |
| | | // name pool producer |
| | | nsqList = make(map[string]Pool) |
| | | ) |
| | | |
| | | type Config struct { |
| | | Addr string `toml:"addr" json:"addr"` |
| | | InitSize int `toml:"init_size" json:"init_size"` |
| | | MaxSize int `toml:"max_size" json:"max_size"` |
| | | } |
| | | |
| | | func CreateProducerPool(configs map[string]Config) { |
| | | for name, conf := range configs { |
| | | n, err := newProducerPool(conf.Addr, conf.InitSize, conf.MaxSize) |
| | | if err == nil { |
| | | nsqList[name] = n |
| | | // 支持ip:port寻址 |
| | | nsqList[conf.Addr] = n |
| | | } |
| | | } |
| | | } |
| | | |
| | | func DestroyProducerPool() { |
| | | for _, p := range nsqList { |
| | | p.Close() |
| | | } |
| | | } |
| | | |
| | | func GetProducer(key ...string) (*producer, error) { |
| | | k := "default" |
| | | if len(key) > 0 { |
| | | k = key[0] |
| | | } |
| | | if n, ok := nsqList[k]; ok { |
| | | return &producer{n}, nil |
| | | } |
| | | return nil, fmt.Errorf("GetProducer can't get producer") |
| | | } |
| | | |
| | | // CreateNSQProducer create nsq producer |
| | | func newProducer(addr string, options ...func(*nsq.Config)) (*nsq.Producer, error) { |
| | | cfg := nsq.NewConfig() |
| | | for _, option := range options { |
| | | option(cfg) |
| | | } |
| | | |
| | | producer, err := nsq.NewProducer(addr, cfg) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | // producer.SetLogger(log.New(os.Stderr, "", log.Flags()), nsq.LogLevelError) |
| | | return producer, nil |
| | | } |
| | | |
| | | // CreateNSQProducerPool create a nwq producer pool |
| | | func newProducerPool(addr string, initSize, maxSize int, options ...func(*nsq.Config)) (Pool, error) { |
| | | factory := func() (*nsq.Producer, error) { |
| | | // TODO 这里应该执行ping方法来确定连接是正常的否则不应该创建conn |
| | | return newProducer(addr, options...) |
| | | } |
| | | nsqPool, err := NewChannelPool(initSize, maxSize, factory) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | return nsqPool, nil |
| | | } |
| | | |
| | | func NewProducer(addr string) (*producer, error) { |
| | | CreateProducerPool(map[string]Config{"default": {addr, 1, 1}}) |
| | | return GetProducer() |
| | | } |
| | | |
| | | func retry(num int, fn func() error) error { |
| | | var err error |
| | | for i := 0; i < num; i++ { |
| | | err = fn() |
| | | if err == nil { |
| | | break |
| | | } |
| | | } |
| | | return err |
| | | } |
| | | |
| | | func (p *producer) Publish(topic string, body []byte) error { |
| | | nsq, err := p.pool.Get() |
| | | if err != nil { |
| | | return err |
| | | } |
| | | defer nsq.Close() |
| | | |
| | | return retry(2, func() error { |
| | | return nsq.Publish(topic, body) |
| | | }) |
| | | } |
| | | |
| | | func (p *producer) MultiPublish(topic string, body [][]byte) error { |
| | | nsq, err := p.pool.Get() |
| | | if err != nil { |
| | | return err |
| | | } |
| | | defer nsq.Close() |
| | | |
| | | return retry(2, func() error { |
| | | return nsq.MultiPublish(topic, body) |
| | | }) |
| | | } |
| | | |
| | | func (p *producer) DeferredPublish(topic string, delay time.Duration, body []byte) error { |
| | | nsq, err := p.pool.Get() |
| | | if err != nil { |
| | | return err |
| | | } |
| | | defer nsq.Close() |
| | | |
| | | return retry(2, func() error { |
| | | return nsq.DeferredPublish(topic, delay, body) |
| | | }) |
| | | } |