From e8c97695dd6930465e66b8fac819301f03624512 Mon Sep 17 00:00:00 2001
From: zhangzengfei <zhangzengfei@smartai.com>
Date: 星期二, 01 八月 2023 11:19:37 +0800
Subject: [PATCH] 完善连接过程. 添加重连
---
collector/plc4x.go | 16 ++--
config.json | 5 +
msg/send.go | 15 +++++
.idea/vcs.xml | 6 ++
.gitignore | 1
msg/msg.go | 8 ++
main.go | 12 +++
collector/collector.go | 89 ++++++++++++++++++++++-------
nsqclient/client.go | 4
9 files changed, 122 insertions(+), 34 deletions(-)
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..7f92053
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+plc-recorder.exe
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
new file mode 100644
index 0000000..94a25f7
--- /dev/null
+++ b/.idea/vcs.xml
@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+ <component name="VcsDirectoryMappings">
+ <mapping directory="$PROJECT_DIR$" vcs="Git" />
+ </component>
+</project>
\ No newline at end of file
diff --git a/collector/collector.go b/collector/collector.go
index dafcb04..e70421d 100644
--- a/collector/collector.go
+++ b/collector/collector.go
@@ -7,6 +7,8 @@
"plc-recorder/logger"
"plc-recorder/msg"
+
+ plc4go "github.com/apache/plc4x/plc4go/pkg/api"
)
var mapTask sync.Map
@@ -19,7 +21,7 @@
func InitTask() {
device := msg.PLCDevice{
Id: "0",
- Ip: "192.168.20.188",
+ Ip: "192.168.1.188",
Address: []int{17021},
Interval: 1,
}
@@ -32,32 +34,73 @@
mapTask.Store(device.Id, &proc)
- go runCollectionTask(ctx, &device)
+ go connectingDevice(ctx, &device)
}
-func runCollectionTask(ctx context.Context, device *msg.PLCDevice) {
- // 鍒涘缓modbusTCP杩炴帴, 寰幆鏌ヨ鏁版嵁骞朵笂鎶�
+func connectingDevice(ctx context.Context, dev *msg.PLCDevice) {
+ plcResponse := msg.PLCResponse{
+ Id: dev.Id,
+ Name: dev.Name,
+ Ip: dev.Ip,
+ Online: false,
+ }
+
for {
- err, plcConnection := NewModbusConnection(device.Ip)
- if err != nil {
- logger.Warn("Error connecting to PLC: %s, ip:", device.Name, device.Ip)
-
- time.Sleep(30 * time.Second)
- continue
- }
-
- for {
- select {
- case <-ctx.Done():
- logger.Warn("Plc device %s, ip: %s, end of collection.", device.Name, device.Ip)
- plcConnection.Close()
- return
- default:
-
- // 鏍规嵁璁剧疆鐨勫湴鍧�鏌ヨ鏁版嵁锛屼笂鎶�
- // 鏆傚仠
- time.Sleep(time.Duration(device.Interval) * time.Second)
+ select {
+ case <-ctx.Done():
+ logger.Warn("plc device %s, ip: %s, end of connecting.", dev.Name, dev.Ip)
+ return
+ default:
+ plcConnection, err := NewModbusConnection(dev.Ip)
+ if err != nil {
+ logger.Warn("error connecting to PLC: %s, ip: %s", dev.Name, dev.Ip)
+ plcResponse.Online = false
+ msg.SendDeviceLiveData(&plcResponse)
+ time.Sleep(30 * time.Second)
+ } else {
+ // 杩炴帴鎴愬姛鍚�, 寮�濮嬮噰闆嗘暟鎹�, 浼氬垽鏂繛鎺ユ槸鍚︽湁鏁�, 鏂紑鍚庝細閲囬泦浠诲姟浼氶��鍑�, 缁х画閲嶆柊灏濊瘯杩炴帴璁惧
+ runCollectionTask(ctx, dev, plcConnection)
}
}
}
}
+
+func runCollectionTask(ctx context.Context, dev *msg.PLCDevice, conn plc4go.PlcConnection) {
+ // 鍒涘缓modbusTCP杩炴帴, 寰幆鏌ヨ鏁版嵁骞朵笂鎶�
+ plcResponse := msg.PLCResponse{
+ Id: dev.Id,
+ Name: dev.Name,
+ Ip: dev.Ip,
+ Online: true,
+ Data: nil,
+ }
+
+ for {
+ select {
+ case <-ctx.Done():
+ logger.Warn("plc device %s, ip: %s, end of collection.", dev.Name, dev.Ip)
+ conn.Close()
+ return
+ default:
+ if !conn.IsConnected() {
+ logger.Warn("plc device %s, ip: %s, disconnected.", dev.Name, dev.Ip)
+ return
+ }
+
+ // 鏍规嵁璁剧疆鐨勫湴鍧�鏌ヨ鏁版嵁锛屼笂鎶�
+ plcResponse.Data = make(map[int][]byte, 0)
+ for _, addr := range dev.Address {
+ result, err := ReadHoldingRegister(conn, addr)
+ if err != nil {
+ logger.Warn("plc device Read Holding Register error, %s", err.Error())
+ } else {
+ plcResponse.Data[addr] = result
+ }
+ }
+
+ msg.SendDeviceLiveData(&plcResponse)
+ // 闂撮殧鏃堕棿
+ time.Sleep(time.Duration(dev.Interval) * time.Second)
+ }
+ }
+}
diff --git a/collector/plc4x.go b/collector/plc4x.go
index e35edfb..000b75c 100644
--- a/collector/plc4x.go
+++ b/collector/plc4x.go
@@ -9,7 +9,7 @@
"github.com/apache/plc4x/plc4go/pkg/api/transports"
)
-func NewModbusConnection(ipAddr string) (error, plc4go.PlcConnection) {
+func NewModbusConnection(ipAddr string) (plc4go.PlcConnection, error) {
// 鍒涘缓椹卞姩绠$悊鍣�
driverManager := plc4go.NewPlcDriverManager()
@@ -29,37 +29,37 @@
// 鍒ゆ柇鏄惁杩炴帴鎴愬姛
if err := connectionResult.GetErr(); err != nil {
- return err, nil
+ return nil, err
}
- return nil, connectionResult.GetConnection()
+ return connectionResult.GetConnection(), nil
}
-func ReadHoldingRegister(connection plc4go.PlcConnection, address int) (error, []byte) {
+func ReadHoldingRegister(connection plc4go.PlcConnection, address int) ([]byte, error) {
tagAddress := fmt.Sprintf("holding-register:%d:DINT", address)
// 璇绘ā寮�
readRequest, err := connection.ReadRequestBuilder().AddTagAddress("tag", tagAddress).Build()
if err != nil {
fmt.Printf("Error preparing read-request:%s\n", err.Error())
- return err, nil
+ return nil, err
}
// 鎵ц
readResult := <-readRequest.Execute()
if err := readResult.GetErr(); err != nil {
fmt.Printf("Error execting read-request:%s\n", err.Error())
- return err, nil
+ return nil, err
}
// 鍒ゆ柇鍝嶅簲鐮佹槸鍚︽纭�
if readResult.GetResponse().GetResponseCode("tag") != apiModel.PlcResponseCode_OK {
fmt.Printf("error an non-ok return code: %s", readResult.GetResponse().GetResponseCode("tag").GetName())
- return err, nil
+ return nil, err
}
value := readResult.GetResponse().GetValue("tag")
- return nil, value.GetRaw()
+ return value.GetRaw(), err
}
diff --git a/config.json b/config.json
new file mode 100644
index 0000000..9b57b67
--- /dev/null
+++ b/config.json
@@ -0,0 +1,5 @@
+{
+ "nsq_server": "fai365.com:4150",
+ "plc_data_topic": "aps.factory.plc.livedata",
+ "plc_set_topic": ""
+}
\ No newline at end of file
diff --git a/main.go b/main.go
index fea16d5..f77fc7a 100644
--- a/main.go
+++ b/main.go
@@ -1,12 +1,17 @@
package main
import (
+ "runtime"
+
+ "plc-recorder/collector"
"plc-recorder/config"
"plc-recorder/logger"
"plc-recorder/nsqclient"
"github.com/rs/zerolog"
)
+
+var logFilePath = "./log/plc-recorder.log"
func main() {
// 鍒濆鍖栭厤缃�
@@ -16,11 +21,16 @@
zerolog.SetGlobalLevel(zerolog.FatalLevel)
// 鍒濆鍖栨棩蹇�
- logger.InitLog("./log/plc-recorder.log", "debug", 15, false)
+ if runtime.GOOS == "windows" {
+ logFilePath = "log/plc-recorder.log"
+ }
+ logger.InitLog(logFilePath, "debug", 15, false)
logger.Info("plc-recorde start!")
// 鍒濆鍖杗sq
nsqclient.InitNsqProducer()
+ collector.InitTask()
+
select {}
}
diff --git a/msg/msg.go b/msg/msg.go
index 0a68244..2b53b75 100644
--- a/msg/msg.go
+++ b/msg/msg.go
@@ -7,3 +7,11 @@
Address []int // 鏁版嵁鍦板潃
Interval int // 閲囬泦鐨勬椂闂撮棿闅�. 绉�
}
+
+type PLCResponse struct {
+ Id string
+ Name string
+ Ip string
+ Online bool
+ Data map[int][]byte
+}
diff --git a/msg/send.go b/msg/send.go
new file mode 100644
index 0000000..e7036f1
--- /dev/null
+++ b/msg/send.go
@@ -0,0 +1,15 @@
+package msg
+
+import (
+ "encoding/json"
+ "plc-recorder/config"
+ "plc-recorder/logger"
+ "plc-recorder/nsqclient"
+)
+
+func SendDeviceLiveData(response *PLCResponse) {
+ logger.Debug("plc live data: %+v", response)
+ b, _ := json.Marshal(response)
+
+ nsqclient.Produce(config.Options.PLCDataTopic, b)
+}
diff --git a/nsqclient/client.go b/nsqclient/client.go
index 4bb7944..52eb3b3 100644
--- a/nsqclient/client.go
+++ b/nsqclient/client.go
@@ -3,8 +3,8 @@
import (
"context"
- "kingdee-dbapi/config"
- "kingdee-dbapi/logger"
+ "plc-recorder/config"
+ "plc-recorder/logger"
)
var producerCli Producer
--
Gitblit v1.8.0