| | |
| | | var tasksStore sync.Map |
| | | |
| | | type collectorProc struct { |
| | | device *msg.PLCDevice |
| | | cancel context.CancelFunc |
| | | device *msg.PLCDevice |
| | | cancel context.CancelFunc |
| | | plcConn *plc4go.PlcConnection |
| | | } |
| | | |
| | | // 初始化采集任务, 请求设备列表, 按设备添加采集任务 |
| | | func InitTask() { |
| | | logger.Debug("init task") |
| | | devices, err := getDeviceList() |
| | |
| | | |
| | | tasksStore.Store(device.DeviceID, &proc) |
| | | |
| | | go connectingDevice(ctx, device) |
| | | go connectingDevice(ctx, &proc) |
| | | } |
| | | |
| | | func connectingDevice(ctx context.Context, dev *msg.PLCDevice) { |
| | | plcResponse := msg.PLCResponse{ |
| | | DeviceID: dev.DeviceID, |
| | | DeviceName: dev.DeviceName, |
| | | DeviceIP: dev.DeviceIP, |
| | | Online: false, |
| | | } |
| | | |
| | | // 进入采集任务, 开始尝试连接设备, 失败后30秒重试, 成功后进入循环采集 |
| | | func connectingDevice(ctx context.Context, proc *collectorProc) { |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | logger.Warn("plc device %s, ip: %s, end of connecting.", dev.DeviceName, dev.DeviceIP) |
| | | logger.Warn("plc device %s, ip: %s, end of connecting.", proc.device.DeviceName, proc.device.DeviceIP) |
| | | return |
| | | default: |
| | | plcConnection, err := NewModbusConnection(dev.DeviceIP) |
| | | plcConnection, err := NewModbusConnection(proc.device.DeviceIP) |
| | | if err != nil { |
| | | logger.Warn("error connecting to PLC: %s, ip: %s", dev.DeviceName, dev.DeviceIP) |
| | | plcResponse.Online = false |
| | | msg.SendDeviceLiveData(&plcResponse) |
| | | logger.Warn("error connecting to PLC: %s, ip: %s", proc.device.DeviceName, proc.device.DeviceIP) |
| | | |
| | | // 上报设备离线 |
| | | msg.SendDeviceLiveData(&msg.PLCResponse{ |
| | | DeviceID: proc.device.DeviceID, |
| | | DeviceName: proc.device.DeviceName, |
| | | DeviceIP: proc.device.DeviceIP, |
| | | Online: false, |
| | | }) |
| | | |
| | | time.Sleep(30 * time.Second) |
| | | } else { |
| | | // 连接成功后, 开始采集数据, 会判断连接是否有效, 断开后会采集任务会退出, 继续重新尝试连接设备 |
| | | runCollectionTask(ctx, dev, plcConnection) |
| | | proc.plcConn = &plcConnection |
| | | runCollectionTask(ctx, proc) |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | func runCollectionTask(ctx context.Context, dev *msg.PLCDevice, conn plc4go.PlcConnection) { |
| | | // 创建modbusTCP连接, 循环查询数据并上报 |
| | | plcResponse := msg.PLCResponse{ |
| | | DeviceID: dev.DeviceID, |
| | | DeviceName: dev.DeviceName, |
| | | DeviceIP: dev.DeviceIP, |
| | | Online: true, |
| | | } |
| | | func runCollectionTask(ctx context.Context, proc *collectorProc) { |
| | | dev := proc.device |
| | | |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | logger.Warn("plc device %s, ip: %s, end of collection.", dev.DeviceName, dev.DeviceIP) |
| | | conn.Close() |
| | | (*proc.plcConn).Close() |
| | | return |
| | | default: |
| | | if !conn.IsConnected() { |
| | | logger.Warn("plc device %s, ip: %s, disconnected.", dev.DeviceName, dev.DeviceIP) |
| | | return |
| | | } |
| | | plcResponse := plc4xRequest(proc) |
| | | |
| | | // 根据设置的地址查询数据,上报 |
| | | plcResponse.Message = "" |
| | | for _, addr := range dev.Details { |
| | | result, err := ReadHoldingRegister(conn, addr.StartAddress, addr.Length) |
| | | if err != nil { |
| | | logger.Warn("plc device Read Holding Register error, %s", err.Error()) |
| | | plcResponse.Message = err.Error() |
| | | } else { |
| | | plcResponse.PLCData = append(plcResponse.PLCData, msg.PLCData{ |
| | | StartAddress: addr.StartAddress, |
| | | Length: addr.Length, |
| | | Type: addr.Type, |
| | | FieldName: addr.FieldName, |
| | | RawData: result, |
| | | }) |
| | | } |
| | | } |
| | | msg.SendDeviceLiveData(plcResponse) |
| | | |
| | | msg.SendDeviceLiveData(&plcResponse) |
| | | if plcResponse.Message != "" { |
| | | // 无法连接了, 退出采集, 重新连接 |
| | | if !plcResponse.Online { |
| | | return |
| | | } |
| | | |
| | |
| | | } |
| | | } |
| | | } |
| | | |
| | | func plc4xRequest(proc *collectorProc) *msg.PLCResponse { |
| | | conn := *proc.plcConn |
| | | dev := proc.device |
| | | |
| | | plcResponse := msg.PLCResponse{ |
| | | DeviceID: dev.DeviceID, |
| | | DeviceName: dev.DeviceName, |
| | | DeviceIP: dev.DeviceIP, |
| | | Online: true, |
| | | } |
| | | |
| | | if !conn.IsConnected() { |
| | | logger.Warn("plc device %s, ip: %s, disconnected.", dev.DeviceName, dev.DeviceIP) |
| | | plcResponse.Online = false |
| | | |
| | | return &plcResponse |
| | | } |
| | | |
| | | // 根据设置的地址查询数据,上报 |
| | | for _, addr := range dev.Details { |
| | | result, err := ReadHoldingRegister(conn, addr.StartAddress, addr.Length) |
| | | readRequest := msg.PLCData{ |
| | | StartAddress: addr.StartAddress, |
| | | Length: addr.Length, |
| | | Type: addr.Type, |
| | | FieldName: addr.FieldName, |
| | | RawData: result, |
| | | } |
| | | |
| | | if err != nil { |
| | | logger.Warn("plc device Read Holding Register error, %s", err.Error()) |
| | | readRequest.Message = err.Error() |
| | | } |
| | | |
| | | plcResponse.PLCData = append(plcResponse.PLCData, readRequest) |
| | | } |
| | | |
| | | return &plcResponse |
| | | } |