From 040cd381f2e8475e9b4eb336b704ad878f56f4fa Mon Sep 17 00:00:00 2001 From: zhangqian <zhangqian@123.com> Date: 星期四, 14 九月 2023 20:42:01 +0800 Subject: [PATCH] 适应性改动 --- api/v1/plc.go | 4 service/plc.go | 47 +++++++++++++-- service/task.go | 6 +- nsq/msg_handler.go | 4 + service/cache_store.go | 24 ++++---- service/progress.go | 17 +++-- model/production_progress.go | 10 +++ crontask/cron_task.go | 59 +++++++++++++------ api/v1/task.go | 5 + 9 files changed, 125 insertions(+), 51 deletions(-) diff --git a/api/v1/plc.go b/api/v1/plc.go index 3ed79a8..a837e7a 100644 --- a/api/v1/plc.go +++ b/api/v1/plc.go @@ -29,8 +29,8 @@ if !ok { return } - finishNumber, _ := service.PlcCacheGet(constvar.PlcCacheKeyFinishNumber) - totalNumber, _ := service.PlcCacheGet(constvar.PlcCacheKeyTotalNumber) + finishNumber, _ := service.PlcCacheGet(params.Position, constvar.PlcCacheKeyFinishNumber) + totalNumber, _ := service.PlcCacheGet(params.Position, constvar.PlcCacheKeyTotalNumber) resp := new(response.ProductProgress) resp.FinishNumber = cast.ToInt(finishNumber) resp.TotalNumber = cast.ToInt(totalNumber) diff --git a/api/v1/task.go b/api/v1/task.go index 7eb2142..9da9503 100644 --- a/api/v1/task.go +++ b/api/v1/task.go @@ -146,7 +146,7 @@ ctx.Fail(code) return } - err := service.NewTaskService().UpdateProcedureStatus(nil, id, model.ProcedureStatusFinished) + err := service.NewTaskService().UpdateProcedureStatus(nil, id, model.ProcedureStatusFinished, procedure.Position) if err != nil { logx.Errorf("UpdateProcedureStatus err: %v", err.Error()) ctx.Fail(ecode.UnknownErr) @@ -211,6 +211,7 @@ if err != nil { return err } + procedure.Position = params.Position err = taskService.UpdateOrderStatus(db, order.ID, model.OrderStatusProcessing) if err != nil { return err @@ -239,7 +240,7 @@ return } plcConfig.CurrentTryTimes = 0 - err = service.PlcWrite(plcConfig, constvar.PlcStartAddressTypeTotalNumber, order.Amount.IntPart()) + err = service.PlcWrite(plcConfig, constvar.PlcStartAddressTypeTotalNumber, params.Position, order.Amount.IntPart()) if err != nil { ctx.FailWithMsg(ecode.NeedConfirmedErr, "绯熺硶锛屽伐鑹轰笅鍙戝け璐ャ��") return diff --git a/crontask/cron_task.go b/crontask/cron_task.go index 28301bf..505156d 100644 --- a/crontask/cron_task.go +++ b/crontask/cron_task.go @@ -29,12 +29,16 @@ if code != ecode.OK { return } - finishNumber, err := service.PlcRead(plcConfig, constvar.PlcStartAddressTypeFinishNumber) - if err == nil && finishNumber != 0 { - service.PlcCacheSet(constvar.PlcCacheKeyFinishNumber, finishNumber) - _ = service.NewProgressService().UpdateProgress(cast.ToInt64(finishNumber)) + for _, addressItem := range plcConfig.Details { + if addressItem.FieldName == constvar.PlcStartAddressTypeFinishNumber { + finishNumber, err := service.PlcReadDirect(plcConfig, addressItem.StartAddress, addressItem.Position) + if err == nil && finishNumber != 0 { + service.PlcCacheSet(addressItem.Position, constvar.PlcCacheKeyFinishNumber, finishNumber) + _ = service.NewProgressService().UpdateProgress(addressItem.Position, cast.ToInt64(finishNumber)) + } + logx.Infof("plc read finish number:%v, err:%v", finishNumber, err) + } } - logx.Infof("plc read finish number:%v, err:%v", finishNumber, err) }) if err != nil { @@ -46,11 +50,17 @@ if code != ecode.OK { return } - totalNumber, err := service.PlcRead(plcConfig, constvar.PlcStartAddressTypeTotalNumber) - if err == nil && totalNumber != 0 { - service.PlcCacheSet(constvar.PlcCacheKeyTotalNumber, totalNumber) + for _, addressItem := range plcConfig.Details { + if addressItem.FieldName == constvar.PlcStartAddressTypeTotalNumber { + totalNumber, err := service.PlcReadDirect(plcConfig, addressItem.StartAddress, addressItem.Position) + if err == nil && totalNumber != 0 { + service.PlcCacheSet(addressItem.Position, constvar.PlcCacheKeyTotalNumber, totalNumber) + _ = service.NewProgressService().UpdateProgress(addressItem.Position, cast.ToInt64(totalNumber)) + } + logx.Infof("plc read total number:%v, err:%v", totalNumber, err) + } } - logx.Infof("plc read total number:%v, err:%v", totalNumber, err) + }) s.Every(60).Seconds().StartImmediately().Do(SyncProductionProgress) //鍚屾鐢熶骇鏁版嵁 @@ -59,17 +69,30 @@ } func SyncProductionProgress() { - progress, err := service.NewProgressService().GetCurrentProgress() - if err != nil { + plcConfig, code := service.NewDevicePlcService().GetDevicePlc() + if code != ecode.OK { return } - if progress == nil { - return + var positions []int + for _, item := range plcConfig.Details { + if item.FieldName == constvar.PlcStartAddressTypeFinishNumber { + positions = append(positions, item.Position) + } + } + for _, position := range positions { + progress, err := service.NewProgressService().GetCurrentProgress(position) + if err != nil { + return + } + if progress == nil { + return + } + + caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicSyncTaskProgress, conf.Conf.NsqConf.NodeId), "") + err = caller.Send(progress) + if err != nil { + logx.Errorf("SyncProductionProgress error:%v", err.Error()) + } } - caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicSyncTaskProgress, conf.Conf.NsqConf.NodeId), "") - err = caller.Send(progress) - if err != nil { - logx.Errorf("SyncProductionProgress error:%v", err.Error()) - } } diff --git a/model/production_progress.go b/model/production_progress.go index 8305442..166769e 100644 --- a/model/production_progress.go +++ b/model/production_progress.go @@ -15,6 +15,7 @@ ProcedureID string `gorm:"uniqueIndex:idx_product_procedure;type:varchar(191);comment:宸ュ簭ID" json:"procedureId"` DeviceID string `gorm:"type:varchar(191);not null;comment:璁惧ID" json:"deviceId"` FinishedQuantity int64 `gorm:"type:int;not null;comment:瀹屾垚鏁伴噺" json:"finishedQuantity"` + Position int `gorm:"type:int;comment:宸ヤ綔浣嶇疆" json:"position"` //姣忎釜璁惧鍙兘鏈夊涓満浣嶅悓鏃剁敓浜э紝鐢╬osition琛ㄧず浣嶇疆 TotalQuantity int64 `gorm:"type:int;not null;comment:鎬婚噺" json:"totalQuantity"` } @@ -73,6 +74,11 @@ return slf } +func (slf *ProductionProgressSearch) SetPosition(position int) *ProductionProgressSearch { + slf.Position = position + return slf +} + func (slf *ProductionProgressSearch) build() *gorm.DB { var db = slf.Orm.Model(&ProductionProgress{}) @@ -100,6 +106,10 @@ db = db.Where("device_id = ?", slf.DeviceID) } + if slf.Position != 0 { + db = db.Where("position = ?", slf.Position) + } + return db } diff --git a/nsq/msg_handler.go b/nsq/msg_handler.go index 5237309..9d1b5bf 100644 --- a/nsq/msg_handler.go +++ b/nsq/msg_handler.go @@ -152,6 +152,10 @@ } } + if resp.PlcConfig.Method == "" { + return nil + } + var record model.DevicePlc err = mapstructure.Decode(resp.PlcConfig, &record) if err != nil { diff --git a/service/cache_store.go b/service/cache_store.go index 4c3148b..9cbd07b 100644 --- a/service/cache_store.go +++ b/service/cache_store.go @@ -44,17 +44,17 @@ } const ( - PlcCacheKey = "plc:%v" + PlcCacheKey = "plc:%v:%v" CurrentTaskCacheKey = "current_task" - CurrentProgressCacheKey = "current_progress" + CurrentProgressCacheKey = "current_progress:%v" ) -func PlcCacheGet(key string) (interface{}, bool) { - return defaultCacheStore.Get(fmt.Sprintf(PlcCacheKey, key)) +func PlcCacheGet(position int, key string) (interface{}, bool) { + return defaultCacheStore.Get(fmt.Sprintf(PlcCacheKey, position, key)) } -func PlcCacheSet(key string, value interface{}) { - defaultCacheStore.Add(fmt.Sprintf(PlcCacheKey, key), value) +func PlcCacheSet(position int, key string, value interface{}) { + defaultCacheStore.Add(fmt.Sprintf(PlcCacheKey, position, key), value) } func TaskCacheSet(value *response.TaskData) { @@ -72,17 +72,17 @@ return nil, false } -func ProgressCacheGet() (*model.ProductionProgress, bool) { - if v, ok := defaultCacheStore.Get(CurrentProgressCacheKey); ok { +func ProgressCacheGet(position int) (*model.ProductionProgress, bool) { + if v, ok := defaultCacheStore.Get(fmt.Sprintf(CurrentProgressCacheKey, position)); ok { return v.(*model.ProductionProgress), ok } return nil, false } -func ProgressCacheSet(value *model.ProductionProgress) { - defaultCacheStore.Add(CurrentProgressCacheKey, value) +func ProgressCacheSet(position int, value *model.ProductionProgress) { + defaultCacheStore.Add(fmt.Sprintf(CurrentProgressCacheKey, position), value) } -func ProgressCacheUnset() { - defaultCacheStore.Remove(CurrentProgressCacheKey) +func ProgressCacheUnset(position int) { + defaultCacheStore.Remove(fmt.Sprintf(CurrentProgressCacheKey, position)) } diff --git a/service/plc.go b/service/plc.go index f2bbd0b..0e5e49f 100644 --- a/service/plc.go +++ b/service/plc.go @@ -13,7 +13,7 @@ "github.com/spf13/cast" ) -func PlcRead(plcConfig *model.DevicePlc, fieldType constvar.PlcStartAddressType) (val interface{}, err error) { +func PlcRead(plcConfig *model.DevicePlc, fieldType constvar.PlcStartAddressType, position int) (val interface{}, err error) { var ( startAddress int valueType constvar.PlcStartAddressValueType @@ -22,7 +22,7 @@ ) for _, pc := range plcConfig.Details { - if pc.FieldName == fieldType { + if pc.FieldName == fieldType && pc.Position == position { startAddress = pc.StartAddress valueType = pc.Type dataLength = pc.Length @@ -66,7 +66,7 @@ return nil, errors.New("interface type not support") } -func PlcWrite(plcConfig *model.DevicePlc, fieldType constvar.PlcStartAddressType, value interface{}) (err error) { +func PlcWrite(plcConfig *model.DevicePlc, fieldType constvar.PlcStartAddressType, position int, value interface{}) (err error) { var ( startAddress int ipAddr string @@ -78,7 +78,7 @@ plcConfig.CurrentTryTimes++ for _, pc := range plcConfig.Details { - if pc.FieldName == fieldType { + if pc.FieldName == fieldType && pc.Position == position { startAddress = pc.StartAddress } } @@ -89,14 +89,14 @@ if err != nil { logx.Errorf("plc write failed, 杩炴帴plc澶辫触: %v", err.Error()) plcConfig.CurrentErr = err - return PlcWrite(plcConfig, fieldType, value) + return PlcWrite(plcConfig, fieldType, position, value) } result, err := plc.WriteHoldingRegister(conn, startAddress, value) if err != nil { logx.Errorf("plc write failed, address: %v, value: %v, err: %v", startAddress, value, err.Error()) plcConfig.CurrentErr = err - return PlcWrite(plcConfig, fieldType, value) + return PlcWrite(plcConfig, fieldType, position, value) } logx.Infof("plc write ok, address: %v, value: %v, result: %v", startAddress, value, result) } else if plcConfig.Method == constvar.PlcMethodSerial { @@ -144,3 +144,38 @@ } return } + +func PlcReadDirect(plcConfig *model.DevicePlc, address int, dataLength int) (val interface{}, err error) { + var ( + ipAddr string + ) + + if plcConfig.CurrentTryTimes > plcConfig.MaxTryTimes { + return nil, plcConfig.CurrentErr + } + plcConfig.CurrentTryTimes++ + if plcConfig.Method == constvar.PlcMethodModbusTCP { + ipAddr = fmt.Sprintf("%s:%v", plcConfig.Address, plcConfig.Port) + conn, err := plc.GetModbusConnection(ipAddr) + if err != nil { + logx.Errorf("plc write failed, 杩炴帴plc澶辫触: %v", err.Error()) + plcConfig.CurrentErr = err + return PlcReadDirect(plcConfig, address, dataLength) + } + value, err := plc.ReadHoldingRegister(conn, address, dataLength) + if err != nil { + logx.Errorf("plc read failed, address: %v, err: %v", address, err.Error()) + plcConfig.CurrentErr = err + return PlcReadDirect(plcConfig, address, dataLength) + } + logx.Infof("plc read ok, address: %v, result: %v", address, value) + } else if plcConfig.Method == constvar.PlcMethodSerial { + ipAddr = conf.Conf.Services.Serial + if ipAddr == "" { + return nil, errors.New("conf.Conf.Services.Serial config not set yet") + } + label := fmt.Sprintf("D%d", address) + return plccom.ReadPLC(plccom.DeviceTypeMitsubishi, ipAddr, label) + } + return +} diff --git a/service/progress.go b/service/progress.go index 5ebd638..972a248 100644 --- a/service/progress.go +++ b/service/progress.go @@ -20,17 +20,18 @@ ProcedureID: procedure.ProceduresInfo.ProcedureID, DeviceID: procedure.DeviceID, TotalQuantity: order.Amount.IntPart(), + Position: procedure.Position, } err := model.NewProductionProgressSearch(db).Create(progress) if err != nil { return err } - ProgressCacheSet(progress) + ProgressCacheSet(procedure.Position, progress) return nil } -func (slf ProgressService) UpdateProgress(finishedQuantity int64) (err error) { - progressCache, err := slf.GetCurrentProgress() +func (slf ProgressService) UpdateProgress(position int, finishedQuantity int64) (err error) { + progressCache, err := slf.GetCurrentProgress(position) if err != nil { return err } @@ -39,17 +40,17 @@ } if finishedQuantity > progressCache.FinishedQuantity { //褰撴湁鍙樺寲鏃舵墠鏇存柊 progressCache.FinishedQuantity = finishedQuantity - ProgressCacheSet(progressCache) + ProgressCacheSet(position, progressCache) return model.NewProductionProgressSearch(nil).SetId(progressCache.ID).Save(progressCache) } return nil } -func (slf ProgressService) GetCurrentProgress() (progressCache *model.ProductionProgress, err error) { +func (slf ProgressService) GetCurrentProgress(position int) (progressCache *model.ProductionProgress, err error) { var ok bool - progressCache, ok = ProgressCacheGet() + progressCache, ok = ProgressCacheGet(position) if !ok { - progressCache, err = model.NewProductionProgressSearch(nil).SetOrder("id desc").First() + progressCache, err = model.NewProductionProgressSearch(nil).SetPosition(position).SetOrder("id desc").First() if err == gorm.ErrRecordNotFound { return nil, errors.New("progress not found") } @@ -61,7 +62,7 @@ progressCache = nil } if progressCache != nil { - ProgressCacheSet(progressCache) + ProgressCacheSet(position, progressCache) } } return diff --git a/service/task.go b/service/task.go index 372c164..64a72df 100644 --- a/service/task.go +++ b/service/task.go @@ -116,7 +116,7 @@ func (slf TaskService) UpdateProcedureStatusAndPosition(db *gorm.DB, id int, status model.ProcedureStatus, position int) error { if status == model.ProcedureStatusFinished { - ProgressCacheUnset() + ProgressCacheUnset(position) } return model.NewProceduresSearch(db).SetId(id).UpdateByMap(map[string]interface{}{ "status": status, @@ -124,9 +124,9 @@ }) } -func (slf TaskService) UpdateProcedureStatus(db *gorm.DB, id int, status model.ProcedureStatus) error { +func (slf TaskService) UpdateProcedureStatus(db *gorm.DB, id int, status model.ProcedureStatus, position int) error { if status == model.ProcedureStatusFinished { - ProgressCacheUnset() + ProgressCacheUnset(position) } return model.NewProceduresSearch(db).SetId(id).UpdateByMap(map[string]interface{}{ "status": status, -- Gitblit v1.8.0