// Source-viewer residue kept as a comment (presumably last-change author, date and commit): zhangzengfei, 2023-08-04, 88da1e13a073e8b5656387a246d827593fbd6163
package collector
 
import (
    "context"
    "sync"
    "time"
 
    "plc-recorder/logger"
    "plc-recorder/msg"
 
    plc4go "github.com/apache/plc4x/plc4go/pkg/api"
)
 
// mapTask is the package-level registry of collection tasks. addTask stores a
// *collectorProc in it, keyed by the device's DeviceID.
var mapTask sync.Map
 
// collectorProc is the per-device entry that addTask stores in mapTask.
type collectorProc struct {
    // device is the device definition that was passed to addTask.
    device *msg.PLCDevice
    // cancel cancels the context handed to the device's connectingDevice
    // goroutine.
    cancel context.CancelFunc
}
 
func InitTask() {
    //device := msg.PLCDevice{
    //    DeviceID:   "0",
    //    DeviceName: "test",
    //    DeviceIP:   "192.168.1.188",
    //    Brand:      "sim",
    //    Method:     "modbus",
    //    PortName:   "",
    //    Frequency:  1,
    //    Details: []*msg.PLCAddress{&msg.PLCAddress{
    //        StartAddress: 17021,
    //        Length:       1,
    //        Type:         "int",
    //        FieldName:    "count",
    //    }},
    //}
 
    devices, err := getDeviceList()
    if err != nil {
        return
    }
 
    for idx, dev := range devices {
        // 判断设备状态, 如果配置未开启采集数据或其他方式, 现在假设字段status为0时代表采集
        if dev.Status != 0 {
            continue
        }
 
        addTask(&devices[idx])
    }
}
 
func addTask(device *msg.PLCDevice) {
    ctx, cancel := context.WithCancel(context.Background())
    proc := collectorProc{
        device: device,
        cancel: cancel,
    }
 
    mapTask.Store(device.DeviceID, &proc)
 
    go connectingDevice(ctx, device)
}
 
func connectingDevice(ctx context.Context, dev *msg.PLCDevice) {
    plcResponse := msg.PLCResponse{
        DeviceID:   dev.DeviceID,
        DeviceName: dev.DeviceName,
        DeviceIP:   dev.DeviceIP,
        Online:     false,
    }
 
    for {
        select {
        case <-ctx.Done():
            logger.Warn("plc device %s, ip: %s, end of connecting.", dev.DeviceName, dev.DeviceIP)
            return
        default:
            plcConnection, err := NewModbusConnection(dev.DeviceIP)
            if err != nil {
                logger.Warn("error connecting to PLC: %s, ip: %s", dev.DeviceName, dev.DeviceIP)
                plcResponse.Online = false
                msg.SendDeviceLiveData(&plcResponse)
                time.Sleep(30 * time.Second)
            } else {
                // 连接成功后, 开始采集数据, 会判断连接是否有效, 断开后会采集任务会退出, 继续重新尝试连接设备
                runCollectionTask(ctx, dev, plcConnection)
            }
        }
    }
}
 
func runCollectionTask(ctx context.Context, dev *msg.PLCDevice, conn plc4go.PlcConnection) {
    // 创建modbusTCP连接, 循环查询数据并上报
    plcResponse := msg.PLCResponse{
        DeviceID:   dev.DeviceID,
        DeviceName: dev.DeviceName,
        DeviceIP:   dev.DeviceIP,
        Online:     true,
    }
 
    for {
        select {
        case <-ctx.Done():
            logger.Warn("plc device %s, ip: %s, end of collection.", dev.DeviceName, dev.DeviceIP)
            conn.Close()
            return
        default:
            if !conn.IsConnected() {
                logger.Warn("plc device %s, ip: %s, disconnected.", dev.DeviceName, dev.DeviceIP)
                return
            }
 
            // 根据设置的地址查询数据,上报
            plcResponse.Message = ""
            for _, addr := range dev.Details {
                result, err := ReadHoldingRegister(conn, addr.StartAddress, addr.Length)
                if err != nil {
                    logger.Warn("plc device Read Holding Register error, %s", err.Error())
                    plcResponse.Message = err.Error()
                } else {
                    plcResponse.PLCData = append(plcResponse.PLCData, msg.PLCData{
                        StartAddress: addr.StartAddress,
                        Length:       addr.Length,
                        Type:         addr.Type,
                        FieldName:    addr.FieldName,
                        Data:         result,
                    })
                }
            }
 
            msg.SendDeviceLiveData(&plcResponse)
            if plcResponse.Message != "" {
                return
            }
 
            // 间隔时间
            time.Sleep(time.Duration(dev.Frequency) * time.Second)
        }
    }
}