zhangmeng
2023-05-16 1b4fc8c66026c77abf734126b4e7662e386ec0f5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package nsqcli
 
import (
    "fmt"
    "time"
 
    nsq "github.com/nsqio/go-nsq"
)
 
type Producer interface {
    Publish(topic string, body []byte) error
    MultiPublish(topic string, body [][]byte) error
    DeferredPublish(topic string, delay time.Duration, body []byte) error
}
 
var _ Producer = (*producer)(nil)
 
type producer struct {
    pool Pool
}
 
var (
    //                    name   pool producer
    nsqList = make(map[string]Pool)
)
 
type Config struct {
    Addr     string `toml:"addr" json:"addr"`
    InitSize int    `toml:"init_size" json:"init_size"`
    MaxSize  int    `toml:"max_size" json:"max_size"`
}
 
func CreateProducerPool(configs map[string]Config) {
    for name, conf := range configs {
        n, err := newProducerPool(conf.Addr, conf.InitSize, conf.MaxSize)
        if err == nil {
            nsqList[name] = n
            // 支持ip:port寻址
            nsqList[conf.Addr] = n
        }
    }
}
 
func DestroyProducerPool() {
    for _, p := range nsqList {
        p.Close()
    }
}
 
func GetProducer(key ...string) (*producer, error) {
    k := "default"
    if len(key) > 0 {
        k = key[0]
    }
    if n, ok := nsqList[k]; ok {
        return &producer{n}, nil
    }
    return nil, fmt.Errorf("GetProducer can't get producer")
}
 
// CreateNSQProducer create nsq producer
func newProducer(addr string, options ...func(*nsq.Config)) (*nsq.Producer, error) {
    cfg := nsq.NewConfig()
    for _, option := range options {
        option(cfg)
    }
 
    producer, err := nsq.NewProducer(addr, cfg)
    if err != nil {
        return nil, err
    }
    // producer.SetLogger(log.New(os.Stderr, "", log.Flags()), nsq.LogLevelError)
    return producer, nil
}
 
// CreateNSQProducerPool create a nwq producer pool
func newProducerPool(addr string, initSize, maxSize int, options ...func(*nsq.Config)) (Pool, error) {
    factory := func() (*nsq.Producer, error) {
        // TODO 这里应该执行ping方法来确定连接是正常的否则不应该创建conn
        return newProducer(addr, options...)
    }
    nsqPool, err := NewChannelPool(initSize, maxSize, factory)
    if err != nil {
        return nil, err
    }
    return nsqPool, nil
}
 
func NewProducer(addr string) (*producer, error) {
    CreateProducerPool(map[string]Config{"default": {addr, 1, 1}})
    return GetProducer()
}
 
func retry(num int, fn func() error) error {
    var err error
    for i := 0; i < num; i++ {
        err = fn()
        if err == nil {
            break
        }
    }
    return err
}
 
func (p *producer) Publish(topic string, body []byte) error {
    nsq, err := p.pool.Get()
    if err != nil {
        return err
    }
    defer nsq.Close()
 
    return retry(2, func() error {
        return nsq.Publish(topic, body)
    })
}
 
func (p *producer) MultiPublish(topic string, body [][]byte) error {
    nsq, err := p.pool.Get()
    if err != nil {
        return err
    }
    defer nsq.Close()
 
    return retry(2, func() error {
        return nsq.MultiPublish(topic, body)
    })
}
 
func (p *producer) DeferredPublish(topic string, delay time.Duration, body []byte) error {
    nsq, err := p.pool.Get()
    if err != nil {
        return err
    }
    defer nsq.Close()
 
    return retry(2, func() error {
        return nsq.DeferredPublish(topic, delay, body)
    })
}