From e8c97695dd6930465e66b8fac819301f03624512 Mon Sep 17 00:00:00 2001
From: zhangzengfei <zhangzengfei@smartai.com>
Date: 星期二, 01 八月 2023 11:19:37 +0800
Subject: [PATCH] 完善连接过程. 添加重连

---
 collector/plc4x.go     |   16 ++--
 config.json            |    5 +
 msg/send.go            |   15 +++++
 .idea/vcs.xml          |    6 ++
 .gitignore             |    1 
 msg/msg.go             |    8 ++
 main.go                |   12 +++
 collector/collector.go |   89 ++++++++++++++++++++++-------
 nsqclient/client.go    |    4 
 9 files changed, 122 insertions(+), 34 deletions(-)

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..7f92053
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+plc-recorder.exe
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
new file mode 100644
index 0000000..94a25f7
--- /dev/null
+++ b/.idea/vcs.xml
@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="VcsDirectoryMappings">
+    <mapping directory="$PROJECT_DIR$" vcs="Git" />
+  </component>
+</project>
\ No newline at end of file
diff --git a/collector/collector.go b/collector/collector.go
index dafcb04..e70421d 100644
--- a/collector/collector.go
+++ b/collector/collector.go
@@ -7,6 +7,8 @@
 
 	"plc-recorder/logger"
 	"plc-recorder/msg"
+
+	plc4go "github.com/apache/plc4x/plc4go/pkg/api"
 )
 
 var mapTask sync.Map
@@ -19,7 +21,7 @@
 func InitTask() {
 	device := msg.PLCDevice{
 		Id:       "0",
-		Ip:       "192.168.20.188",
+		Ip:       "192.168.1.188",
 		Address:  []int{17021},
 		Interval: 1,
 	}
@@ -32,32 +34,73 @@
 
 	mapTask.Store(device.Id, &proc)
 
-	go runCollectionTask(ctx, &device)
+	go connectingDevice(ctx, &device)
 }
 
