package collector import ( "context" "sync" "time" "plc-recorder/logger" "plc-recorder/msg" plc4go "github.com/apache/plc4x/plc4go/pkg/api" ) var tasksStore sync.Map type collectorProc struct { device *msg.PLCDevice cancel context.CancelFunc } func InitTask() { logger.Debug("init task") devices, err := getDeviceList() if err != nil { return } for idx, dev := range devices { // 判断设备状态, 如果配置未开启采集数据或其他方式, 现在假设字段status为0时代表采集 if dev.Status != 0 { continue } logger.Debug("add collector task,device %s", dev.DeviceName) addTask(&devices[idx]) } } func stopTask(device *msg.PLCDevice) { if task, ok := tasksStore.Load(device.DeviceID); ok { // 存在的任务, 先停止掉, 然后重新开启一个 task.(collectorProc).cancel() tasksStore.Delete(device.DeviceID) } } func addTask(device *msg.PLCDevice) { ctx, cancel := context.WithCancel(context.Background()) proc := collectorProc{ device: device, cancel: cancel, } tasksStore.Store(device.DeviceID, &proc) go connectingDevice(ctx, device) } func connectingDevice(ctx context.Context, dev *msg.PLCDevice) { plcResponse := msg.PLCResponse{ DeviceID: dev.DeviceID, DeviceName: dev.DeviceName, DeviceIP: dev.DeviceIP, Online: false, } for { select { case <-ctx.Done(): logger.Warn("plc device %s, ip: %s, end of connecting.", dev.DeviceName, dev.DeviceIP) return default: plcConnection, err := NewModbusConnection(dev.DeviceIP) if err != nil { logger.Warn("error connecting to PLC: %s, ip: %s", dev.DeviceName, dev.DeviceIP) plcResponse.Online = false msg.SendDeviceLiveData(&plcResponse) time.Sleep(30 * time.Second) } else { // 连接成功后, 开始采集数据, 会判断连接是否有效, 断开后会采集任务会退出, 继续重新尝试连接设备 runCollectionTask(ctx, dev, plcConnection) } } } } func runCollectionTask(ctx context.Context, dev *msg.PLCDevice, conn plc4go.PlcConnection) { // 创建modbusTCP连接, 循环查询数据并上报 plcResponse := msg.PLCResponse{ DeviceID: dev.DeviceID, DeviceName: dev.DeviceName, DeviceIP: dev.DeviceIP, Online: true, } for { select { case <-ctx.Done(): logger.Warn("plc device %s, ip: %s, end of collection.", dev.DeviceName, dev.DeviceIP) conn.Close() return default: if !conn.IsConnected() { logger.Warn("plc device %s, ip: %s, disconnected.", dev.DeviceName, dev.DeviceIP) return } // 根据设置的地址查询数据,上报 plcResponse.Message = "" for _, addr := range dev.Details { result, err := ReadHoldingRegister(conn, addr.StartAddress, addr.Length) if err != nil { logger.Warn("plc device Read Holding Register error, %s", err.Error()) plcResponse.Message = err.Error() } else { plcResponse.PLCData = append(plcResponse.PLCData, msg.PLCData{ StartAddress: addr.StartAddress, Length: addr.Length, Type: addr.Type, FieldName: addr.FieldName, RawData: result, }) } } msg.SendDeviceLiveData(&plcResponse) if plcResponse.Message != "" { return } // 间隔时间 time.Sleep(time.Duration(dev.Frequency) * time.Second) } } }