From bc0b7e914a378b2c40f9d2ec2470b61a19c18288 Mon Sep 17 00:00:00 2001
From: zhangzengfei <zhangzengfei@smartai.com>
Date: 星期五, 11 八月 2023 17:05:02 +0800
Subject: [PATCH] 修改上报的数据结构, 添加plc查询接口

---
 collector/collector.go |  117 ++++++++++++++++++++++++++++++++++------------------------
 1 files changed, 68 insertions(+), 49 deletions(-)

diff --git a/collector/collector.go b/collector/collector.go
index 12e8f2e..3fa392b 100644
--- a/collector/collector.go
+++ b/collector/collector.go
@@ -14,10 +14,12 @@
 var tasksStore sync.Map
 
 type collectorProc struct {
-	device *msg.PLCDevice
-	cancel context.CancelFunc
+	device  *msg.PLCDevice
+	cancel  context.CancelFunc
+	plcConn *plc4go.PlcConnection
 }
 
+// 鍒濆鍖栭噰闆嗕换鍔�, 璇锋眰璁惧鍒楄〃, 鎸夎澶囨坊鍔犻噰闆嗕换鍔�
 func InitTask() {
 	logger.Debug("init task")
 	devices, err := getDeviceList()
@@ -53,78 +55,55 @@
 
 	tasksStore.Store(device.DeviceID, &proc)
 
-	go connectingDevice(ctx, device)
+	go connectingDevice(ctx, &proc)
 }
 
-func connectingDevice(ctx context.Context, dev *msg.PLCDevice) {
-	plcResponse := msg.PLCResponse{
-		DeviceID:   dev.DeviceID,
-		DeviceName: dev.DeviceName,
-		DeviceIP:   dev.DeviceIP,
-		Online:     false,
-	}
-
+// 杩涘叆閲囬泦浠诲姟, 寮�濮嬪皾璇曡繛鎺ヨ澶�, 澶辫触鍚�30绉掗噸璇�, 鎴愬姛鍚庤繘鍏ュ惊鐜噰闆�
+func connectingDevice(ctx context.Context, proc *collectorProc) {
 	for {
 		select {
 		case <-ctx.Done():
-			logger.Warn("plc device %s, ip: %s, end of connecting.", dev.DeviceName, dev.DeviceIP)
+			logger.Warn("plc device %s, ip: %s, end of connecting.", proc.device.DeviceName, proc.device.DeviceIP)
 			return
 		default:
-			plcConnection, err := NewModbusConnection(dev.DeviceIP)
+			plcConnection, err := NewModbusConnection(proc.device.DeviceIP)
 			if err != nil {
-				logger.Warn("error connecting to PLC: %s, ip: %s", dev.DeviceName, dev.DeviceIP)
-				plcResponse.Online = false
-				msg.SendDeviceLiveData(&plcResponse)
+				logger.Warn("error connecting to PLC: %s, ip: %s", proc.device.DeviceName, proc.device.DeviceIP)
+
+				// 涓婃姤璁惧绂荤嚎
+				msg.SendDeviceLiveData(&msg.PLCResponse{
+					DeviceID:   proc.device.DeviceID,
+					DeviceName: proc.device.DeviceName,
+					DeviceIP:   proc.device.DeviceIP,
+					Online:     false,
+				})
+
 				time.Sleep(30 * time.Second)
 			} else {
 				// 杩炴帴鎴愬姛鍚�, 寮�濮嬮噰闆嗘暟鎹�, 浼氬垽鏂繛鎺ユ槸鍚︽湁鏁�, 鏂紑鍚庝細閲囬泦浠诲姟浼氶��鍑�, 缁х画閲嶆柊灏濊瘯杩炴帴璁惧
-				runCollectionTask(ctx, dev, plcConnection)
+				proc.plcConn = &plcConnection
+				runCollectionTask(ctx, proc)
 			}
 		}
 	}
 }
 
-func runCollectionTask(ctx context.Context, dev *msg.PLCDevice, conn plc4go.PlcConnection) {
-	// 鍒涘缓modbusTCP杩炴帴, 寰幆鏌ヨ鏁版嵁骞朵笂鎶�
-	plcResponse := msg.PLCResponse{
-		DeviceID:   dev.DeviceID,
-		DeviceName: dev.DeviceName,
-		DeviceIP:   dev.DeviceIP,
-		Online:     true,
-	}
+func runCollectionTask(ctx context.Context, proc *collectorProc) {
+	dev := proc.device
 
 	for {
 		select {
 		case <-ctx.Done():
 			logger.Warn("plc device %s, ip: %s, end of collection.", dev.DeviceName, dev.DeviceIP)
-			conn.Close()
+			(*proc.plcConn).Close()
 			return
 		default:
-			if !conn.IsConnected() {
-				logger.Warn("plc device %s, ip: %s, disconnected.", dev.DeviceName, dev.DeviceIP)
-				return
-			}
+			plcResponse := plc4xRequest(proc)
 
-			// 鏍规嵁璁剧疆鐨勫湴鍧�鏌ヨ鏁版嵁锛屼笂鎶�
-			plcResponse.Message = ""
-			for _, addr := range dev.Details {
-				result, err := ReadHoldingRegister(conn, addr.StartAddress, addr.Length)
-				if err != nil {
-					logger.Warn("plc device Read Holding Register error, %s", err.Error())
-					plcResponse.Message = err.Error()
-				} else {
-					plcResponse.PLCData = append(plcResponse.PLCData, msg.PLCData{
-						StartAddress: addr.StartAddress,
-						Length:       addr.Length,
-						Type:         addr.Type,
-						FieldName:    addr.FieldName,
-						RawData:      result,
-					})
-				}
-			}
+			msg.SendDeviceLiveData(plcResponse)
 
-			msg.SendDeviceLiveData(&plcResponse)
-			if plcResponse.Message != "" {
+			// 鏃犳硶杩炴帴浜�, 閫�鍑洪噰闆�, 閲嶆柊杩炴帴
+			if !plcResponse.Online {
 				return
 			}
 
@@ -133,3 +112,43 @@
 		}
 	}
 }
+
+func plc4xRequest(proc *collectorProc) *msg.PLCResponse {
+	conn := *proc.plcConn
+	dev := proc.device
+
+	plcResponse := msg.PLCResponse{
+		DeviceID:   dev.DeviceID,
+		DeviceName: dev.DeviceName,
+		DeviceIP:   dev.DeviceIP,
+		Online:     true,
+	}
+
+	if !conn.IsConnected() {
+		logger.Warn("plc device %s, ip: %s, disconnected.", dev.DeviceName, dev.DeviceIP)
+		plcResponse.Online = false
+
+		return &plcResponse
+	}
+
+	// 鏍规嵁璁剧疆鐨勫湴鍧�鏌ヨ鏁版嵁锛屼笂鎶�
+	for _, addr := range dev.Details {
+		result, err := ReadHoldingRegister(conn, addr.StartAddress, addr.Length)
+		readRequest := msg.PLCData{
+			StartAddress: addr.StartAddress,
+			Length:       addr.Length,
+			Type:         addr.Type,
+			FieldName:    addr.FieldName,
+			RawData:      result,
+		}
+
+		if err != nil {
+			logger.Warn("plc device Read Holding Register error, %s", err.Error())
+			readRequest.Message = err.Error()
+		}
+
+		plcResponse.PLCData = append(plcResponse.PLCData, readRequest)
+	}
+
+	return &plcResponse
+}

--
Gitblit v1.8.0