package nsqcli
|
|
import (
	"fmt"
	"log"
	"time"

	nsq "github.com/nsqio/go-nsq"
)
|
|
// Producer is the publishing surface offered by this package. The unexported
// producer type in this file is its implementation.
type Producer interface {
	// Publish publishes a single message body to topic.
	Publish(topic string, body []byte) error
	// MultiPublish publishes a batch of message bodies to topic.
	MultiPublish(topic string, body [][]byte) error
	// DeferredPublish publishes body to topic, passing delay through to the
	// underlying deferred-publish call.
	DeferredPublish(topic string, delay time.Duration, body []byte) error
}
|
|
// Compile-time check that *producer satisfies the Producer interface.
var _ Producer = (*producer)(nil)
|
|
type producer struct {
|
pool Pool
|
}
|
|
var (
|
// name pool producer
|
nsqList = make(map[string]Pool)
|
)
|
|
// Config describes one producer pool to be created by CreateProducerPool.
type Config struct {
	// Addr is the address passed to nsq.NewProducer for every connection in
	// the pool; the pool is also registered under this value.
	Addr string `toml:"addr" json:"addr"`
	// InitSize is forwarded to NewChannelPool as the pool's initial size.
	InitSize int `toml:"init_size" json:"init_size"`
	// MaxSize is forwarded to NewChannelPool as the pool's maximum size.
	MaxSize int `toml:"max_size" json:"max_size"`
}
|
|
func CreateProducerPool(configs map[string]Config) {
|
for name, conf := range configs {
|
n, err := newProducerPool(conf.Addr, conf.InitSize, conf.MaxSize)
|
if err == nil {
|
nsqList[name] = n
|
// 支持ip:port寻址
|
nsqList[conf.Addr] = n
|
}
|
}
|
}
|
|
func DestroyProducerPool() {
|
for _, p := range nsqList {
|
p.Close()
|
}
|
}
|
|
func GetProducer(key ...string) (*producer, error) {
|
k := "default"
|
if len(key) > 0 {
|
k = key[0]
|
}
|
if n, ok := nsqList[k]; ok {
|
return &producer{n}, nil
|
}
|
return nil, fmt.Errorf("GetProducer can't get producer")
|
}
|
|
// CreateNSQProducer create nsq producer
|
func newProducer(addr string, options ...func(*nsq.Config)) (*nsq.Producer, error) {
|
cfg := nsq.NewConfig()
|
for _, option := range options {
|
option(cfg)
|
}
|
|
producer, err := nsq.NewProducer(addr, cfg)
|
if err != nil {
|
return nil, err
|
}
|
// producer.SetLogger(log.New(os.Stderr, "", log.Flags()), nsq.LogLevelError)
|
return producer, nil
|
}
|
|
// CreateNSQProducerPool create a nwq producer pool
|
func newProducerPool(addr string, initSize, maxSize int, options ...func(*nsq.Config)) (Pool, error) {
|
factory := func() (*nsq.Producer, error) {
|
// TODO 这里应该执行ping方法来确定连接是正常的否则不应该创建conn
|
return newProducer(addr, options...)
|
}
|
nsqPool, err := NewChannelPool(initSize, maxSize, factory)
|
if err != nil {
|
return nil, err
|
}
|
return nsqPool, nil
|
}
|
|
func NewProducer(addr string) (*producer, error) {
|
CreateProducerPool(map[string]Config{"default": {addr, 1, 1}})
|
return GetProducer()
|
}
|
|
// retry calls fn until it succeeds or num attempts have been made. It returns
// nil as soon as an attempt succeeds, otherwise the error of the last attempt.
//
// fn is always invoked at least once: a num of zero or less is treated as a
// single attempt, so retry never reports success for work that was not tried.
func retry(num int, fn func() error) error {
	if num < 1 {
		num = 1
	}

	var err error
	for attempt := 0; attempt < num; attempt++ {
		if err = fn(); err == nil {
			return nil
		}
	}
	return err
}
|
|
func (p *producer) Publish(topic string, body []byte) error {
|
nsq, err := p.pool.Get()
|
if err != nil {
|
return err
|
}
|
defer nsq.Close()
|
|
return retry(2, func() error {
|
return nsq.Publish(topic, body)
|
})
|
}
|
|
func (p *producer) MultiPublish(topic string, body [][]byte) error {
|
nsq, err := p.pool.Get()
|
if err != nil {
|
return err
|
}
|
defer nsq.Close()
|
|
return retry(2, func() error {
|
return nsq.MultiPublish(topic, body)
|
})
|
}
|
|
func (p *producer) DeferredPublish(topic string, delay time.Duration, body []byte) error {
|
nsq, err := p.pool.Get()
|
if err != nil {
|
return err
|
}
|
defer nsq.Close()
|
|
return retry(2, func() error {
|
return nsq.DeferredPublish(topic, delay, body)
|
})
|
}
|