zhangzengfei
2023-08-11 a335f66c4c520728be640ca4e7029ce6f45b8f3d
collector/collector.go
@@ -11,7 +11,7 @@
   plc4go "github.com/apache/plc4x/plc4go/pkg/api"
)
var mapTask sync.Map
var tasksStore sync.Map
type collectorProc struct {
   device *msg.PLCDevice
@@ -19,41 +19,60 @@
}
func InitTask() {
   device := msg.PLCDevice{
      Id:       "0",
      Ip:       "192.168.1.188",
      Address:  []int{17021},
      Interval: 1,
   logger.Debug("init task")
   devices, err := getDeviceList()
   if err != nil {
      return
   }
   for idx, dev := range devices {
      // 判断设备状态, 如果配置未开启采集数据或其他方式, 现在假设字段status为0时代表采集
      if dev.Status != 0 {
         continue
      }
      logger.Debug("add collector task,device %s", dev.DeviceName)
      addTask(&devices[idx])
   }
}
func stopTask(device *msg.PLCDevice) {
   if task, ok := tasksStore.Load(device.DeviceID); ok {
      // 存在的任务, 先停止掉, 然后重新开启一个
      task.(collectorProc).cancel()
      tasksStore.Delete(device.DeviceID)
   }
}
func addTask(device *msg.PLCDevice) {
   ctx, cancel := context.WithCancel(context.Background())
   proc := collectorProc{
      device: &device,
      device: device,
      cancel: cancel,
   }
   mapTask.Store(device.Id, &proc)
   tasksStore.Store(device.DeviceID, &proc)
   go connectingDevice(ctx, &device)
   go connectingDevice(ctx, device)
}
func connectingDevice(ctx context.Context, dev *msg.PLCDevice) {
   plcResponse := msg.PLCResponse{
      Id:     dev.Id,
      Name:   dev.Name,
      Ip:     dev.Ip,
      Online: false,
      DeviceID:   dev.DeviceID,
      DeviceName: dev.DeviceName,
      DeviceIP:   dev.DeviceIP,
      Online:     false,
   }
   for {
      select {
      case <-ctx.Done():
         logger.Warn("plc device %s, ip: %s, end of connecting.", dev.Name, dev.Ip)
         logger.Warn("plc device %s, ip: %s, end of connecting.", dev.DeviceName, dev.DeviceIP)
         return
      default:
         plcConnection, err := NewModbusConnection(dev.Ip)
         plcConnection, err := NewModbusConnection(dev.DeviceIP)
         if err != nil {
            logger.Warn("error connecting to PLC: %s, ip: %s", dev.Name, dev.Ip)
            logger.Warn("error connecting to PLC: %s, ip: %s", dev.DeviceName, dev.DeviceIP)
            plcResponse.Online = false
            msg.SendDeviceLiveData(&plcResponse)
            time.Sleep(30 * time.Second)
@@ -68,39 +87,49 @@
func runCollectionTask(ctx context.Context, dev *msg.PLCDevice, conn plc4go.PlcConnection) {
   // 创建modbusTCP连接, 循环查询数据并上报
   plcResponse := msg.PLCResponse{
      Id:     dev.Id,
      Name:   dev.Name,
      Ip:     dev.Ip,
      Online: true,
      Data:   nil,
      DeviceID:   dev.DeviceID,
      DeviceName: dev.DeviceName,
      DeviceIP:   dev.DeviceIP,
      Online:     true,
   }
   for {
      select {
      case <-ctx.Done():
         logger.Warn("plc device %s, ip: %s, end of collection.", dev.Name, dev.Ip)
         logger.Warn("plc device %s, ip: %s, end of collection.", dev.DeviceName, dev.DeviceIP)
         conn.Close()
         return
      default:
         if !conn.IsConnected() {
            logger.Warn("plc device %s, ip: %s, disconnected.", dev.Name, dev.Ip)
            logger.Warn("plc device %s, ip: %s, disconnected.", dev.DeviceName, dev.DeviceIP)
            return
         }
         // 根据设置的地址查询数据,上报
         plcResponse.Data = make(map[int][]byte, 0)
         for _, addr := range dev.Address {
            result, err := ReadHoldingRegister(conn, addr)
         plcResponse.Message = ""
         for _, addr := range dev.Details {
            result, err := ReadHoldingRegister(conn, addr.StartAddress, addr.Length)
            if err != nil {
               logger.Warn("plc device Read Holding Register error, %s", err.Error())
               plcResponse.Message = err.Error()
            } else {
               plcResponse.Data[addr] = result
               plcResponse.PLCData = append(plcResponse.PLCData, msg.PLCData{
                  StartAddress: addr.StartAddress,
                  Length:       addr.Length,
                  Type:         addr.Type,
                  FieldName:    addr.FieldName,
                  RawData:      result,
               })
            }
         }
         msg.SendDeviceLiveData(&plcResponse)
         if plcResponse.Message != "" {
            return
         }
         // 间隔时间
         time.Sleep(time.Duration(dev.Interval) * time.Second)
         time.Sleep(time.Duration(dev.Frequency) * time.Second)
      }
   }
}