package nsqclient import ( "fmt" "time" nsq "github.com/nsqio/go-nsq" ) type Producer interface { Publish(topic string, body []byte) error MultiPublish(topic string, body [][]byte) error DeferredPublish(topic string, delay time.Duration, body []byte) error } var _ Producer = (*producer)(nil) type producer struct { pool Pool } var ( // name pool producer nsqList = make(map[string]Pool) ) type Config struct { Addr string `toml:"addr" json:"addr"` InitSize int `toml:"init_size" json:"init_size"` MaxSize int `toml:"max_size" json:"max_size"` } func CreateProducerPool(configs map[string]Config) { for name, conf := range configs { n, err := newProducerPool(conf.Addr, conf.InitSize, conf.MaxSize) if err == nil { nsqList[name] = n // 支持ip:port寻址 nsqList[conf.Addr] = n } } } func DestroyProducerPool() { for _, p := range nsqList { p.Close() } } func GetProducer(key ...string) (*producer, error) { k := "default" if len(key) > 0 { k = key[0] } if n, ok := nsqList[k]; ok { return &producer{n}, nil } return nil, fmt.Errorf("GetProducer can't get producer") } // CreateNSQProducer create nsq producer func newProducer(addr string, options ...func(*nsq.Config)) (*nsq.Producer, error) { cfg := nsq.NewConfig() for _, option := range options { option(cfg) } producer, err := nsq.NewProducer(addr, cfg) if err != nil { return nil, err } // producer.SetLogger(log.New(os.Stderr, "", log.Flags()), nsq.LogLevelError) return producer, nil } // CreateNSQProducerPool create a nwq producer pool func newProducerPool(addr string, initSize, maxSize int, options ...func(*nsq.Config)) (Pool, error) { factory := func() (*nsq.Producer, error) { // TODO 这里应该执行ping方法来确定连接是正常的否则不应该创建conn return newProducer(addr, options...) } nsqPool, err := NewChannelPool(initSize, maxSize, factory) if err != nil { return nil, err } return nsqPool, nil } func NewProducer(addr string) (*producer, error) { CreateProducerPool(map[string]Config{"default": {addr, 1, 1}}) return GetProducer() } func retry(num int, fn func() error) error { var err error for i := 0; i < num; i++ { err = fn() if err == nil { break } } return err } func (p *producer) Publish(topic string, body []byte) error { nsq, err := p.pool.Get() if err != nil { return err } defer nsq.Close() return retry(2, func() error { return nsq.Publish(topic, body) }) } func (p *producer) MultiPublish(topic string, body [][]byte) error { nsq, err := p.pool.Get() if err != nil { return err } defer nsq.Close() return retry(2, func() error { return nsq.MultiPublish(topic, body) }) } func (p *producer) DeferredPublish(topic string, delay time.Duration, body []byte) error { nsq, err := p.pool.Get() if err != nil { return err } defer nsq.Close() return retry(2, func() error { return nsq.DeferredPublish(topic, delay, body) }) }