zhangzengfei
2023-08-11 bc0b7e914a378b2c40f9d2ec2470b61a19c18288
collector/collector.go
@@ -14,10 +14,12 @@
var tasksStore sync.Map
type collectorProc struct {
   device *msg.PLCDevice
   cancel context.CancelFunc
   device  *msg.PLCDevice
   cancel  context.CancelFunc
   plcConn *plc4go.PlcConnection
}
// 初始化采集任务, 请求设备列表, 按设备添加采集任务
func InitTask() {
   logger.Debug("init task")
   devices, err := getDeviceList()
@@ -53,78 +55,55 @@
   tasksStore.Store(device.DeviceID, &proc)
   go connectingDevice(ctx, device)
   go connectingDevice(ctx, &proc)
}
func connectingDevice(ctx context.Context, dev *msg.PLCDevice) {
   plcResponse := msg.PLCResponse{
      DeviceID:   dev.DeviceID,
      DeviceName: dev.DeviceName,
      DeviceIP:   dev.DeviceIP,
      Online:     false,
   }
// 进入采集任务, 开始尝试连接设备, 失败后30秒重试, 成功后进入循环采集
func connectingDevice(ctx context.Context, proc *collectorProc) {
   for {
      select {
      case <-ctx.Done():
         logger.Warn("plc device %s, ip: %s, end of connecting.", dev.DeviceName, dev.DeviceIP)
         logger.Warn("plc device %s, ip: %s, end of connecting.", proc.device.DeviceName, proc.device.DeviceIP)
         return
      default:
         plcConnection, err := NewModbusConnection(dev.DeviceIP)
         plcConnection, err := NewModbusConnection(proc.device.DeviceIP)
         if err != nil {
            logger.Warn("error connecting to PLC: %s, ip: %s", dev.DeviceName, dev.DeviceIP)
            plcResponse.Online = false
            msg.SendDeviceLiveData(&plcResponse)
            logger.Warn("error connecting to PLC: %s, ip: %s", proc.device.DeviceName, proc.device.DeviceIP)
            // 上报设备离线
            msg.SendDeviceLiveData(&msg.PLCResponse{
               DeviceID:   proc.device.DeviceID,
               DeviceName: proc.device.DeviceName,
               DeviceIP:   proc.device.DeviceIP,
               Online:     false,
            })
            time.Sleep(30 * time.Second)
         } else {
            // 连接成功后, 开始采集数据, 会判断连接是否有效, 断开后会采集任务会退出, 继续重新尝试连接设备
            runCollectionTask(ctx, dev, plcConnection)
            proc.plcConn = &plcConnection
            runCollectionTask(ctx, proc)
         }
      }
   }
}
func runCollectionTask(ctx context.Context, dev *msg.PLCDevice, conn plc4go.PlcConnection) {
   // 创建modbusTCP连接, 循环查询数据并上报
   plcResponse := msg.PLCResponse{
      DeviceID:   dev.DeviceID,
      DeviceName: dev.DeviceName,
      DeviceIP:   dev.DeviceIP,
      Online:     true,
   }
func runCollectionTask(ctx context.Context, proc *collectorProc) {
   dev := proc.device
   for {
      select {
      case <-ctx.Done():
         logger.Warn("plc device %s, ip: %s, end of collection.", dev.DeviceName, dev.DeviceIP)
         conn.Close()
         (*proc.plcConn).Close()
         return
      default:
         if !conn.IsConnected() {
            logger.Warn("plc device %s, ip: %s, disconnected.", dev.DeviceName, dev.DeviceIP)
            return
         }
         plcResponse := plc4xRequest(proc)
         // 根据设置的地址查询数据,上报
         plcResponse.Message = ""
         for _, addr := range dev.Details {
            result, err := ReadHoldingRegister(conn, addr.StartAddress, addr.Length)
            if err != nil {
               logger.Warn("plc device Read Holding Register error, %s", err.Error())
               plcResponse.Message = err.Error()
            } else {
               plcResponse.PLCData = append(plcResponse.PLCData, msg.PLCData{
                  StartAddress: addr.StartAddress,
                  Length:       addr.Length,
                  Type:         addr.Type,
                  FieldName:    addr.FieldName,
                  RawData:      result,
               })
            }
         }
         msg.SendDeviceLiveData(plcResponse)
         msg.SendDeviceLiveData(&plcResponse)
         if plcResponse.Message != "" {
         // 无法连接了, 退出采集, 重新连接
         if !plcResponse.Online {
            return
         }
@@ -133,3 +112,43 @@
      }
   }
}
func plc4xRequest(proc *collectorProc) *msg.PLCResponse {
   conn := *proc.plcConn
   dev := proc.device
   plcResponse := msg.PLCResponse{
      DeviceID:   dev.DeviceID,
      DeviceName: dev.DeviceName,
      DeviceIP:   dev.DeviceIP,
      Online:     true,
   }
   if !conn.IsConnected() {
      logger.Warn("plc device %s, ip: %s, disconnected.", dev.DeviceName, dev.DeviceIP)
      plcResponse.Online = false
      return &plcResponse
   }
   // 根据设置的地址查询数据,上报
   for _, addr := range dev.Details {
      result, err := ReadHoldingRegister(conn, addr.StartAddress, addr.Length)
      readRequest := msg.PLCData{
         StartAddress: addr.StartAddress,
         Length:       addr.Length,
         Type:         addr.Type,
         FieldName:    addr.FieldName,
         RawData:      result,
      }
      if err != nil {
         logger.Warn("plc device Read Holding Register error, %s", err.Error())
         readRequest.Message = err.Error()
      }
      plcResponse.PLCData = append(plcResponse.PLCData, readRequest)
   }
   return &plcResponse
}