package collector
|
|
import (
|
"context"
|
"sync"
|
"time"
|
|
"plc-recorder/logger"
|
"plc-recorder/msg"
|
|
plc4go "github.com/apache/plc4x/plc4go/pkg/api"
|
)
|
|
var tasksStore sync.Map
|
|
type collectorProc struct {
|
device *msg.PLCDevice
|
cancel context.CancelFunc
|
plcConn *plc4go.PlcConnection
|
}
|
|
// 初始化采集任务, 请求设备列表, 按设备添加采集任务
|
func InitTask() {
|
logger.Debug("init task")
|
devices, err := getDeviceList()
|
if err != nil {
|
return
|
}
|
|
for idx, dev := range devices {
|
// 判断设备状态, 如果配置未开启采集数据或其他方式, 现在假设字段status为0时代表采集
|
if dev.Status != 0 {
|
continue
|
}
|
|
logger.Debug("add collector task,device %s", dev.DeviceName)
|
addTask(&devices[idx])
|
}
|
}
|
|
func stopTask(device *msg.PLCDevice) {
|
if task, ok := tasksStore.Load(device.DeviceID); ok {
|
// 存在的任务, 先停止掉, 然后重新开启一个
|
task.(collectorProc).cancel()
|
tasksStore.Delete(device.DeviceID)
|
}
|
}
|
|
func addTask(device *msg.PLCDevice) {
|
ctx, cancel := context.WithCancel(context.Background())
|
proc := collectorProc{
|
device: device,
|
cancel: cancel,
|
}
|
|
tasksStore.Store(device.DeviceID, &proc)
|
|
go connectingDevice(ctx, &proc)
|
}
|
|
// 进入采集任务, 开始尝试连接设备, 失败后30秒重试, 成功后进入循环采集
|
func connectingDevice(ctx context.Context, proc *collectorProc) {
|
for {
|
select {
|
case <-ctx.Done():
|
logger.Warn("plc device %s, ip: %s, end of connecting.", proc.device.DeviceName, proc.device.DeviceIP)
|
return
|
default:
|
plcConnection, err := NewModbusConnection(proc.device.DeviceIP)
|
if err != nil {
|
logger.Warn("error connecting to PLC: %s, ip: %s", proc.device.DeviceName, proc.device.DeviceIP)
|
|
// 上报设备离线
|
msg.SendDeviceLiveData(&msg.PLCResponse{
|
DeviceID: proc.device.DeviceID,
|
DeviceName: proc.device.DeviceName,
|
DeviceIP: proc.device.DeviceIP,
|
Online: false,
|
})
|
|
time.Sleep(30 * time.Second)
|
} else {
|
// 连接成功后, 开始采集数据, 会判断连接是否有效, 断开后会采集任务会退出, 继续重新尝试连接设备
|
proc.plcConn = &plcConnection
|
runCollectionTask(ctx, proc)
|
}
|
}
|
}
|
}
|
|
func runCollectionTask(ctx context.Context, proc *collectorProc) {
|
dev := proc.device
|
|
for {
|
select {
|
case <-ctx.Done():
|
logger.Warn("plc device %s, ip: %s, end of collection.", dev.DeviceName, dev.DeviceIP)
|
(*proc.plcConn).Close()
|
return
|
default:
|
plcResponse := plc4xRequest(proc)
|
|
msg.SendDeviceLiveData(plcResponse)
|
|
// 无法连接了, 退出采集, 重新连接
|
if !plcResponse.Online {
|
return
|
}
|
|
// 间隔时间
|
time.Sleep(time.Duration(dev.Frequency) * time.Second)
|
}
|
}
|
}
|
|
func plc4xRequest(proc *collectorProc) *msg.PLCResponse {
|
conn := *proc.plcConn
|
dev := proc.device
|
|
plcResponse := msg.PLCResponse{
|
DeviceID: dev.DeviceID,
|
DeviceName: dev.DeviceName,
|
DeviceIP: dev.DeviceIP,
|
Online: true,
|
}
|
|
if !conn.IsConnected() {
|
logger.Warn("plc device %s, ip: %s, disconnected.", dev.DeviceName, dev.DeviceIP)
|
plcResponse.Online = false
|
|
return &plcResponse
|
}
|
|
// 根据设置的地址查询数据,上报
|
for _, addr := range dev.Details {
|
result, err := ReadHoldingRegister(conn, addr.StartAddress, addr.Length)
|
readRequest := msg.PLCData{
|
StartAddress: addr.StartAddress,
|
Length: addr.Length,
|
Type: addr.Type,
|
FieldName: addr.FieldName,
|
RawData: result,
|
}
|
|
if err != nil {
|
logger.Warn("plc device Read Holding Register error, %s", err.Error())
|
readRequest.Message = err.Error()
|
}
|
|
plcResponse.PLCData = append(plcResponse.PLCData, readRequest)
|
}
|
|
return &plcResponse
|
}
|