From e8c97695dd6930465e66b8fac819301f03624512 Mon Sep 17 00:00:00 2001 From: zhangzengfei <zhangzengfei@smartai.com> Date: 星期二, 01 八月 2023 11:19:37 +0800 Subject: [PATCH] 完善连接过程. 添加重连 --- collector/plc4x.go | 16 ++-- config.json | 5 + msg/send.go | 15 +++++ .idea/vcs.xml | 6 ++ .gitignore | 1 msg/msg.go | 8 ++ main.go | 12 +++ collector/collector.go | 89 ++++++++++++++++++++++------- nsqclient/client.go | 4 9 files changed, 122 insertions(+), 34 deletions(-) diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7f92053 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +plc-recorder.exe diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project version="4"> + <component name="VcsDirectoryMappings"> + <mapping directory="$PROJECT_DIR$" vcs="Git" /> + </component> +</project> \ No newline at end of file diff --git a/collector/collector.go b/collector/collector.go index dafcb04..e70421d 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -7,6 +7,8 @@ "plc-recorder/logger" "plc-recorder/msg" + + plc4go "github.com/apache/plc4x/plc4go/pkg/api" ) var mapTask sync.Map @@ -19,7 +21,7 @@ func InitTask() { device := msg.PLCDevice{ Id: "0", - Ip: "192.168.20.188", + Ip: "192.168.1.188", Address: []int{17021}, Interval: 1, } @@ -32,32 +34,73 @@ mapTask.Store(device.Id, &proc) - go runCollectionTask(ctx, &device) + go connectingDevice(ctx, &device) } -func runCollectionTask(ctx context.Context, device *msg.PLCDevice) { - // 鍒涘缓modbusTCP杩炴帴, 寰幆鏌ヨ鏁版嵁骞朵笂鎶� +func connectingDevice(ctx context.Context, dev *msg.PLCDevice) { + plcResponse := msg.PLCResponse{ + Id: dev.Id, + Name: dev.Name, + Ip: dev.Ip, + Online: false, + } + for { - err, plcConnection := NewModbusConnection(device.Ip) - if err != nil { - logger.Warn("Error connecting to PLC: %s, ip:", device.Name, device.Ip) - - time.Sleep(30 * time.Second) - continue - } - - for { - select { - case <-ctx.Done(): - logger.Warn("Plc device %s, ip: %s, end of collection.", device.Name, device.Ip) - plcConnection.Close() - return - default: - - // 鏍规嵁璁剧疆鐨勫湴鍧�鏌ヨ鏁版嵁锛屼笂鎶� - // 鏆傚仠 - time.Sleep(time.Duration(device.Interval) * time.Second) + select { + case <-ctx.Done(): + logger.Warn("plc device %s, ip: %s, end of connecting.", dev.Name, dev.Ip) + return + default: + plcConnection, err := NewModbusConnection(dev.Ip) + if err != nil { + logger.Warn("error connecting to PLC: %s, ip: %s", dev.Name, dev.Ip) + plcResponse.Online = false + msg.SendDeviceLiveData(&plcResponse) + time.Sleep(30 * time.Second) + } else { + // 杩炴帴鎴愬姛鍚�, 寮�濮嬮噰闆嗘暟鎹�, 浼氬垽鏂繛鎺ユ槸鍚︽湁鏁�, 鏂紑鍚庝細閲囬泦浠诲姟浼氶��鍑�, 缁х画閲嶆柊灏濊瘯杩炴帴璁惧 + runCollectionTask(ctx, dev, plcConnection) } } } } + +func runCollectionTask(ctx context.Context, dev *msg.PLCDevice, conn plc4go.PlcConnection) { + // 鍒涘缓modbusTCP杩炴帴, 寰幆鏌ヨ鏁版嵁骞朵笂鎶� + plcResponse := msg.PLCResponse{ + Id: dev.Id, + Name: dev.Name, + Ip: dev.Ip, + Online: true, + Data: nil, + } + + for { + select { + case <-ctx.Done(): + logger.Warn("plc device %s, ip: %s, end of collection.", dev.Name, dev.Ip) + conn.Close() + return + default: + if !conn.IsConnected() { + logger.Warn("plc device %s, ip: %s, disconnected.", dev.Name, dev.Ip) + return + } + + // 鏍规嵁璁剧疆鐨勫湴鍧�鏌ヨ鏁版嵁锛屼笂鎶� + plcResponse.Data = make(map[int][]byte, 0) + for _, addr := range dev.Address { + result, err := ReadHoldingRegister(conn, addr) + if err != nil { + logger.Warn("plc device Read Holding Register error, %s", err.Error()) + } else { + plcResponse.Data[addr] = result + } + } + + msg.SendDeviceLiveData(&plcResponse) + // 闂撮殧鏃堕棿 + time.Sleep(time.Duration(dev.Interval) * time.Second) + } + } +} diff --git a/collector/plc4x.go b/collector/plc4x.go index e35edfb..000b75c 100644 --- a/collector/plc4x.go +++ b/collector/plc4x.go @@ -9,7 +9,7 @@ "github.com/apache/plc4x/plc4go/pkg/api/transports" ) -func NewModbusConnection(ipAddr string) (error, plc4go.PlcConnection) { +func NewModbusConnection(ipAddr string) (plc4go.PlcConnection, error) { // 鍒涘缓椹卞姩绠$悊鍣� driverManager := plc4go.NewPlcDriverManager() @@ -29,37 +29,37 @@ // 鍒ゆ柇鏄惁杩炴帴鎴愬姛 if err := connectionResult.GetErr(); err != nil { - return err, nil + return nil, err } - return nil, connectionResult.GetConnection() + return connectionResult.GetConnection(), nil } -func ReadHoldingRegister(connection plc4go.PlcConnection, address int) (error, []byte) { +func ReadHoldingRegister(connection plc4go.PlcConnection, address int) ([]byte, error) { tagAddress := fmt.Sprintf("holding-register:%d:DINT", address) // 璇绘ā寮� readRequest, err := connection.ReadRequestBuilder().AddTagAddress("tag", tagAddress).Build() if err != nil { fmt.Printf("Error preparing read-request:%s\n", err.Error()) - return err, nil + return nil, err } // 鎵ц readResult := <-readRequest.Execute() if err := readResult.GetErr(); err != nil { fmt.Printf("Error execting read-request:%s\n", err.Error()) - return err, nil + return nil, err } // 鍒ゆ柇鍝嶅簲鐮佹槸鍚︽纭� if readResult.GetResponse().GetResponseCode("tag") != apiModel.PlcResponseCode_OK { fmt.Printf("error an non-ok return code: %s", readResult.GetResponse().GetResponseCode("tag").GetName()) - return err, nil + return nil, err } value := readResult.GetResponse().GetValue("tag") - return nil, value.GetRaw() + return value.GetRaw(), err } diff --git a/config.json b/config.json new file mode 100644 index 0000000..9b57b67 --- /dev/null +++ b/config.json @@ -0,0 +1,5 @@ +{ + "nsq_server": "fai365.com:4150", + "plc_data_topic": "aps.factory.plc.livedata", + "plc_set_topic": "" +} \ No newline at end of file diff --git a/main.go b/main.go index fea16d5..f77fc7a 100644 --- a/main.go +++ b/main.go @@ -1,12 +1,17 @@ package main import ( + "runtime" + + "plc-recorder/collector" "plc-recorder/config" "plc-recorder/logger" "plc-recorder/nsqclient" "github.com/rs/zerolog" ) + +var logFilePath = "./log/plc-recorder.log" func main() { // 鍒濆鍖栭厤缃� @@ -16,11 +21,16 @@ zerolog.SetGlobalLevel(zerolog.FatalLevel) // 鍒濆鍖栨棩蹇� - logger.InitLog("./log/plc-recorder.log", "debug", 15, false) + if runtime.GOOS == "windows" { + logFilePath = "log/plc-recorder.log" + } + logger.InitLog(logFilePath, "debug", 15, false) logger.Info("plc-recorde start!") // 鍒濆鍖杗sq nsqclient.InitNsqProducer() + collector.InitTask() + select {} } diff --git a/msg/msg.go b/msg/msg.go index 0a68244..2b53b75 100644 --- a/msg/msg.go +++ b/msg/msg.go @@ -7,3 +7,11 @@ Address []int // 鏁版嵁鍦板潃 Interval int // 閲囬泦鐨勬椂闂撮棿闅�. 绉� } + +type PLCResponse struct { + Id string + Name string + Ip string + Online bool + Data map[int][]byte +} diff --git a/msg/send.go b/msg/send.go new file mode 100644 index 0000000..e7036f1 --- /dev/null +++ b/msg/send.go @@ -0,0 +1,15 @@ +package msg + +import ( + "encoding/json" + "plc-recorder/config" + "plc-recorder/logger" + "plc-recorder/nsqclient" +) + +func SendDeviceLiveData(response *PLCResponse) { + logger.Debug("plc live data: %+v", response) + b, _ := json.Marshal(response) + + nsqclient.Produce(config.Options.PLCDataTopic, b) +} diff --git a/nsqclient/client.go b/nsqclient/client.go index 4bb7944..52eb3b3 100644 --- a/nsqclient/client.go +++ b/nsqclient/client.go @@ -3,8 +3,8 @@ import ( "context" - "kingdee-dbapi/config" - "kingdee-dbapi/logger" + "plc-recorder/config" + "plc-recorder/logger" ) var producerCli Producer -- Gitblit v1.8.0