zhangzengfei
2023-08-11 bc0b7e914a378b2c40f9d2ec2470b61a19c18288
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package collector
 
import (
    "context"
    "sync"
    "time"
 
    "plc-recorder/logger"
    "plc-recorder/msg"
 
    plc4go "github.com/apache/plc4x/plc4go/pkg/api"
)
 
var tasksStore sync.Map
 
type collectorProc struct {
    device  *msg.PLCDevice
    cancel  context.CancelFunc
    plcConn *plc4go.PlcConnection
}
 
// 初始化采集任务, 请求设备列表, 按设备添加采集任务
func InitTask() {
    logger.Debug("init task")
    devices, err := getDeviceList()
    if err != nil {
        return
    }
 
    for idx, dev := range devices {
        // 判断设备状态, 如果配置未开启采集数据或其他方式, 现在假设字段status为0时代表采集
        if dev.Status != 0 {
            continue
        }
 
        logger.Debug("add collector task,device %s", dev.DeviceName)
        addTask(&devices[idx])
    }
}
 
func stopTask(device *msg.PLCDevice) {
    if task, ok := tasksStore.Load(device.DeviceID); ok {
        // 存在的任务, 先停止掉, 然后重新开启一个
        task.(collectorProc).cancel()
        tasksStore.Delete(device.DeviceID)
    }
}
 
func addTask(device *msg.PLCDevice) {
    ctx, cancel := context.WithCancel(context.Background())
    proc := collectorProc{
        device: device,
        cancel: cancel,
    }
 
    tasksStore.Store(device.DeviceID, &proc)
 
    go connectingDevice(ctx, &proc)
}
 
// 进入采集任务, 开始尝试连接设备, 失败后30秒重试, 成功后进入循环采集
func connectingDevice(ctx context.Context, proc *collectorProc) {
    for {
        select {
        case <-ctx.Done():
            logger.Warn("plc device %s, ip: %s, end of connecting.", proc.device.DeviceName, proc.device.DeviceIP)
            return
        default:
            plcConnection, err := NewModbusConnection(proc.device.DeviceIP)
            if err != nil {
                logger.Warn("error connecting to PLC: %s, ip: %s", proc.device.DeviceName, proc.device.DeviceIP)
 
                // 上报设备离线
                msg.SendDeviceLiveData(&msg.PLCResponse{
                    DeviceID:   proc.device.DeviceID,
                    DeviceName: proc.device.DeviceName,
                    DeviceIP:   proc.device.DeviceIP,
                    Online:     false,
                })
 
                time.Sleep(30 * time.Second)
            } else {
                // 连接成功后, 开始采集数据, 会判断连接是否有效, 断开后会采集任务会退出, 继续重新尝试连接设备
                proc.plcConn = &plcConnection
                runCollectionTask(ctx, proc)
            }
        }
    }
}
 
func runCollectionTask(ctx context.Context, proc *collectorProc) {
    dev := proc.device
 
    for {
        select {
        case <-ctx.Done():
            logger.Warn("plc device %s, ip: %s, end of collection.", dev.DeviceName, dev.DeviceIP)
            (*proc.plcConn).Close()
            return
        default:
            plcResponse := plc4xRequest(proc)
 
            msg.SendDeviceLiveData(plcResponse)
 
            // 无法连接了, 退出采集, 重新连接
            if !plcResponse.Online {
                return
            }
 
            // 间隔时间
            time.Sleep(time.Duration(dev.Frequency) * time.Second)
        }
    }
}
 
func plc4xRequest(proc *collectorProc) *msg.PLCResponse {
    conn := *proc.plcConn
    dev := proc.device
 
    plcResponse := msg.PLCResponse{
        DeviceID:   dev.DeviceID,
        DeviceName: dev.DeviceName,
        DeviceIP:   dev.DeviceIP,
        Online:     true,
    }
 
    if !conn.IsConnected() {
        logger.Warn("plc device %s, ip: %s, disconnected.", dev.DeviceName, dev.DeviceIP)
        plcResponse.Online = false
 
        return &plcResponse
    }
 
    // 根据设置的地址查询数据,上报
    for _, addr := range dev.Details {
        result, err := ReadHoldingRegister(conn, addr.StartAddress, addr.Length)
        readRequest := msg.PLCData{
            StartAddress: addr.StartAddress,
            Length:       addr.Length,
            Type:         addr.Type,
            FieldName:    addr.FieldName,
            RawData:      result,
        }
 
        if err != nil {
            logger.Warn("plc device Read Holding Register error, %s", err.Error())
            readRequest.Message = err.Error()
        }
 
        plcResponse.PLCData = append(plcResponse.PLCData, readRequest)
    }
 
    return &plcResponse
}