| | |
| | | "apsClient/conf" |
| | | "apsClient/constvar" |
| | | "apsClient/pkg/logx" |
| | | "apsClient/pkg/nsqclient" |
| | | "apsClient/pkg/safe" |
| | | "basic.com/aps/nsqclient.git" |
| | | "context" |
| | | "errors" |
| | | "fmt" |
| | |
| | | } |
| | | |
| | | func (c *consumerManager) AddConsumer(topic string) { |
| | | client, err := NewConsumer(topic, conf.Conf.System.DeviceId) |
| | | if err != nil { |
| | | logx.Errorf("start nsq consume err: %v", err) |
| | | } |
| | | c.clients.Store(topic, client) |
| | | safe.Go(func() { |
| | | client, err := NewConsumer(topic, conf.Conf.System.DeviceId) |
| | | if err != nil { |
| | | logx.Errorf("start nsq consume err: %v", err) |
| | | } |
| | | c.clients.Store(topic, client) |
| | | if len(conf.Conf.NsqConf.NsqlookupdAddr) > 0 { |
| | | if err = client.RunLookupd(conf.Conf.NsqConf.NsqlookupdAddr, 1); err != nil { |
| | | logx.Errorf("RunLookupd err:%v", err) |
| | |
| | | return |
| | | } |
| | | } |
| | | logx.Infof("add consumer success, topic:%v", topic) |
| | | }) |
| | | logx.Infof("add consumer success, topic:%v", topic) |
| | | } |
| | | |
| | | func (c *consumerManager) stop() { |