// Package mqtt wraps the Eclipse Paho MQTT client behind a small helper type
// and exposes a package-level instance for shared use.
package mqtt

import (
	"fmt"

	MQTT "github.com/eclipse/paho.mqtt.golang"
)

// MQTTClient bundles the connection parameters passed to Init together with
// the underlying Paho client created from them.
type MQTTClient struct {
	Broker   string      // broker address handed to AddBroker
	ClientId string      // MQTT client identifier
	Username string      // user name used when connecting
	Passwd   string      // password used when connecting
	Mqtt     MQTT.Client // underlying Paho client; nil until Init is called
}

// Client is the package-level client instance. It is allocated empty at
// package initialization and must be configured with Init before use.
var Client *MQTTClient

func init() {
	Client = &MQTTClient{}
}

// mqLog routes Paho's log output to standard output. It provides the
// Println/Printf pair that Paho's package-level loggers are assigned to.
type mqLog struct{}

// Println prints its arguments to standard output followed by a newline.
func (l *mqLog) Println(v ...interface{}) {
	fmt.Println(v...)
}

// Printf formats its arguments and prints the result to standard output,
// always followed by a newline.
func (l *mqLog) Printf(format string, v ...interface{}) {
	fmt.Printf(format+"\n", v...)
}

// Init records the connection parameters on m, builds a Paho client from
// them, and connects to the broker, blocking until the attempt finishes.
//
// Init panics if the initial connection fails. As a side effect it points
// Paho's package-level ERROR, CRITICAL, WARN and DEBUG loggers at standard
// output.
func (m *MQTTClient) Init(brokerAddr, clientId, username, password string) {
	m.Broker = brokerAddr
	m.ClientId = clientId
	m.Username = username
	m.Passwd = password

	opts := MQTT.NewClientOptions().AddBroker(brokerAddr)
	opts.SetClientID(clientId)
	opts.SetUsername(username)
	opts.SetPassword(password)
	// Automatic reconnection is switched off; Publish reconnects on demand.
	opts.SetAutoReconnect(false)

	// These are package-level variables in Paho, so the assignment applies
	// to the library as a whole, not only to this client.
	logger := &mqLog{}
	MQTT.ERROR = logger
	MQTT.CRITICAL = logger
	MQTT.WARN = logger
	MQTT.DEBUG = logger

	m.Mqtt = MQTT.NewClient(opts)
	fmt.Println("connectRetry:", opts.ConnectRetry)

	// Init has no error return, so a failed initial connection is reported
	// by panicking rather than by logging and continuing.
	if token := m.Mqtt.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}
}

// Subscribe subscribes to topic at QoS 0 and forwards every received message
// to sub. It blocks until the subscription request completes; a subscription
// error is printed to standard output and not returned.
//
// The message handler performs a blocking send on sub.
// NOTE(review): a slow or absent receiver will presumably stall Paho's
// message delivery — confirm against the Paho handler semantics.
func (m *MQTTClient) Subscribe(topic string, sub chan<- MQTT.Message) {
	fmt.Println("start subscribing...")

	forward := func(client MQTT.Client, msg MQTT.Message) {
		sub <- msg
	}

	token := m.Mqtt.Subscribe(topic, 0, forward)
	if token.Wait() && token.Error() != nil {
		fmt.Println(token.Error())
	}
}

// Publish sends msg to topic at QoS 0, not retained, and blocks until the
// publish completes. Errors are printed to standard output and not returned.
//
// If the client is not connected, Publish first tries to reconnect. When the
// reconnect succeeds the message is published; when it fails the error is
// printed and the message is dropped.
func (m *MQTTClient) Publish(topic string, msg interface{}) {
	if !m.Mqtt.IsConnected() {
		fmt.Println("mqtt client disconnected, reconnecting")
		if token := m.Mqtt.Connect(); token.Wait() && token.Error() != nil {
			fmt.Println(token.Error())
			return
		}
	}

	token := m.Mqtt.Publish(topic, 0, false, msg)
	if token.Wait() && token.Error() != nil {
		fmt.Println(token.Error())
	}
}