1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
| package collector
|
| import (
| "context"
| "sync"
| "time"
|
| "plc-recorder/logger"
| "plc-recorder/msg"
|
| plc4go "github.com/apache/plc4x/plc4go/pkg/api"
| )
|
| var mapTask sync.Map
|
| type collectorProc struct {
| device *msg.PLCDevice
| cancel context.CancelFunc
| }
|
| func InitTask() {
| //device := msg.PLCDevice{
| // DeviceID: "0",
| // DeviceName: "test",
| // DeviceIP: "192.168.1.188",
| // Brand: "sim",
| // Method: "modbus",
| // PortName: "",
| // Frequency: 1,
| // Details: []*msg.PLCAddress{&msg.PLCAddress{
| // StartAddress: 17021,
| // Length: 1,
| // Type: "int",
| // FieldName: "count",
| // }},
| //}
|
| devices, err := getDeviceList()
| if err != nil {
| return
| }
|
| for idx, dev := range devices {
| // 判断设备状态, 如果配置未开启采集数据或其他方式, 现在假设字段status为0时代表采集
| if dev.Status != 0 {
| continue
| }
|
| addTask(&devices[idx])
| }
| }
|
| func addTask(device *msg.PLCDevice) {
| ctx, cancel := context.WithCancel(context.Background())
| proc := collectorProc{
| device: device,
| cancel: cancel,
| }
|
| mapTask.Store(device.DeviceID, &proc)
|
| go connectingDevice(ctx, device)
| }
|
| func connectingDevice(ctx context.Context, dev *msg.PLCDevice) {
| plcResponse := msg.PLCResponse{
| DeviceID: dev.DeviceID,
| DeviceName: dev.DeviceName,
| DeviceIP: dev.DeviceIP,
| Online: false,
| }
|
| for {
| select {
| case <-ctx.Done():
| logger.Warn("plc device %s, ip: %s, end of connecting.", dev.DeviceName, dev.DeviceIP)
| return
| default:
| plcConnection, err := NewModbusConnection(dev.DeviceIP)
| if err != nil {
| logger.Warn("error connecting to PLC: %s, ip: %s", dev.DeviceName, dev.DeviceIP)
| plcResponse.Online = false
| msg.SendDeviceLiveData(&plcResponse)
| time.Sleep(30 * time.Second)
| } else {
| // 连接成功后, 开始采集数据, 会判断连接是否有效, 断开后会采集任务会退出, 继续重新尝试连接设备
| runCollectionTask(ctx, dev, plcConnection)
| }
| }
| }
| }
|
| func runCollectionTask(ctx context.Context, dev *msg.PLCDevice, conn plc4go.PlcConnection) {
| // 创建modbusTCP连接, 循环查询数据并上报
| plcResponse := msg.PLCResponse{
| DeviceID: dev.DeviceID,
| DeviceName: dev.DeviceName,
| DeviceIP: dev.DeviceIP,
| Online: true,
| }
|
| for {
| select {
| case <-ctx.Done():
| logger.Warn("plc device %s, ip: %s, end of collection.", dev.DeviceName, dev.DeviceIP)
| conn.Close()
| return
| default:
| if !conn.IsConnected() {
| logger.Warn("plc device %s, ip: %s, disconnected.", dev.DeviceName, dev.DeviceIP)
| return
| }
|
| // 根据设置的地址查询数据,上报
| plcResponse.Message = ""
| for _, addr := range dev.Details {
| result, err := ReadHoldingRegister(conn, addr.StartAddress, addr.Length)
| if err != nil {
| logger.Warn("plc device Read Holding Register error, %s", err.Error())
| plcResponse.Message = err.Error()
| } else {
| plcResponse.PLCData = append(plcResponse.PLCData, msg.PLCData{
| StartAddress: addr.StartAddress,
| Length: addr.Length,
| Type: addr.Type,
| FieldName: addr.FieldName,
| Data: result,
| })
| }
| }
|
| msg.SendDeviceLiveData(&plcResponse)
| if plcResponse.Message != "" {
| return
| }
|
| // 间隔时间
| time.Sleep(time.Duration(dev.Frequency) * time.Second)
| }
| }
| }
|
|