zhangqian
2024-04-25 2d6875c93b25d0b7336c7fa11e066d213259fe2e
crontask/cron_task.go
@@ -3,11 +3,14 @@
import (
   "apsClient/conf"
   "apsClient/constvar"
   "apsClient/model"
   "apsClient/model/common"
   "apsClient/nsq"
   "apsClient/pkg/ecode"
   "apsClient/pkg/logx"
   "apsClient/serf"
   "apsClient/service"
   "apsClient/service/problem"
   "fmt"
   "github.com/go-co-op/gocron"
   "github.com/spf13/cast"
@@ -16,7 +19,10 @@
var s *gocron.Scheduler
func StartTask() error {
func init() {
   s = gocron.NewScheduler(time.Local)
}
func StartTask(isMaster bool) error {
   finishNumberTimeInterval := conf.Conf.PLC.FinishNumberTimeInterval
   totalNumberTimeInterval := conf.Conf.PLC.TotalNumberTimeInterval
   if finishNumberTimeInterval == 0 {
@@ -25,7 +31,6 @@
   if totalNumberTimeInterval == 0 {
      totalNumberTimeInterval = 60
   }
   s = gocron.NewScheduler(time.UTC)
   _, err := s.Every(finishNumberTimeInterval).Seconds().Do(func() {
      plcConfig, code := service.NewDevicePlcService().GetDevicePlc()
      if code != ecode.OK {
@@ -33,74 +38,62 @@
      }
      for _, addressItem := range plcConfig.Details {
         if addressItem.FieldName == constvar.PlcStartAddressTypeFinishNumber {
            value, err := service.PlcReadDirect(plcConfig, addressItem.StartAddress, addressItem.Length, addressItem.Type)
            value, err := service.PlcReadDirect(plcConfig, addressItem)
            if err != nil {
               logx.Infof("plc read finish number err: %v", err)
               continue
            }
            finishNumber := cast.ToInt64(value)
            finishNumber := cast.ToInt(value)
            if finishNumber != 0 {
               service.PlcCacheSet(addressItem.Channel, constvar.PlcCacheKeyFinishNumber, finishNumber)
               _ = service.NewProgressService().UpdateProgress(addressItem.Channel, cast.ToInt64(finishNumber))
               service.PlcCacheSet(conf.Conf.CurrentDeviceID, addressItem.Channel, constvar.PlcCacheKeyFinishNumber, finishNumber)
               _ = service.NewProgressService().UpdateProgress(conf.Conf.CurrentDeviceID, addressItem.Channel, finishNumber)
            }
            logx.Infof("plc read finish number: %v", finishNumber)
         }
      }
   })
   if err != nil {
      return err
   }
   s.Every(totalNumberTimeInterval).Seconds().Do(func() {
      plcConfig, code := service.NewDevicePlcService().GetDevicePlc()
      if code != ecode.OK {
         return
      }
      for _, addressItem := range plcConfig.Details {
         if addressItem.FieldName == constvar.PlcStartAddressTypeTotalNumber {
            value, err := service.PlcReadDirect(plcConfig, addressItem.StartAddress, addressItem.Length, addressItem.Type)
            time.Sleep(time.Second * 1)
         } else if addressItem.FieldName == constvar.PlcStartAddressTypeTotalNumber {
            value, err := service.PlcReadDirect(plcConfig, addressItem)
            if err != nil {
               logx.Infof("plc read total number err: %v", err)
               continue
            }
            totalNumber := cast.ToInt64(value)
            if totalNumber != 0 {
               service.PlcCacheSet(addressItem.Channel, constvar.PlcCacheKeyTotalNumber, totalNumber)
               _ = service.NewProgressService().UpdateProgress(addressItem.Channel, cast.ToInt64(totalNumber))
               service.PlcCacheSet(conf.Conf.CurrentDeviceID, addressItem.Channel, constvar.PlcCacheKeyTotalNumber, totalNumber)
            }
            logx.Infof("plc read total number: %v", totalNumber)
            time.Sleep(time.Second * 1)
         }
      }
   })
   if err != nil {
      return err
   }
   s.Every(60).Seconds().Do(SyncProductionProgress) //同步生产数据
   s.Every(60).Seconds().Do(SyncTaskStatus)         //同步任务状态
   if isMaster {
      s.Every(20).Seconds().Do(SyncProductionProgress) //同步生产数据
      s.Every(30).Seconds().Do(SyncTaskStatus)         //同步任务状态
      s.Every(10).Seconds().Do(CheckNsqConn)           //查询nsq连接
      s.Every(30).Seconds().Do(ReportData)             //上报数据
   }
   s.Every(20).Seconds().Do(QueryClusterStatus) //查询集群节点数量
   s.Every(30).Seconds().Do(ProblemCheck)       //问题诊断
   s.StartAsync()
   return nil
}
func SyncProductionProgress() {
   plcConfig, code := service.NewDevicePlcService().GetDevicePlc()
   if code != ecode.OK {
   progressList, err := service.NewProgressService().GetProgressList()
   if err != nil {
      logx.Errorf("SyncProductionProgress get records err:%v", err)
      return
   }
   var channels []int32
   for _, item := range plcConfig.Details {
      if item.FieldName == constvar.PlcStartAddressTypeFinishNumber {
         channels = append(channels, item.Channel)
   for _, progress := range progressList {
      if progress.FinishedQuantity == 0 {
         continue
      }
   }
   for _, channel := range channels {
      progress, err := service.NewProgressService().GetCurrentProgress(channel)
      if err != nil {
         return
      }
      if progress == nil {
         return
      }
      caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicSyncTaskProgress, conf.Conf.NsqConf.NodeId), "")
      err = caller.Send(progress)
      if err != nil {
@@ -110,27 +103,133 @@
}
func SyncTaskStatus() {
   //todo
}
func StopTask() {
   if s != nil {
      s.Stop()
   records, err := service.NewTaskService().GetTaskStatusSync(100)
   if err != nil {
      logx.Errorf("SyncTaskStatus get records err:%v", err)
   }
   syncOkIds := make([]uint, 0, len(records))
   for _, record := range records {
      msg := &common.MsgTaskStatusUpdate{
         WorkOrderId:        record.WorkOrderId,
         ProcedureID:        record.ProcedureID,
         DeviceId:           record.DeviceId,
         IsProcessing:       record.IsProcessing,
         IsFinish:           record.IsFinish,
         FinishAmount:       record.FinishedQuantity,
         ProductProcedureID: record.ProductProcedureID,
         StartTs:            record.StartTs,
         FinishTs:           record.FinishTs,
      }
      caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicTaskProcedureStatusUpdate, conf.Conf.NsqConf.NodeId), "")
      err = caller.Send(msg)
      if err != nil {
         logx.Errorf("sync task status send msg error:%v", err.Error())
         continue
      }
      syncOkIds = append(syncOkIds, record.ID)
   }
   if len(syncOkIds) > 0 {
      err = service.NewTaskService().FinishTaskStatusSync(syncOkIds)
      if err != nil {
         logx.Errorf("sync task status delete sync ok records error:%v", err)
      }
   }
}
func ReportData() {
   records, err := model.NewReportsToCloudSearch(nil).SetOrder("id desc").SetPage(1, 100).FindNotTotal()
   if err != nil {
      logx.Errorf("ReportData get records err:%v", err)
   }
   okIds := make([]uint, 0, len(records))
   for _, record := range records {
      msg := &common.MsgReportData{
         ReportType: record.ReportType,
         Content:    record.Content,
      }
      caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicApsClientReportData, conf.Conf.NsqConf.NodeId), "")
      err = caller.Send(msg)
      if err != nil {
         logx.Errorf("sync task status send msg error:%v", err.Error())
         continue
      }
      okIds = append(okIds, record.ID)
   }
   if len(okIds) > 0 {
      err = model.NewReportsToCloudSearch(nil).SetIDs(okIds).Delete()
      if err != nil {
         logx.Errorf("ReportData delete report ok records error:%v", err)
      }
   }
}
func RestartTask(isMaster bool) error {
   if s != nil {
      s.Stop()
      s.Clear()
   }
   err := StartTask(isMaster)
   if err != nil {
      logx.Errorf("restart task failed:%v", err)
      return err
   }
   logx.Infof("restart task ok, isMaster:", isMaster)
   return nil
}
// Once 一次性任务
func Once() {
func Once(isMaster bool) {
   if !isMaster {
      return
   }
   msg := &common.MsgPullDataRequest{DataType: common.PullDataTypeProcessModel}
   caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicPullDataRequest, conf.Conf.NsqConf.NodeId), constvar.NsqTopicPullDataResponse)
   err := caller.Send(msg)
   if err != nil {
      logx.Errorf("send pull data msg error:%v", err.Error())
      logx.Errorf("send pull data msg error:%v, msg:%+v", err.Error(), msg)
   }
   caller = nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicGetPlcAddress, conf.Conf.NsqConf.NodeId), "")
   err = caller.Send(common.RequestPlcAddress{DeviceId: conf.Conf.System.DeviceId})
   msg = &common.MsgPullDataRequest{DataType: common.PullDataTypeDevice}
   caller = nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicPullDataRequest, conf.Conf.NsqConf.NodeId), constvar.NsqTopicPullDataResponse)
   err = caller.Send(msg)
   if err != nil {
      logx.Infof("get plc address err: %v", err.Error())
      logx.Errorf("send pull data msg error:%v, msg:%+v", err.Error(), msg)
   }
   msg = &common.MsgPullDataRequest{DataType: common.PullDataTypeProcessModelPlcAddress}
   caller = nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicPullDataRequest, conf.Conf.NsqConf.NodeId), constvar.NsqTopicPullDataResponse)
   err = caller.Send(msg)
   if err != nil {
      logx.Errorf("send pull data msg error:%v, msg:%+v", err.Error(), msg)
   }
}
func QueryClusterStatus() {
   clusterStatus, nodeQuantity := serf.QueryClusterStatusAndNodeQuantity()
   conf.Conf.SerfClusterStatus = clusterStatus
   conf.Conf.ClusterNodeQuantity = nodeQuantity
}
func ProblemCheck() {
   problem.Check()
}
func CheckNsqConn() {
   var err error
   var status constvar.SystemStatusValue
   if nsq.Ping() {
      status = constvar.SystemStatusValueNormal
   } else {
      status = constvar.SystemStatusValueUnNormal
   }
   old, err := model.NewSystemStatusSearch().SetKey(constvar.SystemStatusKeyNsq).First()
   if err != nil {
      logx.Errorf("get nsq status err:%v", err)
      return
   }
   if old.Value == status {
      return
   }
   err = model.NewSystemStatusSearch().SetKey(constvar.SystemStatusKeyNsq).Updates(map[string]interface{}{"value": status})
   if err != nil {
      logx.Errorf("update nsq status err:%v", err)
   }
}