package crontask
|
|
import (
|
"apsClient/conf"
|
"apsClient/constvar"
|
"apsClient/model"
|
"apsClient/model/common"
|
"apsClient/nsq"
|
"apsClient/pkg/ecode"
|
"apsClient/pkg/logx"
|
"apsClient/serf"
|
"apsClient/service"
|
"apsClient/service/problem"
|
"fmt"
|
"github.com/go-co-op/gocron"
|
"github.com/spf13/cast"
|
"time"
|
)
|
|
var s *gocron.Scheduler
|
|
func init() {
|
s = gocron.NewScheduler(time.Local)
|
}
|
func StartTask(isMaster bool) error {
|
finishNumberTimeInterval := conf.Conf.PLC.FinishNumberTimeInterval
|
totalNumberTimeInterval := conf.Conf.PLC.TotalNumberTimeInterval
|
if finishNumberTimeInterval == 0 {
|
finishNumberTimeInterval = 6
|
}
|
if totalNumberTimeInterval == 0 {
|
totalNumberTimeInterval = 60
|
}
|
_, err := s.Every(finishNumberTimeInterval).Seconds().Do(func() {
|
plcConfig, code := service.NewDevicePlcService().GetDevicePlc()
|
if code != ecode.OK {
|
return
|
}
|
for _, addressItem := range plcConfig.Details {
|
if addressItem.FieldName == constvar.PlcStartAddressTypeFinishNumber {
|
value, err := service.PlcReadDirect(plcConfig, addressItem)
|
if err != nil {
|
logx.Infof("plc read finish number err: %v", err)
|
continue
|
}
|
finishNumber := cast.ToInt(value)
|
if finishNumber != 0 {
|
service.PlcCacheSet(conf.Conf.CurrentDeviceID, addressItem.Channel, constvar.PlcCacheKeyFinishNumber, finishNumber)
|
_ = service.NewProgressService().UpdateProgress(conf.Conf.CurrentDeviceID, addressItem.Channel, finishNumber)
|
}
|
logx.Infof("plc read finish number: %v", finishNumber)
|
time.Sleep(time.Second * 1)
|
} else if addressItem.FieldName == constvar.PlcStartAddressTypeTotalNumber {
|
value, err := service.PlcReadDirect(plcConfig, addressItem)
|
if err != nil {
|
logx.Infof("plc read total number err: %v", err)
|
continue
|
}
|
totalNumber := cast.ToInt64(value)
|
if totalNumber != 0 {
|
service.PlcCacheSet(conf.Conf.CurrentDeviceID, addressItem.Channel, constvar.PlcCacheKeyTotalNumber, totalNumber)
|
}
|
logx.Infof("plc read total number: %v", totalNumber)
|
time.Sleep(time.Second * 1)
|
}
|
|
}
|
})
|
if err != nil {
|
return err
|
}
|
|
if isMaster {
|
s.Every(20).Seconds().Do(SyncProductionProgress) //同步生产数据
|
s.Every(30).Seconds().Do(SyncTaskStatus) //同步任务状态
|
s.Every(10).Seconds().Do(CheckNsqConn) //查询nsq连接
|
s.Every(30).Seconds().Do(ReportData) //上报数据
|
}
|
|
s.Every(20).Seconds().Do(QueryClusterStatus) //查询集群节点数量
|
s.Every(30).Seconds().Do(ProblemCheck) //问题诊断
|
|
s.StartAsync()
|
return nil
|
}
|
|
func SyncProductionProgress() {
|
progressList, err := service.NewProgressService().GetProgressList()
|
if err != nil {
|
logx.Errorf("SyncProductionProgress get records err:%v", err)
|
return
|
}
|
for _, progress := range progressList {
|
if progress.FinishedQuantity == 0 {
|
continue
|
}
|
caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicSyncTaskProgress, conf.Conf.NsqConf.NodeId), "")
|
err = caller.Send(progress)
|
if err != nil {
|
logx.Errorf("SyncProductionProgress error:%v", err.Error())
|
}
|
}
|
}
|
|
func SyncTaskStatus() {
|
records, err := service.NewTaskService().GetTaskStatusSync(100)
|
if err != nil {
|
logx.Errorf("SyncTaskStatus get records err:%v", err)
|
}
|
syncOkIds := make([]uint, 0, len(records))
|
for _, record := range records {
|
msg := &common.MsgTaskStatusUpdate{
|
WorkOrderId: record.WorkOrderId,
|
ProcedureID: record.ProcedureID,
|
DeviceId: record.DeviceId,
|
IsProcessing: record.IsProcessing,
|
IsFinish: record.IsFinish,
|
FinishAmount: record.FinishedQuantity,
|
ProductProcedureID: record.ProductProcedureID,
|
StartTs: record.StartTs,
|
FinishTs: record.FinishTs,
|
}
|
caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicTaskProcedureStatusUpdate, conf.Conf.NsqConf.NodeId), "")
|
err = caller.Send(msg)
|
if err != nil {
|
logx.Errorf("sync task status send msg error:%v", err.Error())
|
continue
|
}
|
syncOkIds = append(syncOkIds, record.ID)
|
}
|
if len(syncOkIds) > 0 {
|
err = service.NewTaskService().FinishTaskStatusSync(syncOkIds)
|
if err != nil {
|
logx.Errorf("sync task status delete sync ok records error:%v", err)
|
}
|
}
|
}
|
|
func ReportData() {
|
records, err := model.NewReportsToCloudSearch(nil).SetOrder("id desc").SetPage(1, 100).FindNotTotal()
|
if err != nil {
|
logx.Errorf("ReportData get records err:%v", err)
|
}
|
okIds := make([]uint, 0, len(records))
|
for _, record := range records {
|
msg := &common.MsgReportData{
|
ReportType: record.ReportType,
|
Content: record.Content,
|
}
|
caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicApsClientReportData, conf.Conf.NsqConf.NodeId), "")
|
err = caller.Send(msg)
|
if err != nil {
|
logx.Errorf("sync task status send msg error:%v", err.Error())
|
continue
|
}
|
okIds = append(okIds, record.ID)
|
}
|
if len(okIds) > 0 {
|
err = model.NewReportsToCloudSearch(nil).SetIDs(okIds).Delete()
|
if err != nil {
|
logx.Errorf("ReportData delete report ok records error:%v", err)
|
}
|
}
|
}
|
|
func RestartTask(isMaster bool) error {
|
if s != nil {
|
s.Stop()
|
s.Clear()
|
}
|
err := StartTask(isMaster)
|
if err != nil {
|
logx.Errorf("restart task failed:%v", err)
|
return err
|
}
|
logx.Infof("restart task ok, isMaster:", isMaster)
|
return nil
|
}
|
|
// Once 一次性任务
|
func Once(isMaster bool) {
|
if !isMaster {
|
return
|
}
|
msg := &common.MsgPullDataRequest{DataType: common.PullDataTypeProcessModel}
|
caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicPullDataRequest, conf.Conf.NsqConf.NodeId), constvar.NsqTopicPullDataResponse)
|
err := caller.Send(msg)
|
if err != nil {
|
logx.Errorf("send pull data msg error:%v, msg:%+v", err.Error(), msg)
|
}
|
msg = &common.MsgPullDataRequest{DataType: common.PullDataTypeDevice}
|
caller = nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicPullDataRequest, conf.Conf.NsqConf.NodeId), constvar.NsqTopicPullDataResponse)
|
err = caller.Send(msg)
|
if err != nil {
|
logx.Errorf("send pull data msg error:%v, msg:%+v", err.Error(), msg)
|
}
|
msg = &common.MsgPullDataRequest{DataType: common.PullDataTypeProcessModelPlcAddress}
|
caller = nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicPullDataRequest, conf.Conf.NsqConf.NodeId), constvar.NsqTopicPullDataResponse)
|
err = caller.Send(msg)
|
if err != nil {
|
logx.Errorf("send pull data msg error:%v, msg:%+v", err.Error(), msg)
|
}
|
}
|
|
func QueryClusterStatus() {
|
clusterStatus, nodeQuantity := serf.QueryClusterStatusAndNodeQuantity()
|
conf.Conf.SerfClusterStatus = clusterStatus
|
conf.Conf.ClusterNodeQuantity = nodeQuantity
|
}
|
|
func ProblemCheck() {
|
problem.Check()
|
}
|
|
func CheckNsqConn() {
|
var err error
|
var status constvar.SystemStatusValue
|
if nsq.Ping() {
|
status = constvar.SystemStatusValueNormal
|
} else {
|
status = constvar.SystemStatusValueUnNormal
|
}
|
old, err := model.NewSystemStatusSearch().SetKey(constvar.SystemStatusKeyNsq).First()
|
if err != nil {
|
logx.Errorf("get nsq status err:%v", err)
|
return
|
}
|
if old.Value == status {
|
return
|
}
|
err = model.NewSystemStatusSearch().SetKey(constvar.SystemStatusKeyNsq).Updates(map[string]interface{}{"value": status})
|
if err != nil {
|
logx.Errorf("update nsq status err:%v", err)
|
}
|
}
|