-func runCollectionTask(ctx context.Context, device *msg.PLCDevice) {
-	// 鍒涘缓modbusTCP杩炴帴, 寰幆鏌ヨ鏁版嵁骞朵笂鎶�
+func connectingDevice(ctx context.Context, dev *msg.PLCDevice) {
+	plcResponse := msg.PLCResponse{
+		Id:     dev.Id,
+		Name:   dev.Name,
+		Ip:     dev.Ip,
+		Online: false,
+	}
+
 	for {
-		err, plcConnection := NewModbusConnection(device.Ip)
-		if err != nil {
-			logger.Warn("Error connecting to PLC: %s, ip:", device.Name, device.Ip)
-
-			time.Sleep(30 * time.Second)
-			continue
-		}
-
-		for {
-			select {
-			case <-ctx.Done():
-				logger.Warn("Plc device %s, ip: %s, end of collection.", device.Name, device.Ip)
-				plcConnection.Close()
-				return
-			default:
-
-				// 鏍规嵁璁剧疆鐨勫湴鍧�鏌ヨ鏁版嵁锛屼笂鎶�
-				// 鏆傚仠
-				time.Sleep(time.Duration(device.Interval) * time.Second)
+		select {
+		case <-ctx.Done():
+			logger.Warn("plc device %s, ip: %s, end of connecting.", dev.Name, dev.Ip)
+			return
+		default:
+			plcConnection, err := NewModbusConnection(dev.Ip)
+			if err != nil {
+				logger.Warn("error connecting to PLC: %s, ip: %s", dev.Name, dev.Ip)
+				plcResponse.Online = false
+				msg.SendDeviceLiveData(&plcResponse)
+				time.Sleep(30 * time.Second)
+			} else {
+				// 杩炴帴鎴愬姛鍚�, 寮�濮嬮噰闆嗘暟鎹�, 浼氬垽鏂繛鎺ユ槸鍚︽湁鏁�, 鏂紑鍚庝細閲囬泦浠诲姟浼氶��鍑�, 缁х画閲嶆柊灏濊瘯杩炴帴璁惧
+				runCollectionTask(ctx, dev, plcConnection)
 			}
 		}
 	}
 }
+
+func runCollectionTask(ctx context.Context, dev *msg.PLCDevice, conn plc4go.PlcConnection) {
+	// 鍒涘缓modbusTCP杩炴帴, 寰幆鏌ヨ鏁版嵁骞朵笂鎶�
+	plcResponse := msg.PLCResponse{
+		Id:     dev.Id,
+		Name:   dev.Name,
+		Ip:     dev.Ip,
+		Online: true,
+		Data:   nil,
+	}
+
+	for {
+		select {
+		case <-ctx.Done():
+			logger.Warn("plc device %s, ip: %s, end of collection.", dev.Name, dev.Ip)
+			conn.Close()
+			return
+		default:
+			if !conn.IsConnected() {
+				logger.Warn("plc device %s, ip: %s, disconnected.", dev.Name, dev.Ip)
+				return
+			}
+
+			// 鏍规嵁璁剧疆鐨勫湴鍧�鏌ヨ鏁版嵁锛屼笂鎶�
+			plcResponse.Data = make(map[int][]byte, 0)
+			for _, addr := range dev.Address {
+				result, err := ReadHoldingRegister(conn, addr)
+				if err != nil {
+					logger.Warn("plc device Read Holding Register error, %s", err.Error())
+				} else {
+					plcResponse.Data[addr] = result
+				}
+			}
+
+			msg.SendDeviceLiveData(&plcResponse)
+			// 闂撮殧鏃堕棿
+			time.Sleep(time.Duration(dev.Interval) * time.Second)
+		}
+	}
+}
diff --git a/collector/plc4x.go b/collector/plc4x.go
index e35edfb..000b75c 100644
--- a/collector/plc4x.go
+++ b/collector/plc4x.go
@@ -9,7 +9,7 @@
 	"github.com/apache/plc4x/plc4go/pkg/api/transports"
 )
 
-func NewModbusConnection(ipAddr string) (error, plc4go.PlcConnection) {
+func NewModbusConnection(ipAddr string) (plc4go.PlcConnection, error) {
 	// 鍒涘缓椹卞姩绠$悊鍣�
 	driverManager := plc4go.NewPlcDriverManager()
 
@@ -29,37 +29,37 @@
 
 	// 鍒ゆ柇鏄惁杩炴帴鎴愬姛
 	if err := connectionResult.GetErr(); err != nil {
-		return err, nil
+		return nil, err
 	}
 
-	return nil, connectionResult.GetConnection()
+	return connectionResult.GetConnection(), nil
 }
 
-func ReadHoldingRegister(connection plc4go.PlcConnection, address int) (error, []byte) {
+func ReadHoldingRegister(connection plc4go.PlcConnection, address int) ([]byte, error) {
 	tagAddress := fmt.Sprintf("holding-register:%d:DINT", address)
 
 	// 璇绘ā寮�
 	readRequest, err := connection.ReadRequestBuilder().AddTagAddress("tag", tagAddress).Build()
 	if err != nil {
 		fmt.Printf("Error preparing read-request:%s\n", err.Error())
-		return err, nil
+		return nil, err
 	}
 
 	// 鎵ц
 	readResult := <-readRequest.Execute()
 	if err := readResult.GetErr(); err != nil {
 		fmt.Printf("Error execting read-request:%s\n", err.Error())
-		return err, nil
+		return nil, err
 	}
 
 	// 鍒ゆ柇鍝嶅簲鐮佹槸鍚︽纭�
 	if readResult.GetResponse().GetResponseCode("tag") != apiModel.PlcResponseCode_OK {
 		fmt.Printf("error an non-ok return code: %s", readResult.GetResponse().GetResponseCode("tag").GetName())
-		return err, nil
+		return nil, err
 	}
 
 	value := readResult.GetResponse().GetValue("tag")
 
-	return nil, value.GetRaw()
+	return value.GetRaw(), err
 
 }
diff --git a/config.json b/config.json
new file mode 100644
index 0000000..9b57b67
--- /dev/null
+++ b/config.json
@@ -0,0 +1,5 @@
+{
+    "nsq_server": "fai365.com:4150",
+    "plc_data_topic": "aps.factory.plc.livedata",
+    "plc_set_topic": ""
+}
\ No newline at end of file
diff --git a/main.go b/main.go
index fea16d5..f77fc7a 100644
--- a/main.go
+++ b/main.go
@@ -1,12 +1,17 @@
 package main
 
 import (
+	"runtime"
+
+	"plc-recorder/collector"
 	"plc-recorder/config"
 	"plc-recorder/logger"
 	"plc-recorder/nsqclient"
 
 	"github.com/rs/zerolog"
 )
+
+var logFilePath = "./log/plc-recorder.log"
 
 func main() {
 	// 鍒濆鍖栭厤缃�
@@ -16,11 +21,16 @@
 	zerolog.SetGlobalLevel(zerolog.FatalLevel)
 
 	// 鍒濆鍖栨棩蹇�
-	logger.InitLog("./log/plc-recorder.log", "debug", 15, false)
+	if runtime.GOOS == "windows" {
+		logFilePath = "log/plc-recorder.log"
+	}
+	logger.InitLog(logFilePath, "debug", 15, false)
 	logger.Info("plc-recorde start!")
 
 	// 鍒濆鍖杗sq
 	nsqclient.InitNsqProducer()
 
+	collector.InitTask()
+
 	select {}
 }
diff --git a/msg/msg.go b/msg/msg.go
index 0a68244..2b53b75 100644
--- a/msg/msg.go
+++ b/msg/msg.go
@@ -7,3 +7,11 @@
 	Address  []int // 鏁版嵁鍦板潃
 	Interval int   // 閲囬泦鐨勬椂闂撮棿闅�. 绉�
 }
+
+type PLCResponse struct {
+	Id     string
+	Name   string
+	Ip     string
+	Online bool
+	Data   map[int][]byte
+}
diff --git a/msg/send.go b/msg/send.go
new file mode 100644
index 0000000..e7036f1
--- /dev/null
+++ b/msg/send.go
@@ -0,0 +1,15 @@
+package msg
+
+import (
+	"encoding/json"
+	"plc-recorder/config"
+	"plc-recorder/logger"
+	"plc-recorder/nsqclient"
+)
+
+func SendDeviceLiveData(response *PLCResponse) {
+	logger.Debug("plc live data: %+v", response)
+	b, _ := json.Marshal(response)
+
+	nsqclient.Produce(config.Options.PLCDataTopic, b)
+}
diff --git a/nsqclient/client.go b/nsqclient/client.go
index 4bb7944..52eb3b3 100644
--- a/nsqclient/client.go
+++ b/nsqclient/client.go
@@ -3,8 +3,8 @@
 import (
 	"context"
 
-	"kingdee-dbapi/config"
-	"kingdee-dbapi/logger"
+	"plc-recorder/config"
+	"plc-recorder/logger"
 )
 
 var producerCli Producer

--
Gitblit v1.8.0