zhangzengfei
2023-08-11 a335f66c4c520728be640ca4e7029ce6f45b8f3d
collector/collector.go
@@ -7,9 +7,11 @@
   "plc-recorder/logger"
   "plc-recorder/msg"
   plc4go "github.com/apache/plc4x/plc4go/pkg/api"
)
var mapTask sync.Map
var tasksStore sync.Map
type collectorProc struct {
   device *msg.PLCDevice
@@ -17,47 +19,117 @@
}
func InitTask() {
   device := msg.PLCDevice{
      Id:       "0",
      Ip:       "192.168.20.188",
      Address:  []int{17021},
      Interval: 1,
   logger.Debug("init task")
   devices, err := getDeviceList()
   if err != nil {
      return
   }
   ctx, cancel := context.WithCancel(context.Background())
   proc := collectorProc{
      device: &device,
      cancel: cancel,
   }
   mapTask.Store(device.Id, &proc)
   go runCollectionTask(ctx, &device)
}
func runCollectionTask(ctx context.Context, device *msg.PLCDevice) {
   // 创建modbusTCP连接, 循环查询数据并上报
   for {
      err, plcConnection := NewModbusConnection(device.Ip)
      if err != nil {
         logger.Warn("Error connecting to PLC: %s, ip:", device.Name, device.Ip)
         time.Sleep(30 * time.Second)
   for idx, dev := range devices {
      // 判断设备状态, 如果配置未开启采集数据或其他方式, 现在假设字段status为0时代表采集
      if dev.Status != 0 {
         continue
      }
      for {
         select {
         case <-ctx.Done():
            logger.Warn("Plc device %s, ip: %s, end of collection.", device.Name, device.Ip)
            plcConnection.Close()
            return
         default:
      logger.Debug("add collector task,device %s", dev.DeviceName)
      addTask(&devices[idx])
   }
}
            // 根据设置的地址查询数据,上报
            // 暂停
            time.Sleep(time.Duration(device.Interval) * time.Second)
func stopTask(device *msg.PLCDevice) {
   if task, ok := tasksStore.Load(device.DeviceID); ok {
      // 存在的任务, 先停止掉, 然后重新开启一个
      task.(collectorProc).cancel()
      tasksStore.Delete(device.DeviceID)
   }
}
func addTask(device *msg.PLCDevice) {
   ctx, cancel := context.WithCancel(context.Background())
   proc := collectorProc{
      device: device,
      cancel: cancel,
   }
   tasksStore.Store(device.DeviceID, &proc)
   go connectingDevice(ctx, device)
}
func connectingDevice(ctx context.Context, dev *msg.PLCDevice) {
   plcResponse := msg.PLCResponse{
      DeviceID:   dev.DeviceID,
      DeviceName: dev.DeviceName,
      DeviceIP:   dev.DeviceIP,
      Online:     false,
   }
   for {
      select {
      case <-ctx.Done():
         logger.Warn("plc device %s, ip: %s, end of connecting.", dev.DeviceName, dev.DeviceIP)
         return
      default:
         plcConnection, err := NewModbusConnection(dev.DeviceIP)
         if err != nil {
            logger.Warn("error connecting to PLC: %s, ip: %s", dev.DeviceName, dev.DeviceIP)
            plcResponse.Online = false
            msg.SendDeviceLiveData(&plcResponse)
            time.Sleep(30 * time.Second)
         } else {
            // 连接成功后, 开始采集数据, 会判断连接是否有效, 断开后会采集任务会退出, 继续重新尝试连接设备
            runCollectionTask(ctx, dev, plcConnection)
         }
      }
   }
}
func runCollectionTask(ctx context.Context, dev *msg.PLCDevice, conn plc4go.PlcConnection) {
   // 创建modbusTCP连接, 循环查询数据并上报
   plcResponse := msg.PLCResponse{
      DeviceID:   dev.DeviceID,
      DeviceName: dev.DeviceName,
      DeviceIP:   dev.DeviceIP,
      Online:     true,
   }
   for {
      select {
      case <-ctx.Done():
         logger.Warn("plc device %s, ip: %s, end of collection.", dev.DeviceName, dev.DeviceIP)
         conn.Close()
         return
      default:
         if !conn.IsConnected() {
            logger.Warn("plc device %s, ip: %s, disconnected.", dev.DeviceName, dev.DeviceIP)
            return
         }
         // 根据设置的地址查询数据,上报
         plcResponse.Message = ""
         for _, addr := range dev.Details {
            result, err := ReadHoldingRegister(conn, addr.StartAddress, addr.Length)
            if err != nil {
               logger.Warn("plc device Read Holding Register error, %s", err.Error())
               plcResponse.Message = err.Error()
            } else {
               plcResponse.PLCData = append(plcResponse.PLCData, msg.PLCData{
                  StartAddress: addr.StartAddress,
                  Length:       addr.Length,
                  Type:         addr.Type,
                  FieldName:    addr.FieldName,
                  RawData:      result,
               })
            }
         }
         msg.SendDeviceLiveData(&plcResponse)
         if plcResponse.Message != "" {
            return
         }
         // 间隔时间
         time.Sleep(time.Duration(dev.Frequency) * time.Second)
      }
   }
}