From f75b6f9ee6b2c542d52b4a11113bd281b421c4c0 Mon Sep 17 00:00:00 2001 From: zhangqian <zhangqian@123.com> Date: 星期二, 17 十月 2023 19:34:30 +0800 Subject: [PATCH] 定时任务分master模式和非master模式 --- service/progress.go | 9 ++++ model/production_progress.go | 18 +++++++-- main.go | 25 +++++++----- crontask/cron_task.go | 37 +++++++----------- 4 files changed, 52 insertions(+), 37 deletions(-) diff --git a/crontask/cron_task.go b/crontask/cron_task.go index 2bdb2c2..3ff0d59 100644 --- a/crontask/cron_task.go +++ b/crontask/cron_task.go @@ -16,7 +16,7 @@ var s *gocron.Scheduler -func StartTask() error { +func StartTask(isMaster bool) error { finishNumberTimeInterval := conf.Conf.PLC.FinishNumberTimeInterval totalNumberTimeInterval := conf.Conf.PLC.TotalNumberTimeInterval if finishNumberTimeInterval == 0 { @@ -75,32 +75,21 @@ }) - s.Every(60).Seconds().Do(SyncProductionProgress) //鍚屾鐢熶骇鏁版嵁 - s.Every(30).Seconds().Do(SyncTaskStatus) //鍚屾浠诲姟鐘舵�� + if isMaster { + s.Every(60).Seconds().Do(SyncProductionProgress) //鍚屾鐢熶骇鏁版嵁 + s.Every(30).Seconds().Do(SyncTaskStatus) //鍚屾浠诲姟鐘舵�� + } s.StartAsync() return nil } func SyncProductionProgress() { - plcConfig, code := service.NewDevicePlcService().GetDevicePlc() - if code != ecode.OK { + progressList, err := service.NewProgressService().GetProgressList() + if err != nil { + logx.Errorf("SyncProductionProgress get records err:%v", err) return } - var channels []int32 - for _, item := range plcConfig.Details { - if item.FieldName == constvar.PlcStartAddressTypeFinishNumber { - channels = append(channels, item.Channel) - } - } - for _, channel := range channels { - progress, err := service.NewProgressService().GetCurrentProgress(channel) - if err != nil { - return - } - if progress == nil { - return - } - + for _, progress := range progressList { caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicSyncTaskProgress, conf.Conf.NsqConf.NodeId), "") err = caller.Send(progress) if err != nil { @@ -130,14 +119,18 @@ } } -func StopTask() { +func RestartTask(isMaster bool) error { if s != nil { s.Stop() } + return StartTask(isMaster) } // Once 涓�娆℃�т换鍔� -func Once() { +func Once(isMaster bool) { + if !isMaster { + return + } msg := &common.MsgPullDataRequest{DataType: common.PullDataTypeProcessModel} caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicPullDataRequest, conf.Conf.NsqConf.NodeId), constvar.NsqTopicPullDataResponse) err := caller.Send(msg) diff --git a/main.go b/main.go index 8c631f7..74d4c67 100644 --- a/main.go +++ b/main.go @@ -50,11 +50,12 @@ logx.Errorf("nsq Init err:%v", err) return } - crontask.Once() - if err := crontask.StartTask(); err != nil { - logx.Errorf("crontab task Init err:%v", err) - return - } + + } + crontask.Once(agent.ClusterStatus != "slave") + if err := crontask.StartTask(agent.ClusterStatus != "slave"); err != nil { + logx.Errorf("crontab task Init err:%v", err) + return } logx.Infof("apsClient start serve...") @@ -70,18 +71,22 @@ func serfClusterEvent(stat int) { switch stat { case serf.EventCreateCluster, serf.EventSlave2Master, serf.EventLeaveCluster: - if err := nsq.Init(); err != nil { + if err := nsq.Init(); err != nil { //寮�鍚痭sq logx.Errorf("nsq Init err:%v", err) return } - crontask.Once() - if err := crontask.StartTask(); err != nil { + crontask.Once(true) + if err := crontask.RestartTask(true); err != nil { //浠aster鏂瑰紡閲嶅惎task logx.Errorf("crontab task Init err:%v", err) return } case serf.EventJoinCluster, serf.EventMaster2Slave: - nsq.Stop() - crontask.StopTask() + nsq.Stop() //鍏抽棴nsq + crontask.Once(false) + if err := crontask.RestartTask(false); err != nil { //浠ラ潪master鏂瑰紡閲嶅惎task + logx.Errorf("crontab task Init err:%v", err) + return + } } logx.Infof("serf cluster event: %v", stat) diff --git a/model/production_progress.go b/model/production_progress.go index bbdeb98..d928936 100644 --- a/model/production_progress.go +++ b/model/production_progress.go @@ -20,10 +20,11 @@ ProductionProgressSearch struct { ProductionProgress - Order string - PageNum int - PageSize int - Orm *gorm.DB + Order string + PageNum int + PageSize int + Orm *gorm.DB + UnFinished bool } ) @@ -78,6 +79,11 @@ return slf } +func (slf *ProductionProgressSearch) SetUnFinished() *ProductionProgressSearch { + slf.UnFinished = true + return slf +} + func (slf *ProductionProgressSearch) build() *gorm.DB { var db = slf.Orm.Model(&ProductionProgress{}) @@ -109,6 +115,10 @@ db = db.Where("channel = ?", slf.Channel) } + if slf.UnFinished { + db = db.Where("finished_quantity < total_quantity") + } + return db } diff --git a/service/progress.go b/service/progress.go index 7b6361a..ed904f1 100644 --- a/service/progress.go +++ b/service/progress.go @@ -1,6 +1,7 @@ package service import ( + "apsClient/conf" "apsClient/model" "errors" "github.com/jinzhu/gorm" @@ -54,7 +55,7 @@ var ok bool progressCache, ok = ProgressCacheGet(channel) if !ok { - progressCache, err = model.NewProductionProgressSearch(nil).SetChannel(channel).SetOrder("id desc").First() + progressCache, err = model.NewProductionProgressSearch(nil).SetDeviceId(conf.Conf.System.DeviceId).SetChannel(channel).SetOrder("id desc").First() if err == gorm.ErrRecordNotFound { return nil, errors.New("progress not found") } @@ -71,3 +72,9 @@ } return } + +// GetProgressList 鑾峰彇寰呭悓姝ヨ繘搴﹀伐搴� +func (slf ProgressService) GetProgressList() (progressList []*model.ProductionProgress, err error) { + progressList, err = model.NewProductionProgressSearch(nil).SetUnFinished().SetOrder("id desc").SetPage(1, 100).FindNotTotal() + return +} -- Gitblit v1.8.0