package mqtt
|
|
import (
|
"fmt"
|
|
MQTT "github.com/eclipse/paho.mqtt.golang"
|
)
|
|
type MQTTClient struct {
|
Broker string
|
ClientId string
|
Username string
|
Passwd string
|
Mqtt MQTT.Client
|
}
|
|
var Client *MQTTClient
|
|
func init() {
|
Client = &MQTTClient{}
|
}
|
|
type mqLog struct {
|
|
}
|
|
func (l *mqLog) Println(v ...interface{}) {
|
fmt.Println(v...)
|
}
|
|
func (l *mqLog) Printf(format string, v ...interface{}) {
|
fmt.Println(fmt.Sprintf(format, v...))
|
}
|
|
func (m *MQTTClient) Init(brokerAddr, clientId, username, password string) {
|
m.Broker = brokerAddr
|
m.ClientId = clientId
|
m.Username = username
|
m.Passwd = password
|
|
opts := MQTT.NewClientOptions().AddBroker(brokerAddr)
|
opts.SetClientID(clientId)
|
opts.SetUsername(username)
|
opts.SetPassword(password)
|
|
opts.SetAutoReconnect(false)
|
hl := mqLog{}
|
MQTT.ERROR = &hl
|
MQTT.CRITICAL= &hl
|
MQTT.WARN = &hl
|
MQTT.DEBUG = &hl
|
|
m.Mqtt = MQTT.NewClient(opts)
|
|
fmt.Println("connectRetry:", opts.ConnectRetry)
|
if token := m.Mqtt.Connect(); token.Wait() && token.Error() != nil {
|
panic(token.Error())
|
// fmt.Println(token.Error())
|
}
|
}
|
|
func (m *MQTTClient) Subscribe(topic string, sub chan<- MQTT.Message) {
|
fmt.Println("start subscribing...")
|
subToken := m.Mqtt.Subscribe(
|
topic,
|
0,
|
func(client MQTT.Client, msg MQTT.Message) {
|
sub <- msg
|
})
|
if subToken.Wait() && subToken.Error() != nil {
|
fmt.Println(subToken.Error())
|
}
|
}
|
|
func (m *MQTTClient) Publish(topic string, msg interface{}) {
|
if m.Mqtt.IsConnected() {
|
token := m.Mqtt.Publish(topic, 0, false, msg)
|
if token.Wait() && token.Error() != nil {
|
fmt.Println(token.Error())
|
}
|
} else {
|
fmt.Println("mqtt client reconneted")
|
if token := m.Mqtt.Connect(); token.Wait() && token.Error() != nil {
|
fmt.Println(token.Error())
|
}
|
}
|
}
|