zhangzengfei
2023-08-01 e8c97695dd6930465e66b8fac819301f03624512
collector/collector.go
@@ -7,6 +7,8 @@
   "plc-recorder/logger"
   "plc-recorder/msg"
   plc4go "github.com/apache/plc4x/plc4go/pkg/api"
)
var mapTask sync.Map
@@ -19,7 +21,7 @@
func InitTask() {
   device := msg.PLCDevice{
      Id:       "0",
      Ip:       "192.168.20.188",
      Ip:       "192.168.1.188",
      Address:  []int{17021},
      Interval: 1,
   }
@@ -32,32 +34,73 @@
   mapTask.Store(device.Id, &proc)
   go runCollectionTask(ctx, &device)
   go connectingDevice(ctx, &device)
}
func runCollectionTask(ctx context.Context, device *msg.PLCDevice) {
   // 创建modbusTCP连接, 循环查询数据并上报
func connectingDevice(ctx context.Context, dev *msg.PLCDevice) {
   plcResponse := msg.PLCResponse{
      Id:     dev.Id,
      Name:   dev.Name,
      Ip:     dev.Ip,
      Online: false,
   }
   for {
      err, plcConnection := NewModbusConnection(device.Ip)
      if err != nil {
         logger.Warn("Error connecting to PLC: %s, ip:", device.Name, device.Ip)
         time.Sleep(30 * time.Second)
         continue
      }
      for {
         select {
         case <-ctx.Done():
            logger.Warn("Plc device %s, ip: %s, end of collection.", device.Name, device.Ip)
            plcConnection.Close()
            return
         default:
            // 根据设置的地址查询数据,上报
            // 暂停
            time.Sleep(time.Duration(device.Interval) * time.Second)
      select {
      case <-ctx.Done():
         logger.Warn("plc device %s, ip: %s, end of connecting.", dev.Name, dev.Ip)
         return
      default:
         plcConnection, err := NewModbusConnection(dev.Ip)
         if err != nil {
            logger.Warn("error connecting to PLC: %s, ip: %s", dev.Name, dev.Ip)
            plcResponse.Online = false
            msg.SendDeviceLiveData(&plcResponse)
            time.Sleep(30 * time.Second)
         } else {
            // 连接成功后, 开始采集数据, 会判断连接是否有效, 断开后会采集任务会退出, 继续重新尝试连接设备
            runCollectionTask(ctx, dev, plcConnection)
         }
      }
   }
}
func runCollectionTask(ctx context.Context, dev *msg.PLCDevice, conn plc4go.PlcConnection) {
   // 创建modbusTCP连接, 循环查询数据并上报
   plcResponse := msg.PLCResponse{
      Id:     dev.Id,
      Name:   dev.Name,
      Ip:     dev.Ip,
      Online: true,
      Data:   nil,
   }
   for {
      select {
      case <-ctx.Done():
         logger.Warn("plc device %s, ip: %s, end of collection.", dev.Name, dev.Ip)
         conn.Close()
         return
      default:
         if !conn.IsConnected() {
            logger.Warn("plc device %s, ip: %s, disconnected.", dev.Name, dev.Ip)
            return
         }
         // 根据设置的地址查询数据,上报
         plcResponse.Data = make(map[int][]byte, 0)
         for _, addr := range dev.Address {
            result, err := ReadHoldingRegister(conn, addr)
            if err != nil {
               logger.Warn("plc device Read Holding Register error, %s", err.Error())
            } else {
               plcResponse.Data[addr] = result
            }
         }
         msg.SendDeviceLiveData(&plcResponse)
         // 间隔时间
         time.Sleep(time.Duration(dev.Interval) * time.Second)
      }
   }
}