From f838162ed0ee7f2832924c2399eddd461760135a Mon Sep 17 00:00:00 2001
From: zhangqian <zhangqian@123.com>
Date: 星期五, 13 十月 2023 21:47:54 +0800
Subject: [PATCH] plc地址和工艺参数不再单独拉取,只serf主启动时拉取
---
constvar/const.go | 2
service/task.go | 26 ------
nsq/msg_handler.go | 63 +++++++++++++++
model/common/common.go | 16 ++++
main.go | 16 ++-
model/process_model.go | 10 ++
crontask/cron_task.go | 33 +++++++
nsq/consumer.go | 2
api/v1/task.go | 41 ----------
nsq/nsq.go | 11 --
10 files changed, 135 insertions(+), 85 deletions(-)
diff --git a/api/v1/task.go b/api/v1/task.go
index ce5b49d..96a0b1a 100644
--- a/api/v1/task.go
+++ b/api/v1/task.go
@@ -4,18 +4,14 @@
"apsClient/conf"
"apsClient/constvar"
"apsClient/model"
- "apsClient/model/common"
"apsClient/model/request"
"apsClient/model/response"
- "apsClient/nsq"
"apsClient/pkg/contextx"
"apsClient/pkg/ecode"
"apsClient/pkg/logx"
- "apsClient/pkg/safe"
"apsClient/service"
"apsClient/service/plc_address"
"errors"
- "fmt"
"github.com/gin-gonic/gin"
"github.com/jinzhu/gorm"
"github.com/spf13/cast"
@@ -179,15 +175,6 @@
})
}
- safe.Go(func() {
- caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicGetPlcAddress, conf.Conf.NsqConf.NodeId), fmt.Sprintf(constvar.NsqTopicSendPlcAddress, conf.Conf.NsqConf.NodeId))
- var addressResult common.ResponsePlcAddress
- err := caller.Call(common.RequestPlcAddress{DeviceId: conf.Conf.System.DeviceId}, &addressResult, time.Second*3)
- if err != nil {
- logx.Infof("get plc address err: %v", err.Error())
- }
- })
-
resp := response.ProcessParamsResponse{
Number: processModel.Number,
Params: processParamsArr,
@@ -231,20 +218,6 @@
logx.Errorf("UpdateProcedureStatus err: %v", err.Error())
ctx.Fail(ecode.UnknownErr)
return
- }
-
- msg := &common.MsgTaskStatusUpdate{
- WorkOrderId: procedure.WorkOrderID,
- ProcedureID: procedure.ProceduresInfo.ProcedureID,
- DeviceId: procedure.ProceduresInfo.DeviceID,
- IsProcessing: false,
- IsFinish: true,
- }
-
- caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicTaskProcedureStatusUpdate, conf.Conf.NsqConf.NodeId), "")
- err = caller.Send(msg)
- if err != nil {
- logx.Errorf("send task status update msg error:%v", err.Error())
}
service.TaskFlagUnset(procedure.Channel)
@@ -331,20 +304,6 @@
if err != nil {
ctx.FailWithMsg(ecode.NeedConfirmedErr, "PLC璇锋眰澶辫触锛岃妫�鏌LC閰嶇疆")
return
- }
-
- msg := &common.MsgTaskStatusUpdate{
- WorkOrderId: procedure.WorkOrderID,
- ProcedureID: procedure.ProceduresInfo.ProcedureID,
- DeviceId: procedure.ProceduresInfo.DeviceID,
- IsProcessing: true,
- IsFinish: false,
- }
-
- caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicTaskProcedureStatusUpdate, conf.Conf.NsqConf.NodeId), "")
- err = caller.Send(msg)
- if err != nil {
- logx.Errorf("send task status update msg error:%v", err.Error())
}
service.TaskFlagSet(procedure.Channel)
ctx.Ok()
diff --git a/constvar/const.go b/constvar/const.go
index 6bea5e5..5a294e8 100644
--- a/constvar/const.go
+++ b/constvar/const.go
@@ -10,6 +10,8 @@
NsqTopicTaskProcedureStatusUpdate = "aps.%v.task.procedure.status" //宸ュ簭鐘舵�佹洿鏂�
NsqTopicSyncTaskProgress = "aps.%v.task.procedure.progress" //宸ュ簭鐢熶骇杩涘害
NsqTopicDeviceUpdate = "aps.%v.device.update" //璁惧淇℃伅鏇存敼
+ NsqTopicPullDataRequest = "aps.%v.pull.data.request" //鎷夊彇鏁版嵁璇锋眰
+ NsqTopicPullDataResponse = "aps.%v.pull.data.response" //鎷夊彇鏁版嵁鍝嶅簲
)
type PlcStartAddressType int
diff --git a/crontask/cron_task.go b/crontask/cron_task.go
index 9454dbe..e462c68 100644
--- a/crontask/cron_task.go
+++ b/crontask/cron_task.go
@@ -3,6 +3,7 @@
import (
"apsClient/conf"
"apsClient/constvar"
+ "apsClient/model/common"
"apsClient/nsq"
"apsClient/pkg/ecode"
"apsClient/pkg/logx"
@@ -13,8 +14,9 @@
"time"
)
-func InitTask() error {
+var s *gocron.Scheduler
+func StartTask() error {
finishNumberTimeInterval := conf.Conf.PLC.FinishNumberTimeInterval
totalNumberTimeInterval := conf.Conf.PLC.TotalNumberTimeInterval
if finishNumberTimeInterval == 0 {
@@ -23,7 +25,7 @@
if totalNumberTimeInterval == 0 {
totalNumberTimeInterval = 60
}
- s := gocron.NewScheduler(time.UTC)
+ s = gocron.NewScheduler(time.UTC)
_, err := s.Every(finishNumberTimeInterval).Seconds().Do(func() {
plcConfig, code := service.NewDevicePlcService().GetDevicePlc()
if code != ecode.OK {
@@ -73,7 +75,8 @@
})
- s.Every(180).Seconds().Do(SyncProductionProgress) //鍚屾鐢熶骇鏁版嵁
+ s.Every(60).Seconds().Do(SyncProductionProgress) //鍚屾鐢熶骇鏁版嵁
+ s.Every(60).Seconds().Do(SyncTaskStatus) //鍚屾浠诲姟鐘舵��
s.StartAsync()
return nil
}
@@ -104,5 +107,29 @@
logx.Errorf("SyncProductionProgress error:%v", err.Error())
}
}
+}
+func SyncTaskStatus() {
+ //todo
+}
+
+func StopTask() {
+ if s != nil {
+ s.Stop()
+ }
+}
+
+// Once 涓�娆℃�т换鍔�
+func Once() {
+ msg := &common.MsgPullDataRequest{DataType: common.PullDataTypeProcessModel}
+ caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicPullDataRequest, conf.Conf.NsqConf.NodeId), constvar.NsqTopicPullDataResponse)
+ err := caller.Send(msg)
+ if err != nil {
+ logx.Errorf("send pull data msg error:%v", err.Error())
+ }
+ caller = nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicGetPlcAddress, conf.Conf.NsqConf.NodeId), "")
+ err = caller.Send(common.RequestPlcAddress{DeviceId: conf.Conf.System.DeviceId})
+ if err != nil {
+ logx.Infof("get plc address err: %v", err.Error())
+ }
}
diff --git a/main.go b/main.go
index 569c9e7..bae163b 100644
--- a/main.go
+++ b/main.go
@@ -25,11 +25,6 @@
return
}
- if err := crontask.InitTask(); err != nil {
- logx.Errorf("crontab task Init err:%v", err)
- return
- }
-
//鍔犺浇plc鍐欏叆鍦板潃
plc_address.LoadAddressFromFile()
@@ -54,6 +49,11 @@
logx.Errorf("nsq Init err:%v", err)
return
}
+ crontask.Once()
+ if err := crontask.StartTask(); err != nil {
+ logx.Errorf("crontab task Init err:%v", err)
+ return
+ }
}
logx.Infof("apsClient start serve...")
@@ -73,8 +73,14 @@
logx.Errorf("nsq Init err:%v", err)
return
}
+ crontask.Once()
+ if err := crontask.StartTask(); err != nil {
+ logx.Errorf("crontab task Init err:%v", err)
+ return
+ }
case serf.EventJoinCluster, serf.EventMaster2Slave:
nsq.Stop()
+ crontask.StopTask()
}
logx.Infof("serf cluster event: %v", stat)
diff --git a/model/common/common.go b/model/common/common.go
index ead7e90..9df61af 100644
--- a/model/common/common.go
+++ b/model/common/common.go
@@ -137,3 +137,19 @@
ProcedureName string `gorm:"type:varchar(191);comment:宸ュ簭鍚嶇О" json:"procedureName"`
DeviceID string `gorm:"index;type:varchar(191);not null;comment:璁惧ID" json:"deviceId"`
}
+
+type PullDataType string
+
+const (
+ PullDataTypeProcessModel = "process_model"
+)
+
+// MsgPullDataRequest 鎷夊彇浜戠鏁版嵁
+type MsgPullDataRequest struct {
+ DataType PullDataType `json:"dataType"` //瑕佹媺鍙栫殑鏁版嵁绫诲瀷
+}
+
+type MsgPullDataResponse struct {
+ DataType PullDataType `json:"dataType"` //瑕佹媺鍙栫殑鏁版嵁绫诲瀷
+ Data interface{} //杩斿洖鐨勬暟鎹�
+}
diff --git a/model/process_model.go b/model/process_model.go
index bedda26..eeffa77 100644
--- a/model/process_model.go
+++ b/model/process_model.go
@@ -32,6 +32,7 @@
Orm *gorm.DB
Procedures []string
CurrentNumber string
+ Numbers []string
}
)
@@ -74,6 +75,11 @@
func (slf *ProcessModelSearch) SetNumber(number string) *ProcessModelSearch {
slf.Number = number
+ return slf
+}
+
+func (slf *ProcessModelSearch) SetNumbers(numbers []string) *ProcessModelSearch {
+ slf.Numbers = numbers
return slf
}
@@ -137,6 +143,10 @@
db = db.Where("number != ?", slf.CurrentNumber)
}
+ if len(slf.Numbers) != 0 {
+ db = db.Where("number in ?", slf.Numbers)
+ }
+
return db
}
diff --git a/nsq/consumer.go b/nsq/consumer.go
index 879f641..b4c3ba1 100644
--- a/nsq/consumer.go
+++ b/nsq/consumer.go
@@ -27,6 +27,8 @@
handler = &ProcessParamsSync{Topic: topic}
case fmt.Sprintf(constvar.NsqTopicDeviceUpdate, conf.Conf.NsqConf.NodeId):
handler = &DeviceUpdate{Topic: topic}
+ case fmt.Sprintf(constvar.NsqTopicPullDataResponse, conf.Conf.NsqConf.NodeId):
+ handler = &PullDataResponse{Topic: topic}
}
c.AddHandler(handler.HandleMessage)
diff --git a/nsq/msg_handler.go b/nsq/msg_handler.go
index a01d9ea..1db651e 100644
--- a/nsq/msg_handler.go
+++ b/nsq/msg_handler.go
@@ -275,3 +275,66 @@
return nil
}
+
+type PullDataResponse struct {
+ Topic string
+}
+
+func (slf *PullDataResponse) HandleMessage(data []byte) (err error) {
+ logx.Infof("get a pull data response message :%s", data)
+ var pullDataResponse common.MsgPullDataResponse
+ err = json.Unmarshal(data, &pullDataResponse)
+ if err != nil {
+ logx.Infof("unmarshal msg err :%s", err)
+ return err
+ }
+ switch pullDataResponse.DataType {
+ case common.PullDataTypeProcessModel:
+ err = slf.DealProcessModelData(pullDataResponse.Data)
+
+ }
+ if err != nil {
+ logx.Infof("process pull data err :%s", err)
+ return err
+ }
+ return nil
+}
+
+func (slf *PullDataResponse) DealProcessModelData(data interface{}) error {
+ var processModels []*model.ProcessModel
+ err := mapstructure.Decode(data, &processModels)
+ if err != nil {
+ return err
+ }
+ numbers := make([]string, 0, len(processModels))
+ for _, processModel := range processModels {
+ numbers = append(numbers, processModel.Number)
+ }
+ existsProcessModels, err := model.NewProcessModelSearch().SetIsNew(true).SetNumbers(numbers).FindNotTotal()
+ if err != nil {
+ return err
+ }
+
+ existsProcessModelsMap := make(map[string]struct{}, len(existsProcessModels))
+ for _, processModel := range existsProcessModels {
+ existsProcessModelsMap[processModel.Number] = struct{}{}
+ }
+
+ for _, processModel := range processModels {
+ if _, exists := existsProcessModelsMap[processModel.Number]; exists {
+ continue
+ }
+ err = model.WithTransaction(func(db *gorm.DB) error {
+ err = model.NewProcessModelSearch().SetOrm(db).SetProcedure(processModel.Procedure).SetProduct(processModel.Product).SetIsNew(true).UpdateByMap(map[string]interface{}{"is_new": 0})
+ if err != nil {
+ return err
+ }
+ processModel.IsNew = true
+ return model.NewProcessModelSearch().SetOrm(db).Create(processModel)
+ })
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
diff --git a/nsq/nsq.go b/nsq/nsq.go
index 064efa7..4fb71b6 100644
--- a/nsq/nsq.go
+++ b/nsq/nsq.go
@@ -3,7 +3,6 @@
import (
"apsClient/conf"
"apsClient/constvar"
- "apsClient/model/common"
"apsClient/pkg/logx"
"apsClient/pkg/safe"
"basic.com/aps/nsqclient.git"
@@ -11,7 +10,6 @@
"errors"
"fmt"
"sync"
- "time"
)
type consumerManager struct {
@@ -32,14 +30,6 @@
if err := initProducer(); err != nil {
return err
}
- safe.Go(func() {
- caller := NewCaller(fmt.Sprintf(constvar.NsqTopicGetPlcAddress, conf.Conf.NsqConf.NodeId), fmt.Sprintf(constvar.NsqTopicSendPlcAddress, conf.Conf.NsqConf.NodeId))
- var addressResult common.ResponsePlcAddress
- err := caller.Call(common.RequestPlcAddress{DeviceId: conf.Conf.System.DeviceId}, &addressResult, time.Second*3)
- if err != nil {
- logx.Infof("get plc address err: %v", err.Error())
- }
- })
var topics = []string{
constvar.NsqTopicScheduleTask,
@@ -47,6 +37,7 @@
constvar.NsqTopicProcessParamsResponse,
constvar.NsqTopicApsProcessParams,
constvar.NsqTopicDeviceUpdate,
+ constvar.NsqTopicPullDataResponse,
}
for _, t := range topics {
topic := fmt.Sprintf(t, conf.Conf.NsqConf.NodeId)
diff --git a/service/task.go b/service/task.go
index ed4c737..468509e 100644
--- a/service/task.go
+++ b/service/task.go
@@ -6,10 +6,8 @@
"apsClient/model"
"apsClient/model/common"
"apsClient/model/response"
- "apsClient/nsq"
"apsClient/pkg/ecode"
"apsClient/pkg/logx"
- "apsClient/pkg/structx"
"fmt"
"github.com/jinzhu/gorm"
"time"
@@ -176,30 +174,6 @@
}
if err == nil {
return data, nil
- }
-
- if err == gorm.ErrRecordNotFound { //濡傛灉鏁版嵁搴撴病鏈変粠浜戠鑾峰彇
- caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicProcessParamsRequest, conf.Conf.NsqConf.NodeId), fmt.Sprintf(constvar.NsqTopicProcessParamsResponse, conf.Conf.NsqConf.NodeId))
- var result common.ResponseProcessParams
- err = caller.Call(common.RequestProcessParams{
- WorkOrder: procedure.WorkOrderID,
- OrderId: procedure.OrderID,
- Product: order.ProductName,
- Procedure: procedure.ProceduresInfo.ProcedureName,
- Device: procedure.ProceduresInfo.DeviceName,
- DeviceId: conf.Conf.System.DeviceId,
- }, &result, time.Second*3)
- if err != nil {
- logx.Errorf("TaskStart GetProcessModel error:%v", err.Error())
- return
- }
- if result.ParamsMap == nil {
- logx.Errorf("TaskStart GetProcessModel response miss process params:%v", result)
- return
- }
- processModel = new(model.ProcessModel)
- err = structx.AssignTo(result, &processModel)
- return processModel, err
}
return
}
--
Gitblit v1.8.0