package collector

import (
	"context"
	"sync"
	"time"

	"plc-recorder/logger"
	"plc-recorder/msg"

	plc4go "github.com/apache/plc4x/plc4go/pkg/api"
)

// mapTask holds the running collectors, keyed by device ID. Values stored by
// this file are *collectorProc.
var mapTask sync.Map

// collectorProc pairs a device with the cancel function that stops its
// collector goroutine.
type collectorProc struct {
	device *msg.PLCDevice
	cancel context.CancelFunc
}

// InitTask loads the device list and starts a collector for every device
// whose Status is 0. If the device list cannot be loaded, the error is logged
// and no collectors are started.
func InitTask() {
	//device := msg.PLCDevice{
	//	DeviceID:   "0",
	//	DeviceName: "test",
	//	DeviceIP:   "192.168.1.188",
	//	Brand:      "sim",
	//	Method:     "modbus",
	//	PortName:   "",
	//	Frequency:  1,
	//	Details: []*msg.PLCAddress{&msg.PLCAddress{
	//		StartAddress: 17021,
	//		Length:       1,
	//		Type:         "int",
	//		FieldName:    "count",
	//	}},
	//}
	devices, err := getDeviceList()
	if err != nil {
		// Previously this error was dropped silently, which made a missing
		// device list indistinguishable from an empty one.
		logger.Warn("get device list error, %s", err.Error())
		return
	}
	for idx := range devices {
		// Check the device status: devices whose configuration does not enable
		// collection (or selects some other mode) are skipped. For now a
		// Status of 0 is assumed to mean "collect".
		if devices[idx].Status != 0 {
			continue
		}
		// Pass a pointer to the slice element so each collector gets its own
		// device rather than a shared loop variable.
		addTask(&devices[idx])
	}
}

// addTask registers a collector for device in mapTask and starts its
// connection loop in a new goroutine. If a collector is already registered
// under the same DeviceID it is cancelled first; otherwise its cancel function
// would be overwritten and its goroutine could never be stopped.
func addTask(device *msg.PLCDevice) {
	if prev, ok := mapTask.Load(device.DeviceID); ok {
		if p, ok := prev.(*collectorProc); ok && p.cancel != nil {
			p.cancel()
		}
	}

	ctx, cancel := context.WithCancel(context.Background())
	proc := collectorProc{
		device: device,
		cancel: cancel,
	}
	mapTask.Store(device.DeviceID, &proc)
	go connectingDevice(ctx, device)
}

// connectingDevice repeatedly tries to connect to dev until ctx is cancelled.
// A failed attempt is reported as an offline status and retried after a
// delay; a successful attempt hands the connection to runCollectionTask, and
// the loop reconnects once that returns.
func connectingDevice(ctx context.Context, dev *msg.PLCDevice) {
	// Delay between failed connection attempts.
	const reconnectDelay = 30 * time.Second

	plcResponse := msg.PLCResponse{
		DeviceID:   dev.DeviceID,
		DeviceName: dev.DeviceName,
		DeviceIP:   dev.DeviceIP,
		Online:     false,
	}
	for {
		select {
		case <-ctx.Done():
			logger.Warn("plc device %s, ip: %s, end of connecting.", dev.DeviceName, dev.DeviceIP)
			return
		default:
		}

		plcConnection, err := NewModbusConnection(dev.DeviceIP)
		if err != nil {
			logger.Warn("error connecting to PLC: %s, ip: %s", dev.DeviceName, dev.DeviceIP)
			plcResponse.Online = false
			msg.SendDeviceLiveData(&plcResponse)
			// Wait before retrying, but wake up immediately on cancellation;
			// the select at the top of the loop then handles the shutdown.
			sleepWithContext(ctx, reconnectDelay)
			continue
		}

		// Once connected, start collecting. The collection task checks whether
		// the connection is still valid and exits when it is lost, after which
		// this loop tries to connect to the device again.
		runCollectionTask(ctx, dev, plcConnection)
	}
}

// runCollectionTask polls every address in dev.Details over conn and reports
// the readings via msg.SendDeviceLiveData, once per dev.Frequency seconds.
// It returns when ctx is cancelled, when conn reports it is no longer
// connected, or after a cycle in which any read failed. conn is closed on
// every return path.
func runCollectionTask(ctx context.Context, dev *msg.PLCDevice, conn plc4go.PlcConnection) {
	// Close on every exit, not only on cancellation: the caller opens a fresh
	// connection after each return, so leaving this one open would leak it.
	defer conn.Close()

	// Using the established Modbus TCP connection, query the data in a loop
	// and report it.
	plcResponse := msg.PLCResponse{
		DeviceID:   dev.DeviceID,
		DeviceName: dev.DeviceName,
		DeviceIP:   dev.DeviceIP,
		Online:     true,
	}
	for {
		select {
		case <-ctx.Done():
			logger.Warn("plc device %s, ip: %s, end of collection.", dev.DeviceName, dev.DeviceIP)
			return
		default:
		}

		if !conn.IsConnected() {
			logger.Warn("plc device %s, ip: %s, disconnected.", dev.DeviceName, dev.DeviceIP)
			return
		}

		// Query the configured addresses and report the results. Both the
		// message and the data are reset so each report carries only the
		// readings of the current cycle. A fresh slice (rather than [:0]) is
		// used in case the receiver still references the previous one.
		plcResponse.Message = ""
		plcResponse.PLCData = nil
		for _, addr := range dev.Details {
			result, err := ReadHoldingRegister(conn, addr.StartAddress, addr.Length)
			if err != nil {
				logger.Warn("plc device Read Holding Register error, %s", err.Error())
				plcResponse.Message = err.Error()
				continue
			}
			plcResponse.PLCData = append(plcResponse.PLCData, msg.PLCData{
				StartAddress: addr.StartAddress,
				Length:       addr.Length,
				Type:         addr.Type,
				FieldName:    addr.FieldName,
				Data:         result,
			})
		}
		msg.SendDeviceLiveData(&plcResponse)
		if plcResponse.Message != "" {
			// A read failed: give up on this connection and let the caller
			// reconnect.
			return
		}

		// Polling interval. Cancellation interrupts the wait; the select at
		// the top of the loop then logs and returns.
		sleepWithContext(ctx, time.Duration(dev.Frequency)*time.Second)
	}
}

// sleepWithContext blocks for d or until ctx is cancelled, whichever comes
// first.
func sleepWithContext(ctx context.Context, d time.Duration) {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-ctx.Done():
	case <-timer.C:
	}
}