From f838162ed0ee7f2832924c2399eddd461760135a Mon Sep 17 00:00:00 2001 From: zhangqian <zhangqian@123.com> Date: 星期五, 13 十月 2023 21:47:54 +0800 Subject: [PATCH] plc地址和工艺参数不再单独拉取,只serf主启动时拉取 --- constvar/const.go | 2 service/task.go | 26 ------ nsq/msg_handler.go | 63 +++++++++++++++ model/common/common.go | 16 ++++ main.go | 16 ++- model/process_model.go | 10 ++ crontask/cron_task.go | 33 +++++++ nsq/consumer.go | 2 api/v1/task.go | 41 ---------- nsq/nsq.go | 11 -- 10 files changed, 135 insertions(+), 85 deletions(-) diff --git a/api/v1/task.go b/api/v1/task.go index ce5b49d..96a0b1a 100644 --- a/api/v1/task.go +++ b/api/v1/task.go @@ -4,18 +4,14 @@ "apsClient/conf" "apsClient/constvar" "apsClient/model" - "apsClient/model/common" "apsClient/model/request" "apsClient/model/response" - "apsClient/nsq" "apsClient/pkg/contextx" "apsClient/pkg/ecode" "apsClient/pkg/logx" - "apsClient/pkg/safe" "apsClient/service" "apsClient/service/plc_address" "errors" - "fmt" "github.com/gin-gonic/gin" "github.com/jinzhu/gorm" "github.com/spf13/cast" @@ -179,15 +175,6 @@ }) } - safe.Go(func() { - caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicGetPlcAddress, conf.Conf.NsqConf.NodeId), fmt.Sprintf(constvar.NsqTopicSendPlcAddress, conf.Conf.NsqConf.NodeId)) - var addressResult common.ResponsePlcAddress - err := caller.Call(common.RequestPlcAddress{DeviceId: conf.Conf.System.DeviceId}, &addressResult, time.Second*3) - if err != nil { - logx.Infof("get plc address err: %v", err.Error()) - } - }) - resp := response.ProcessParamsResponse{ Number: processModel.Number, Params: processParamsArr, @@ -231,20 +218,6 @@ logx.Errorf("UpdateProcedureStatus err: %v", err.Error()) ctx.Fail(ecode.UnknownErr) return - } - - msg := &common.MsgTaskStatusUpdate{ - WorkOrderId: procedure.WorkOrderID, - ProcedureID: procedure.ProceduresInfo.ProcedureID, - DeviceId: procedure.ProceduresInfo.DeviceID, - IsProcessing: false, - IsFinish: true, - } - - caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicTaskProcedureStatusUpdate, conf.Conf.NsqConf.NodeId), "") - err = caller.Send(msg) - if err != nil { - logx.Errorf("send task status update msg error:%v", err.Error()) } service.TaskFlagUnset(procedure.Channel) @@ -331,20 +304,6 @@ if err != nil { ctx.FailWithMsg(ecode.NeedConfirmedErr, "PLC璇锋眰澶辫触锛岃妫�鏌LC閰嶇疆") return - } - - msg := &common.MsgTaskStatusUpdate{ - WorkOrderId: procedure.WorkOrderID, - ProcedureID: procedure.ProceduresInfo.ProcedureID, - DeviceId: procedure.ProceduresInfo.DeviceID, - IsProcessing: true, - IsFinish: false, - } - - caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicTaskProcedureStatusUpdate, conf.Conf.NsqConf.NodeId), "") - err = caller.Send(msg) - if err != nil { - logx.Errorf("send task status update msg error:%v", err.Error()) } service.TaskFlagSet(procedure.Channel) ctx.Ok() diff --git a/constvar/const.go b/constvar/const.go index 6bea5e5..5a294e8 100644 --- a/constvar/const.go +++ b/constvar/const.go @@ -10,6 +10,8 @@ NsqTopicTaskProcedureStatusUpdate = "aps.%v.task.procedure.status" //宸ュ簭鐘舵�佹洿鏂� NsqTopicSyncTaskProgress = "aps.%v.task.procedure.progress" //宸ュ簭鐢熶骇杩涘害 NsqTopicDeviceUpdate = "aps.%v.device.update" //璁惧淇℃伅鏇存敼 + NsqTopicPullDataRequest = "aps.%v.pull.data.request" //鎷夊彇鏁版嵁璇锋眰 + NsqTopicPullDataResponse = "aps.%v.pull.data.response" //鎷夊彇鏁版嵁鍝嶅簲 ) type PlcStartAddressType int diff --git a/crontask/cron_task.go b/crontask/cron_task.go index 9454dbe..e462c68 100644 --- a/crontask/cron_task.go +++ b/crontask/cron_task.go @@ -3,6 +3,7 @@ import ( "apsClient/conf" "apsClient/constvar" + "apsClient/model/common" "apsClient/nsq" "apsClient/pkg/ecode" "apsClient/pkg/logx" @@ -13,8 +14,9 @@ "time" ) -func InitTask() error { +var s *gocron.Scheduler +func StartTask() error { finishNumberTimeInterval := conf.Conf.PLC.FinishNumberTimeInterval totalNumberTimeInterval := conf.Conf.PLC.TotalNumberTimeInterval if finishNumberTimeInterval == 0 { @@ -23,7 +25,7 @@ if totalNumberTimeInterval == 0 { totalNumberTimeInterval = 60 } - s := gocron.NewScheduler(time.UTC) + s = gocron.NewScheduler(time.UTC) _, err := s.Every(finishNumberTimeInterval).Seconds().Do(func() { plcConfig, code := service.NewDevicePlcService().GetDevicePlc() if code != ecode.OK { @@ -73,7 +75,8 @@ }) - s.Every(180).Seconds().Do(SyncProductionProgress) //鍚屾鐢熶骇鏁版嵁 + s.Every(60).Seconds().Do(SyncProductionProgress) //鍚屾鐢熶骇鏁版嵁 + s.Every(60).Seconds().Do(SyncTaskStatus) //鍚屾浠诲姟鐘舵�� s.StartAsync() return nil } @@ -104,5 +107,29 @@ logx.Errorf("SyncProductionProgress error:%v", err.Error()) } } +} +func SyncTaskStatus() { + //todo +} + +func StopTask() { + if s != nil { + s.Stop() + } +} + +// Once 涓�娆℃�т换鍔� +func Once() { + msg := &common.MsgPullDataRequest{DataType: common.PullDataTypeProcessModel} + caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicPullDataRequest, conf.Conf.NsqConf.NodeId), constvar.NsqTopicPullDataResponse) + err := caller.Send(msg) + if err != nil { + logx.Errorf("send pull data msg error:%v", err.Error()) + } + caller = nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicGetPlcAddress, conf.Conf.NsqConf.NodeId), "") + err = caller.Send(common.RequestPlcAddress{DeviceId: conf.Conf.System.DeviceId}) + if err != nil { + logx.Infof("get plc address err: %v", err.Error()) + } } diff --git a/main.go b/main.go index 569c9e7..bae163b 100644 --- a/main.go +++ b/main.go @@ -25,11 +25,6 @@ return } - if err := crontask.InitTask(); err != nil { - logx.Errorf("crontab task Init err:%v", err) - return - } - //鍔犺浇plc鍐欏叆鍦板潃 plc_address.LoadAddressFromFile() @@ -54,6 +49,11 @@ logx.Errorf("nsq Init err:%v", err) return } + crontask.Once() + if err := crontask.StartTask(); err != nil { + logx.Errorf("crontab task Init err:%v", err) + return + } } logx.Infof("apsClient start serve...") @@ -73,8 +73,14 @@ logx.Errorf("nsq Init err:%v", err) return } + crontask.Once() + if err := crontask.StartTask(); err != nil { + logx.Errorf("crontab task Init err:%v", err) + return + } case serf.EventJoinCluster, serf.EventMaster2Slave: nsq.Stop() + crontask.StopTask() } logx.Infof("serf cluster event: %v", stat) diff --git a/model/common/common.go b/model/common/common.go index ead7e90..9df61af 100644 --- a/model/common/common.go +++ b/model/common/common.go @@ -137,3 +137,19 @@ ProcedureName string `gorm:"type:varchar(191);comment:宸ュ簭鍚嶇О" json:"procedureName"` DeviceID string `gorm:"index;type:varchar(191);not null;comment:璁惧ID" json:"deviceId"` } + +type PullDataType string + +const ( + PullDataTypeProcessModel = "process_model" +) + +// MsgPullDataRequest 鎷夊彇浜戠鏁版嵁 +type MsgPullDataRequest struct { + DataType PullDataType `json:"dataType"` //瑕佹媺鍙栫殑鏁版嵁绫诲瀷 +} + +type MsgPullDataResponse struct { + DataType PullDataType `json:"dataType"` //瑕佹媺鍙栫殑鏁版嵁绫诲瀷 + Data interface{} //杩斿洖鐨勬暟鎹� +} diff --git a/model/process_model.go b/model/process_model.go index bedda26..eeffa77 100644 --- a/model/process_model.go +++ b/model/process_model.go @@ -32,6 +32,7 @@ Orm *gorm.DB Procedures []string CurrentNumber string + Numbers []string } ) @@ -74,6 +75,11 @@ func (slf *ProcessModelSearch) SetNumber(number string) *ProcessModelSearch { slf.Number = number + return slf +} + +func (slf *ProcessModelSearch) SetNumbers(numbers []string) *ProcessModelSearch { + slf.Numbers = numbers return slf } @@ -137,6 +143,10 @@ db = db.Where("number != ?", slf.CurrentNumber) } + if len(slf.Numbers) != 0 { + db = db.Where("number in ?", slf.Numbers) + } + return db } diff --git a/nsq/consumer.go b/nsq/consumer.go index 879f641..b4c3ba1 100644 --- a/nsq/consumer.go +++ b/nsq/consumer.go @@ -27,6 +27,8 @@ handler = &ProcessParamsSync{Topic: topic} case fmt.Sprintf(constvar.NsqTopicDeviceUpdate, conf.Conf.NsqConf.NodeId): handler = &DeviceUpdate{Topic: topic} + case fmt.Sprintf(constvar.NsqTopicPullDataResponse, conf.Conf.NsqConf.NodeId): + handler = &PullDataResponse{Topic: topic} } c.AddHandler(handler.HandleMessage) diff --git a/nsq/msg_handler.go b/nsq/msg_handler.go index a01d9ea..1db651e 100644 --- a/nsq/msg_handler.go +++ b/nsq/msg_handler.go @@ -275,3 +275,66 @@ return nil } + +type PullDataResponse struct { + Topic string +} + +func (slf *PullDataResponse) HandleMessage(data []byte) (err error) { + logx.Infof("get a pull data response message :%s", data) + var pullDataResponse common.MsgPullDataResponse + err = json.Unmarshal(data, &pullDataResponse) + if err != nil { + logx.Infof("unmarshal msg err :%s", err) + return err + } + switch pullDataResponse.DataType { + case common.PullDataTypeProcessModel: + err = slf.DealProcessModelData(pullDataResponse.Data) + + } + if err != nil { + logx.Infof("process pull data err :%s", err) + return err + } + return nil +} + +func (slf *PullDataResponse) DealProcessModelData(data interface{}) error { + var processModels []*model.ProcessModel + err := mapstructure.Decode(data, &processModels) + if err != nil { + return err + } + numbers := make([]string, 0, len(processModels)) + for _, processModel := range processModels { + numbers = append(numbers, processModel.Number) + } + existsProcessModels, err := model.NewProcessModelSearch().SetIsNew(true).SetNumbers(numbers).FindNotTotal() + if err != nil { + return err + } + + existsProcessModelsMap := make(map[string]struct{}, len(existsProcessModels)) + for _, processModel := range existsProcessModels { + existsProcessModelsMap[processModel.Number] = struct{}{} + } + + for _, processModel := range processModels { + if _, exists := existsProcessModelsMap[processModel.Number]; exists { + continue + } + err = model.WithTransaction(func(db *gorm.DB) error { + err = model.NewProcessModelSearch().SetOrm(db).SetProcedure(processModel.Procedure).SetProduct(processModel.Product).SetIsNew(true).UpdateByMap(map[string]interface{}{"is_new": 0}) + if err != nil { + return err + } + processModel.IsNew = true + return model.NewProcessModelSearch().SetOrm(db).Create(processModel) + }) + if err != nil { + return err + } + } + return nil +} diff --git a/nsq/nsq.go b/nsq/nsq.go index 064efa7..4fb71b6 100644 --- a/nsq/nsq.go +++ b/nsq/nsq.go @@ -3,7 +3,6 @@ import ( "apsClient/conf" "apsClient/constvar" - "apsClient/model/common" "apsClient/pkg/logx" "apsClient/pkg/safe" "basic.com/aps/nsqclient.git" @@ -11,7 +10,6 @@ "errors" "fmt" "sync" - "time" ) type consumerManager struct { @@ -32,14 +30,6 @@ if err := initProducer(); err != nil { return err } - safe.Go(func() { - caller := NewCaller(fmt.Sprintf(constvar.NsqTopicGetPlcAddress, conf.Conf.NsqConf.NodeId), fmt.Sprintf(constvar.NsqTopicSendPlcAddress, conf.Conf.NsqConf.NodeId)) - var addressResult common.ResponsePlcAddress - err := caller.Call(common.RequestPlcAddress{DeviceId: conf.Conf.System.DeviceId}, &addressResult, time.Second*3) - if err != nil { - logx.Infof("get plc address err: %v", err.Error()) - } - }) var topics = []string{ constvar.NsqTopicScheduleTask, @@ -47,6 +37,7 @@ constvar.NsqTopicProcessParamsResponse, constvar.NsqTopicApsProcessParams, constvar.NsqTopicDeviceUpdate, + constvar.NsqTopicPullDataResponse, } for _, t := range topics { topic := fmt.Sprintf(t, conf.Conf.NsqConf.NodeId) diff --git a/service/task.go b/service/task.go index ed4c737..468509e 100644 --- a/service/task.go +++ b/service/task.go @@ -6,10 +6,8 @@ "apsClient/model" "apsClient/model/common" "apsClient/model/response" - "apsClient/nsq" "apsClient/pkg/ecode" "apsClient/pkg/logx" - "apsClient/pkg/structx" "fmt" "github.com/jinzhu/gorm" "time" @@ -176,30 +174,6 @@ } if err == nil { return data, nil - } - - if err == gorm.ErrRecordNotFound { //濡傛灉鏁版嵁搴撴病鏈変粠浜戠鑾峰彇 - caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicProcessParamsRequest, conf.Conf.NsqConf.NodeId), fmt.Sprintf(constvar.NsqTopicProcessParamsResponse, conf.Conf.NsqConf.NodeId)) - var result common.ResponseProcessParams - err = caller.Call(common.RequestProcessParams{ - WorkOrder: procedure.WorkOrderID, - OrderId: procedure.OrderID, - Product: order.ProductName, - Procedure: procedure.ProceduresInfo.ProcedureName, - Device: procedure.ProceduresInfo.DeviceName, - DeviceId: conf.Conf.System.DeviceId, - }, &result, time.Second*3) - if err != nil { - logx.Errorf("TaskStart GetProcessModel error:%v", err.Error()) - return - } - if result.ParamsMap == nil { - logx.Errorf("TaskStart GetProcessModel response miss process params:%v", result) - return - } - processModel = new(model.ProcessModel) - err = structx.AssignTo(result, &processModel) - return processModel, err } return } -- Gitblit v1.8.0