| | |
| | | "apsClient/constvar" |
| | | "apsClient/pkg/logx" |
| | | "apsClient/pkg/nsqclient" |
| | | "context" |
| | | "fmt" |
| | | ) |
| | | |
| | | func Consume(topic, channel string) (err error) { |
| | | c, err := nsqclient.NewNsqConsumer(context.Background(), topic, channel) |
| | | func NewConsumer(topic, channel string) (c *nsqclient.NsqConsumer, err error) { |
| | | c, err = nsqclient.NewNsqConsumer(topic, channel) |
| | | if err != nil { |
| | | logx.Errorf("NewNsqConsumer err:%v", err) |
| | | return |
| | |
| | | handler = &ProcessParamsSync{Topic: topic} |
| | | case fmt.Sprintf(constvar.NsqTopicDeviceUpdate, conf.Conf.NsqConf.NodeId): |
| | | handler = &DeviceUpdate{Topic: topic} |
| | | case fmt.Sprintf(constvar.NsqTopicPullDataResponse, conf.Conf.NsqConf.NodeId): |
| | | handler = &PullDataResponse{Topic: topic} |
| | | } |
| | | c.AddHandler(handler.HandleMessage) |
| | | |
| | | if len(conf.Conf.NsqConf.NsqlookupdAddr) > 0 { |
| | | if err = c.RunLookupd(conf.Conf.NsqConf.NsqlookupdAddr, 1); err != nil { |
| | | logx.Errorf("RunLookupd err:%v", err) |
| | | return |
| | | } |
| | | } else { |
| | | if err = c.Run(conf.Conf.NsqConf.NsqdAddr, 1); err != nil { |
| | | logx.Errorf("Run err:%v", err) |
| | | return |
| | | } |
| | | } |
| | | return |
| | | } |