From 040cd381f2e8475e9b4eb336b704ad878f56f4fa Mon Sep 17 00:00:00 2001
From: zhangqian <zhangqian@123.com>
Date: 星期四, 14 九月 2023 20:42:01 +0800
Subject: [PATCH] 适应性改动
---
api/v1/plc.go | 4
service/plc.go | 47 +++++++++++++--
service/task.go | 6 +-
nsq/msg_handler.go | 4 +
service/cache_store.go | 24 ++++----
service/progress.go | 17 +++--
model/production_progress.go | 10 +++
crontask/cron_task.go | 59 +++++++++++++------
api/v1/task.go | 5 +
9 files changed, 125 insertions(+), 51 deletions(-)
diff --git a/api/v1/plc.go b/api/v1/plc.go
index 3ed79a8..a837e7a 100644
--- a/api/v1/plc.go
+++ b/api/v1/plc.go
@@ -29,8 +29,8 @@
if !ok {
return
}
- finishNumber, _ := service.PlcCacheGet(constvar.PlcCacheKeyFinishNumber)
- totalNumber, _ := service.PlcCacheGet(constvar.PlcCacheKeyTotalNumber)
+ finishNumber, _ := service.PlcCacheGet(params.Position, constvar.PlcCacheKeyFinishNumber)
+ totalNumber, _ := service.PlcCacheGet(params.Position, constvar.PlcCacheKeyTotalNumber)
resp := new(response.ProductProgress)
resp.FinishNumber = cast.ToInt(finishNumber)
resp.TotalNumber = cast.ToInt(totalNumber)
diff --git a/api/v1/task.go b/api/v1/task.go
index 7eb2142..9da9503 100644
--- a/api/v1/task.go
+++ b/api/v1/task.go
@@ -146,7 +146,7 @@
ctx.Fail(code)
return
}
- err := service.NewTaskService().UpdateProcedureStatus(nil, id, model.ProcedureStatusFinished)
+ err := service.NewTaskService().UpdateProcedureStatus(nil, id, model.ProcedureStatusFinished, procedure.Position)
if err != nil {
logx.Errorf("UpdateProcedureStatus err: %v", err.Error())
ctx.Fail(ecode.UnknownErr)
@@ -211,6 +211,7 @@
if err != nil {
return err
}
+ procedure.Position = params.Position
err = taskService.UpdateOrderStatus(db, order.ID, model.OrderStatusProcessing)
if err != nil {
return err
@@ -239,7 +240,7 @@
return
}
plcConfig.CurrentTryTimes = 0
- err = service.PlcWrite(plcConfig, constvar.PlcStartAddressTypeTotalNumber, order.Amount.IntPart())
+ err = service.PlcWrite(plcConfig, constvar.PlcStartAddressTypeTotalNumber, params.Position, order.Amount.IntPart())
if err != nil {
ctx.FailWithMsg(ecode.NeedConfirmedErr, "绯熺硶锛屽伐鑹轰笅鍙戝け璐ャ��")
return
diff --git a/crontask/cron_task.go b/crontask/cron_task.go
index 28301bf..505156d 100644
--- a/crontask/cron_task.go
+++ b/crontask/cron_task.go
@@ -29,12 +29,16 @@
if code != ecode.OK {
return
}
- finishNumber, err := service.PlcRead(plcConfig, constvar.PlcStartAddressTypeFinishNumber)
- if err == nil && finishNumber != 0 {
- service.PlcCacheSet(constvar.PlcCacheKeyFinishNumber, finishNumber)
- _ = service.NewProgressService().UpdateProgress(cast.ToInt64(finishNumber))
+ for _, addressItem := range plcConfig.Details {
+ if addressItem.FieldName == constvar.PlcStartAddressTypeFinishNumber {
+ finishNumber, err := service.PlcReadDirect(plcConfig, addressItem.StartAddress, addressItem.Position)
+ if err == nil && finishNumber != 0 {
+ service.PlcCacheSet(addressItem.Position, constvar.PlcCacheKeyFinishNumber, finishNumber)
+ _ = service.NewProgressService().UpdateProgress(addressItem.Position, cast.ToInt64(finishNumber))
+ }
+ logx.Infof("plc read finish number:%v, err:%v", finishNumber, err)
+ }
}
- logx.Infof("plc read finish number:%v, err:%v", finishNumber, err)
})
if err != nil {
@@ -46,11 +50,17 @@
if code != ecode.OK {
return
}
- totalNumber, err := service.PlcRead(plcConfig, constvar.PlcStartAddressTypeTotalNumber)
- if err == nil && totalNumber != 0 {
- service.PlcCacheSet(constvar.PlcCacheKeyTotalNumber, totalNumber)
+ for _, addressItem := range plcConfig.Details {
+ if addressItem.FieldName == constvar.PlcStartAddressTypeTotalNumber {
+ totalNumber, err := service.PlcReadDirect(plcConfig, addressItem.StartAddress, addressItem.Position)
+ if err == nil && totalNumber != 0 {
+ service.PlcCacheSet(addressItem.Position, constvar.PlcCacheKeyTotalNumber, totalNumber)
+ _ = service.NewProgressService().UpdateProgress(addressItem.Position, cast.ToInt64(totalNumber))
+ }
+ logx.Infof("plc read total number:%v, err:%v", totalNumber, err)
+ }
}
- logx.Infof("plc read total number:%v, err:%v", totalNumber, err)
+
})
s.Every(60).Seconds().StartImmediately().Do(SyncProductionProgress) //鍚屾鐢熶骇鏁版嵁
@@ -59,17 +69,30 @@
}
func SyncProductionProgress() {
- progress, err := service.NewProgressService().GetCurrentProgress()
- if err != nil {
+ plcConfig, code := service.NewDevicePlcService().GetDevicePlc()
+ if code != ecode.OK {
return
}
- if progress == nil {
- return
+ var positions []int
+ for _, item := range plcConfig.Details {
+ if item.FieldName == constvar.PlcStartAddressTypeFinishNumber {
+ positions = append(positions, item.Position)
+ }
+ }
+ for _, position := range positions {
+ progress, err := service.NewProgressService().GetCurrentProgress(position)
+ if err != nil {
+ return
+ }
+ if progress == nil {
+ return
+ }
+
+ caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicSyncTaskProgress, conf.Conf.NsqConf.NodeId), "")
+ err = caller.Send(progress)
+ if err != nil {
+ logx.Errorf("SyncProductionProgress error:%v", err.Error())
+ }
}
- caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicSyncTaskProgress, conf.Conf.NsqConf.NodeId), "")
- err = caller.Send(progress)
- if err != nil {
- logx.Errorf("SyncProductionProgress error:%v", err.Error())
- }
}
diff --git a/model/production_progress.go b/model/production_progress.go
index 8305442..166769e 100644
--- a/model/production_progress.go
+++ b/model/production_progress.go
@@ -15,6 +15,7 @@
ProcedureID string `gorm:"uniqueIndex:idx_product_procedure;type:varchar(191);comment:宸ュ簭ID" json:"procedureId"`
DeviceID string `gorm:"type:varchar(191);not null;comment:璁惧ID" json:"deviceId"`
FinishedQuantity int64 `gorm:"type:int;not null;comment:瀹屾垚鏁伴噺" json:"finishedQuantity"`
+ Position int `gorm:"type:int;comment:宸ヤ綔浣嶇疆" json:"position"` //姣忎釜璁惧鍙兘鏈夊涓満浣嶅悓鏃剁敓浜э紝鐢╬osition琛ㄧず浣嶇疆
TotalQuantity int64 `gorm:"type:int;not null;comment:鎬婚噺" json:"totalQuantity"`
}
@@ -73,6 +74,11 @@
return slf
}
+func (slf *ProductionProgressSearch) SetPosition(position int) *ProductionProgressSearch {
+ slf.Position = position
+ return slf
+}
+
func (slf *ProductionProgressSearch) build() *gorm.DB {
var db = slf.Orm.Model(&ProductionProgress{})
@@ -100,6 +106,10 @@
db = db.Where("device_id = ?", slf.DeviceID)
}
+ if slf.Position != 0 {
+ db = db.Where("position = ?", slf.Position)
+ }
+
return db
}
diff --git a/nsq/msg_handler.go b/nsq/msg_handler.go
index 5237309..9d1b5bf 100644
--- a/nsq/msg_handler.go
+++ b/nsq/msg_handler.go
@@ -152,6 +152,10 @@
}
}
+ if resp.PlcConfig.Method == "" {
+ return nil
+ }
+
var record model.DevicePlc
err = mapstructure.Decode(resp.PlcConfig, &record)
if err != nil {
diff --git a/service/cache_store.go b/service/cache_store.go
index 4c3148b..9cbd07b 100644
--- a/service/cache_store.go
+++ b/service/cache_store.go
@@ -44,17 +44,17 @@
}
const (
- PlcCacheKey = "plc:%v"
+ PlcCacheKey = "plc:%v:%v"
CurrentTaskCacheKey = "current_task"
- CurrentProgressCacheKey = "current_progress"
+ CurrentProgressCacheKey = "current_progress:%v"
)
-func PlcCacheGet(key string) (interface{}, bool) {
- return defaultCacheStore.Get(fmt.Sprintf(PlcCacheKey, key))
+func PlcCacheGet(position int, key string) (interface{}, bool) {
+ return defaultCacheStore.Get(fmt.Sprintf(PlcCacheKey, position, key))
}
-func PlcCacheSet(key string, value interface{}) {
- defaultCacheStore.Add(fmt.Sprintf(PlcCacheKey, key), value)
+func PlcCacheSet(position int, key string, value interface{}) {
+ defaultCacheStore.Add(fmt.Sprintf(PlcCacheKey, position, key), value)
}
func TaskCacheSet(value *response.TaskData) {
@@ -72,17 +72,17 @@
return nil, false
}
-func ProgressCacheGet() (*model.ProductionProgress, bool) {
- if v, ok := defaultCacheStore.Get(CurrentProgressCacheKey); ok {
+func ProgressCacheGet(position int) (*model.ProductionProgress, bool) {
+ if v, ok := defaultCacheStore.Get(fmt.Sprintf(CurrentProgressCacheKey, position)); ok {
return v.(*model.ProductionProgress), ok
}
return nil, false
}
-func ProgressCacheSet(value *model.ProductionProgress) {
- defaultCacheStore.Add(CurrentProgressCacheKey, value)
+func ProgressCacheSet(position int, value *model.ProductionProgress) {
+ defaultCacheStore.Add(fmt.Sprintf(CurrentProgressCacheKey, position), value)
}
-func ProgressCacheUnset() {
- defaultCacheStore.Remove(CurrentProgressCacheKey)
+func ProgressCacheUnset(position int) {
+ defaultCacheStore.Remove(fmt.Sprintf(CurrentProgressCacheKey, position))
}
diff --git a/service/plc.go b/service/plc.go
index f2bbd0b..0e5e49f 100644
--- a/service/plc.go
+++ b/service/plc.go
@@ -13,7 +13,7 @@
"github.com/spf13/cast"
)
-func PlcRead(plcConfig *model.DevicePlc, fieldType constvar.PlcStartAddressType) (val interface{}, err error) {
+func PlcRead(plcConfig *model.DevicePlc, fieldType constvar.PlcStartAddressType, position int) (val interface{}, err error) {
var (
startAddress int
valueType constvar.PlcStartAddressValueType
@@ -22,7 +22,7 @@
)
for _, pc := range plcConfig.Details {
- if pc.FieldName == fieldType {
+ if pc.FieldName == fieldType && pc.Position == position {
startAddress = pc.StartAddress
valueType = pc.Type
dataLength = pc.Length
@@ -66,7 +66,7 @@
return nil, errors.New("interface type not support")
}
-func PlcWrite(plcConfig *model.DevicePlc, fieldType constvar.PlcStartAddressType, value interface{}) (err error) {
+func PlcWrite(plcConfig *model.DevicePlc, fieldType constvar.PlcStartAddressType, position int, value interface{}) (err error) {
var (
startAddress int
ipAddr string
@@ -78,7 +78,7 @@
plcConfig.CurrentTryTimes++
for _, pc := range plcConfig.Details {
- if pc.FieldName == fieldType {
+ if pc.FieldName == fieldType && pc.Position == position {
startAddress = pc.StartAddress
}
}
@@ -89,14 +89,14 @@
if err != nil {
logx.Errorf("plc write failed, 杩炴帴plc澶辫触: %v", err.Error())
plcConfig.CurrentErr = err
- return PlcWrite(plcConfig, fieldType, value)
+ return PlcWrite(plcConfig, fieldType, position, value)
}
result, err := plc.WriteHoldingRegister(conn, startAddress, value)
if err != nil {
logx.Errorf("plc write failed, address: %v, value: %v, err: %v", startAddress, value, err.Error())
plcConfig.CurrentErr = err
- return PlcWrite(plcConfig, fieldType, value)
+ return PlcWrite(plcConfig, fieldType, position, value)
}
logx.Infof("plc write ok, address: %v, value: %v, result: %v", startAddress, value, result)
} else if plcConfig.Method == constvar.PlcMethodSerial {
@@ -144,3 +144,38 @@
}
return
}
+
+func PlcReadDirect(plcConfig *model.DevicePlc, address int, dataLength int) (val interface{}, err error) {
+ var (
+ ipAddr string
+ )
+
+ if plcConfig.CurrentTryTimes > plcConfig.MaxTryTimes {
+ return nil, plcConfig.CurrentErr
+ }
+ plcConfig.CurrentTryTimes++
+ if plcConfig.Method == constvar.PlcMethodModbusTCP {
+ ipAddr = fmt.Sprintf("%s:%v", plcConfig.Address, plcConfig.Port)
+ conn, err := plc.GetModbusConnection(ipAddr)
+ if err != nil {
+ logx.Errorf("plc write failed, 杩炴帴plc澶辫触: %v", err.Error())
+ plcConfig.CurrentErr = err
+ return PlcReadDirect(plcConfig, address, dataLength)
+ }
+ value, err := plc.ReadHoldingRegister(conn, address, dataLength)
+ if err != nil {
+ logx.Errorf("plc read failed, address: %v, err: %v", address, err.Error())
+ plcConfig.CurrentErr = err
+ return PlcReadDirect(plcConfig, address, dataLength)
+ }
+ logx.Infof("plc read ok, address: %v, result: %v", address, value)
+ } else if plcConfig.Method == constvar.PlcMethodSerial {
+ ipAddr = conf.Conf.Services.Serial
+ if ipAddr == "" {
+ return nil, errors.New("conf.Conf.Services.Serial config not set yet")
+ }
+ label := fmt.Sprintf("D%d", address)
+ return plccom.ReadPLC(plccom.DeviceTypeMitsubishi, ipAddr, label)
+ }
+ return
+}
diff --git a/service/progress.go b/service/progress.go
index 5ebd638..972a248 100644
--- a/service/progress.go
+++ b/service/progress.go
@@ -20,17 +20,18 @@
ProcedureID: procedure.ProceduresInfo.ProcedureID,
DeviceID: procedure.DeviceID,
TotalQuantity: order.Amount.IntPart(),
+ Position: procedure.Position,
}
err := model.NewProductionProgressSearch(db).Create(progress)
if err != nil {
return err
}
- ProgressCacheSet(progress)
+ ProgressCacheSet(procedure.Position, progress)
return nil
}
-func (slf ProgressService) UpdateProgress(finishedQuantity int64) (err error) {
- progressCache, err := slf.GetCurrentProgress()
+func (slf ProgressService) UpdateProgress(position int, finishedQuantity int64) (err error) {
+ progressCache, err := slf.GetCurrentProgress(position)
if err != nil {
return err
}
@@ -39,17 +40,17 @@
}
if finishedQuantity > progressCache.FinishedQuantity { //褰撴湁鍙樺寲鏃舵墠鏇存柊
progressCache.FinishedQuantity = finishedQuantity
- ProgressCacheSet(progressCache)
+ ProgressCacheSet(position, progressCache)
return model.NewProductionProgressSearch(nil).SetId(progressCache.ID).Save(progressCache)
}
return nil
}
-func (slf ProgressService) GetCurrentProgress() (progressCache *model.ProductionProgress, err error) {
+func (slf ProgressService) GetCurrentProgress(position int) (progressCache *model.ProductionProgress, err error) {
var ok bool
- progressCache, ok = ProgressCacheGet()
+ progressCache, ok = ProgressCacheGet(position)
if !ok {
- progressCache, err = model.NewProductionProgressSearch(nil).SetOrder("id desc").First()
+ progressCache, err = model.NewProductionProgressSearch(nil).SetPosition(position).SetOrder("id desc").First()
if err == gorm.ErrRecordNotFound {
return nil, errors.New("progress not found")
}
@@ -61,7 +62,7 @@
progressCache = nil
}
if progressCache != nil {
- ProgressCacheSet(progressCache)
+ ProgressCacheSet(position, progressCache)
}
}
return
diff --git a/service/task.go b/service/task.go
index 372c164..64a72df 100644
--- a/service/task.go
+++ b/service/task.go
@@ -116,7 +116,7 @@
func (slf TaskService) UpdateProcedureStatusAndPosition(db *gorm.DB, id int, status model.ProcedureStatus, position int) error {
if status == model.ProcedureStatusFinished {
- ProgressCacheUnset()
+ ProgressCacheUnset(position)
}
return model.NewProceduresSearch(db).SetId(id).UpdateByMap(map[string]interface{}{
"status": status,
@@ -124,9 +124,9 @@
})
}
-func (slf TaskService) UpdateProcedureStatus(db *gorm.DB, id int, status model.ProcedureStatus) error {
+func (slf TaskService) UpdateProcedureStatus(db *gorm.DB, id int, status model.ProcedureStatus, position int) error {
if status == model.ProcedureStatusFinished {
- ProgressCacheUnset()
+ ProgressCacheUnset(position)
}
return model.NewProceduresSearch(db).SetId(id).UpdateByMap(map[string]interface{}{
"status": status,
--
Gitblit v1.8.0