From 7799d5acb4a25526625b3e99c2f7fd71d1be39ff Mon Sep 17 00:00:00 2001
From: qixiaoning <jony.kee@outlook.com>
Date: 星期二, 26 八月 2025 08:47:58 +0800
Subject: [PATCH] mqtt引入
---
push-service/mqtt/mqtt.go | 87 +++++++++++++++++++++++++++++++++++++++++++
1 files changed, 87 insertions(+), 0 deletions(-)
diff --git a/push-service/mqtt/mqtt.go b/push-service/mqtt/mqtt.go
new file mode 100644
index 0000000..6341672
--- /dev/null
+++ b/push-service/mqtt/mqtt.go
@@ -0,0 +1,87 @@
+package mqtt
+
+import (
+ "fmt"
+
+ MQTT "github.com/eclipse/paho.mqtt.golang"
+)
+
+type MQTTClient struct {
+ Broker string
+ ClientId string
+ Username string
+ Passwd string
+ Mqtt MQTT.Client
+}
+
+var Client *MQTTClient
+
+func init() {
+ Client = &MQTTClient{}
+}
+
+type mqLog struct {
+
+}
+
+func (l *mqLog) Println(v ...interface{}) {
+ fmt.Println(v...)
+}
+
+func (l *mqLog) Printf(format string, v ...interface{}) {
+ fmt.Println(fmt.Sprintf(format, v...))
+}
+
+func (m *MQTTClient) Init(brokerAddr, clientId, username, password string) {
+ m.Broker = brokerAddr
+ m.ClientId = clientId
+ m.Username = username
+ m.Passwd = password
+
+ opts := MQTT.NewClientOptions().AddBroker(brokerAddr)
+ opts.SetClientID(clientId)
+ opts.SetUsername(username)
+ opts.SetPassword(password)
+
+ opts.SetAutoReconnect(false)
+ hl := mqLog{}
+ MQTT.ERROR = &hl
+ MQTT.CRITICAL= &hl
+ MQTT.WARN = &hl
+ MQTT.DEBUG = &hl
+
+ m.Mqtt = MQTT.NewClient(opts)
+
+fmt.Println("connectRetry:", opts.ConnectRetry)
+ if token := m.Mqtt.Connect(); token.Wait() && token.Error() != nil {
+ panic(token.Error())
+ // fmt.Println(token.Error())
+ }
+}
+
+func (m *MQTTClient) Subscribe(topic string, sub chan<- MQTT.Message) {
+ fmt.Println("start subscribing...")
+ subToken := m.Mqtt.Subscribe(
+ topic,
+ 0,
+ func(client MQTT.Client, msg MQTT.Message) {
+ sub <- msg
+ })
+ if subToken.Wait() && subToken.Error() != nil {
+ fmt.Println(subToken.Error())
+ }
+}
+
+func (m *MQTTClient) Publish(topic string, msg interface{}) {
+ if m.Mqtt.IsConnected() {
+ token := m.Mqtt.Publish(topic, 0, false, msg)
+ if token.Wait() && token.Error() != nil {
+ fmt.Println(token.Error())
+ }
+ } else {
+ fmt.Println("mqtt client reconneted")
+ if token := m.Mqtt.Connect(); token.Wait() && token.Error() != nil {
+ fmt.Println(token.Error())
+ }
+ }
+}
--
Gitblit v1.8.0