zhangzengfei
2023-08-04 88da1e13a073e8b5656387a246d827593fbd6163
collector/collector.go
@@ -19,41 +19,66 @@
}
func InitTask() {
   device := msg.PLCDevice{
      Id:       "0",
      Ip:       "192.168.1.188",
      Address:  []int{17021},
      Interval: 1,
   //device := msg.PLCDevice{
   //   DeviceID:   "0",
   //   DeviceName: "test",
   //   DeviceIP:   "192.168.1.188",
   //   Brand:      "sim",
   //   Method:     "modbus",
   //   PortName:   "",
   //   Frequency:  1,
   //   Details: []*msg.PLCAddress{&msg.PLCAddress{
   //      StartAddress: 17021,
   //      Length:       1,
   //      Type:         "int",
   //      FieldName:    "count",
   //   }},
   //}
   devices, err := getDeviceList()
   if err != nil {
      return
   }
   for idx, dev := range devices {
      // 判断设备状态, 如果配置未开启采集数据或其他方式, 现在假设字段status为0时代表采集
      if dev.Status != 0 {
         continue
      }
      addTask(&devices[idx])
   }
}
func addTask(device *msg.PLCDevice) {
   ctx, cancel := context.WithCancel(context.Background())
   proc := collectorProc{
      device: &device,
      device: device,
      cancel: cancel,
   }
   mapTask.Store(device.Id, &proc)
   mapTask.Store(device.DeviceID, &proc)
   go connectingDevice(ctx, &device)
   go connectingDevice(ctx, device)
}
func connectingDevice(ctx context.Context, dev *msg.PLCDevice) {
   plcResponse := msg.PLCResponse{
      Id:     dev.Id,
      Name:   dev.Name,
      Ip:     dev.Ip,
      Online: false,
      DeviceID:   dev.DeviceID,
      DeviceName: dev.DeviceName,
      DeviceIP:   dev.DeviceIP,
      Online:     false,
   }
   for {
      select {
      case <-ctx.Done():
         logger.Warn("plc device %s, ip: %s, end of connecting.", dev.Name, dev.Ip)
         logger.Warn("plc device %s, ip: %s, end of connecting.", dev.DeviceName, dev.DeviceIP)
         return
      default:
         plcConnection, err := NewModbusConnection(dev.Ip)
         plcConnection, err := NewModbusConnection(dev.DeviceIP)
         if err != nil {
            logger.Warn("error connecting to PLC: %s, ip: %s", dev.Name, dev.Ip)
            logger.Warn("error connecting to PLC: %s, ip: %s", dev.DeviceName, dev.DeviceIP)
            plcResponse.Online = false
            msg.SendDeviceLiveData(&plcResponse)
            time.Sleep(30 * time.Second)
@@ -68,35 +93,39 @@
func runCollectionTask(ctx context.Context, dev *msg.PLCDevice, conn plc4go.PlcConnection) {
   // 创建modbusTCP连接, 循环查询数据并上报
   plcResponse := msg.PLCResponse{
      Id:     dev.Id,
      Name:   dev.Name,
      Ip:     dev.Ip,
      Online: true,
      Data:   nil,
      DeviceID:   dev.DeviceID,
      DeviceName: dev.DeviceName,
      DeviceIP:   dev.DeviceIP,
      Online:     true,
   }
   for {
      select {
      case <-ctx.Done():
         logger.Warn("plc device %s, ip: %s, end of collection.", dev.Name, dev.Ip)
         logger.Warn("plc device %s, ip: %s, end of collection.", dev.DeviceName, dev.DeviceIP)
         conn.Close()
         return
      default:
         if !conn.IsConnected() {
            logger.Warn("plc device %s, ip: %s, disconnected.", dev.Name, dev.Ip)
            logger.Warn("plc device %s, ip: %s, disconnected.", dev.DeviceName, dev.DeviceIP)
            return
         }
         // 根据设置的地址查询数据,上报
         plcResponse.Data = make(map[int][]byte, 0)
         plcResponse.Message = ""
         for _, addr := range dev.Address {
            result, err := ReadHoldingRegister(conn, addr)
         for _, addr := range dev.Details {
            result, err := ReadHoldingRegister(conn, addr.StartAddress, addr.Length)
            if err != nil {
               logger.Warn("plc device Read Holding Register error, %s", err.Error())
               plcResponse.Message = err.Error()
            } else {
               plcResponse.Data[addr] = result
               plcResponse.PLCData = append(plcResponse.PLCData, msg.PLCData{
                  StartAddress: addr.StartAddress,
                  Length:       addr.Length,
                  Type:         addr.Type,
                  FieldName:    addr.FieldName,
                  Data:         result,
               })
            }
         }
@@ -106,7 +135,7 @@
         }
         // 间隔时间
         time.Sleep(time.Duration(dev.Interval) * time.Second)
         time.Sleep(time.Duration(dev.Frequency) * time.Second)
      }
   }
}