package collector import ( "context" "sync" "time" "plc-recorder/logger" "plc-recorder/msg" plc4go "github.com/apache/plc4x/plc4go/pkg/api" ) var tasksStore sync.Map type collectorProc struct { device *msg.PLCDevice cancel context.CancelFunc plcConn *plc4go.PlcConnection } // 初始化采集任务, 请求设备列表, 按设备添加采集任务 func InitTask() { logger.Debug("init task") devices, err := getDeviceList() if err != nil { return } for idx, dev := range devices { // 判断设备状态, 如果配置未开启采集数据或其他方式, 现在假设字段status为0时代表采集 if dev.Status != 0 { continue } logger.Debug("add collector task,device %s", dev.DeviceName) addTask(&devices[idx]) } } func stopTask(device *msg.PLCDevice) { if task, ok := tasksStore.Load(device.DeviceID); ok { // 存在的任务, 先停止掉, 然后重新开启一个 task.(collectorProc).cancel() tasksStore.Delete(device.DeviceID) } } func addTask(device *msg.PLCDevice) { ctx, cancel := context.WithCancel(context.Background()) proc := collectorProc{ device: device, cancel: cancel, } tasksStore.Store(device.DeviceID, &proc) go connectingDevice(ctx, &proc) } // 进入采集任务, 开始尝试连接设备, 失败后30秒重试, 成功后进入循环采集 func connectingDevice(ctx context.Context, proc *collectorProc) { for { select { case <-ctx.Done(): logger.Warn("plc device %s, ip: %s, end of connecting.", proc.device.DeviceName, proc.device.DeviceIP) return default: plcConnection, err := NewModbusConnection(proc.device.DeviceIP) if err != nil { logger.Warn("error connecting to PLC: %s, ip: %s", proc.device.DeviceName, proc.device.DeviceIP) // 上报设备离线 msg.SendDeviceLiveData(&msg.PLCResponse{ DeviceID: proc.device.DeviceID, DeviceName: proc.device.DeviceName, DeviceIP: proc.device.DeviceIP, Online: false, }) time.Sleep(30 * time.Second) } else { // 连接成功后, 开始采集数据, 会判断连接是否有效, 断开后会采集任务会退出, 继续重新尝试连接设备 proc.plcConn = &plcConnection runCollectionTask(ctx, proc) } } } } func runCollectionTask(ctx context.Context, proc *collectorProc) { dev := proc.device for { select { case <-ctx.Done(): logger.Warn("plc device %s, ip: %s, end of collection.", dev.DeviceName, dev.DeviceIP) (*proc.plcConn).Close() return default: plcResponse := plc4xRequest(proc) msg.SendDeviceLiveData(plcResponse) // 无法连接了, 退出采集, 重新连接 if !plcResponse.Online { return } // 间隔时间 time.Sleep(time.Duration(dev.Frequency) * time.Second) } } } func plc4xRequest(proc *collectorProc) *msg.PLCResponse { conn := *proc.plcConn dev := proc.device plcResponse := msg.PLCResponse{ DeviceID: dev.DeviceID, DeviceName: dev.DeviceName, DeviceIP: dev.DeviceIP, Online: true, } if !conn.IsConnected() { logger.Warn("plc device %s, ip: %s, disconnected.", dev.DeviceName, dev.DeviceIP) plcResponse.Online = false return &plcResponse } // 根据设置的地址查询数据,上报 for _, addr := range dev.Details { result, err := ReadHoldingRegister(conn, addr.StartAddress, addr.Length) readRequest := msg.PLCData{ StartAddress: addr.StartAddress, Length: addr.Length, Type: addr.Type, FieldName: addr.FieldName, RawData: result, } if err != nil { logger.Warn("plc device Read Holding Register error, %s", err.Error()) readRequest.Message = err.Error() } plcResponse.PLCData = append(plcResponse.PLCData, readRequest) } return &plcResponse }