From 948bfeb2e8abd2ec80e282bd1b17975b89d3eb74 Mon Sep 17 00:00:00 2001 From: zhangqian <zhangqian@123.com> Date: 星期四, 23 十一月 2023 11:27:52 +0800 Subject: [PATCH] 所有的缓存都考虑上deviceID --- api/v1/plc.go | 10 +- service/task.go | 3 service/cache_store.go | 94 ++++++++++++++++--------------- service/progress.go | 17 ++--- crontask/cron_task.go | 6 +- api/v1/task.go | 14 ++-- 6 files changed, 73 insertions(+), 71 deletions(-) diff --git a/api/v1/plc.go b/api/v1/plc.go index 650f64b..4de07fb 100644 --- a/api/v1/plc.go +++ b/api/v1/plc.go @@ -28,8 +28,8 @@ if !ok { return } - finishNumber, _ := service.PlcCacheGet(params.Channel, constvar.PlcCacheKeyFinishNumber) - totalNumber, _ := service.PlcCacheGet(params.Channel, constvar.PlcCacheKeyTotalNumber) + finishNumber, _ := service.PlcCacheGet(conf.Conf.CurrentDeviceID, params.Channel, constvar.PlcCacheKeyFinishNumber) + totalNumber, _ := service.PlcCacheGet(conf.Conf.CurrentDeviceID, params.Channel, constvar.PlcCacheKeyTotalNumber) resp := new(response.ProductProgress) resp.FinishNumber = cast.ToInt(finishNumber) resp.TotalNumber = cast.ToInt(totalNumber) @@ -37,7 +37,7 @@ plcStatus := 1 //鏂紑杩炴帴 isConnect := service.PlcIsConnect() if isConnect { - lastUpdateTime := service.FinishUpdateTimeGet(params.Channel) + lastUpdateTime := service.FinishUpdateTimeGet(conf.Conf.CurrentDeviceID, params.Channel) if time.Now().Unix()-cast.ToInt64(lastUpdateTime) < conf.Conf.PLC.StandbyTime { //鐢熶骇 plcStatus = 2 } else { @@ -46,8 +46,8 @@ } resp.PlcStatus = plcStatus - resp.RealStartTime = service.TaskStartTimeGet(params.Channel) - resp.RealEndTime = service.TaskEndTimeGet(params.Channel) + resp.RealStartTime = service.TaskStartTimeGet(conf.Conf.CurrentDeviceID, params.Channel) + resp.RealEndTime = service.TaskEndTimeGet(conf.Conf.CurrentDeviceID, params.Channel) ctx.OkWithDetailed(resp) } diff --git a/api/v1/task.go b/api/v1/task.go index 99db0e8..ec61f10 100644 --- a/api/v1/task.go +++ b/api/v1/task.go @@ -108,7 +108,7 @@ nowTs := time.Now().Unix() flagMap := make(map[int32]struct{}, 0) for _, task := range taskResponse.Tasks { - if _, ok := flagMap[task.Channel]; !ok && !service.TaskFlagGet(task.Channel) && task.Procedure.StartTime <= nowTs { + if _, ok := flagMap[task.Channel]; !ok && !service.TaskFlagGet(conf.Conf.CurrentDeviceID, task.Channel) && task.Procedure.StartTime <= nowTs { task.CanStarted = true flagMap[task.Channel] = struct{}{} } @@ -240,8 +240,8 @@ return } - service.TaskFlagUnset(procedure.Channel) - service.TaskEndTimeSet(procedure.Channel, time.Now().Unix()) //璁剧疆宸ュ簭杩愯缁撴潫鏃堕棿 + service.TaskFlagUnset(conf.Conf.CurrentDeviceID, procedure.Channel) + service.TaskEndTimeSet(conf.Conf.CurrentDeviceID, procedure.Channel, time.Now().Unix()) //璁剧疆宸ュ簭杩愯缁撴潫鏃堕棿 ctx.Ok() } @@ -340,9 +340,9 @@ return } - service.TaskFlagSet(procedure.Channel, int(procedure.ID)) - service.TaskStartTimeSet(procedure.Channel, time.Now().Unix()) //璁剧疆宸ュ簭杩愯寮�濮嬫椂闂� - service.TaskEndTimeSet(procedure.Channel, 0) //璁剧疆宸ュ簭杩愯缁撴潫鏃堕棿 + service.TaskFlagSet(conf.Conf.CurrentDeviceID, procedure.Channel, int(procedure.ID)) + service.TaskStartTimeSet(conf.Conf.CurrentDeviceID, procedure.Channel, time.Now().Unix()) //璁剧疆宸ュ簭杩愯寮�濮嬫椂闂� + service.TaskEndTimeSet(conf.Conf.CurrentDeviceID, procedure.Channel, 0) //璁剧疆宸ュ簭杩愯缁撴潫鏃堕棿 ctx.Ok() } @@ -501,7 +501,7 @@ nowTs := time.Now().Unix() flagMap := make(map[int32]struct{}, 0) for _, task := range taskResponse.Tasks { - if _, ok := flagMap[task.Channel]; !ok && !service.TaskFlagGet(task.Channel) && task.Procedure.StartTime <= nowTs { + if _, ok := flagMap[task.Channel]; !ok && !service.TaskFlagGet(conf.Conf.CurrentDeviceID, task.Channel) && task.Procedure.StartTime <= nowTs { task.CanStarted = true flagMap[task.Channel] = struct{}{} } diff --git a/crontask/cron_task.go b/crontask/cron_task.go index 7ae210c..2bd87c3 100644 --- a/crontask/cron_task.go +++ b/crontask/cron_task.go @@ -45,8 +45,8 @@ } finishNumber := cast.ToInt64(value) if finishNumber != 0 { - service.PlcCacheSet(addressItem.Channel, constvar.PlcCacheKeyFinishNumber, finishNumber) - _ = service.NewProgressService().UpdateProgress(addressItem.Channel, cast.ToInt64(finishNumber)) + service.PlcCacheSet(conf.Conf.CurrentDeviceID, addressItem.Channel, constvar.PlcCacheKeyFinishNumber, finishNumber) + _ = service.NewProgressService().UpdateProgress(conf.Conf.CurrentDeviceID, addressItem.Channel, cast.ToInt64(finishNumber)) } logx.Infof("plc read finish number: %v", finishNumber) } @@ -71,7 +71,7 @@ } totalNumber := cast.ToInt64(value) if totalNumber != 0 { - service.PlcCacheSet(addressItem.Channel, constvar.PlcCacheKeyTotalNumber, totalNumber) + service.PlcCacheSet(conf.Conf.CurrentDeviceID, addressItem.Channel, constvar.PlcCacheKeyTotalNumber, totalNumber) } logx.Infof("plc read total number: %v", totalNumber) } diff --git a/service/cache_store.go b/service/cache_store.go index 2ac4594..e8b9832 100644 --- a/service/cache_store.go +++ b/service/cache_store.go @@ -48,42 +48,48 @@ } const ( - PlcCacheKey = "plc:%v:%v" //plc:channel:key - CurrentTaskCacheKey = "current_task:%v" //current_task:channel - CurrentProgressCacheKey = "current_progress:%v" //current_progress:channel - PlcCacheKeyUpdateTime = "finish_number_update_time:%v" //finish_number_update_time:channel - TaskStartTimeCache = "task_start_time:%v" //task_start_time:channel - TaskEndTimeCache = "task_end_time:%v" //task_end_time:channel + PlcCacheKey = "plc:%v:%v:%v" //plc:deviceID:channel:key 缂撳瓨鍔犲伐鏁版垨鐩爣鏁� + CurrentTaskCacheKey = "current_task:%v:%v" //current_task:deviceID:channel 缂撳瓨褰撳墠浠诲姟id + CurrentProgressCacheKey = "current_progress:%v:%v" //current_progress:deviceId:channel + PlcCacheKeyUpdateTime = "finish_number_update_time:%v:%v" //finish_number_update_time:deviceID:channel + TaskStartTimeCache = "task_start_time:%v:%v" //task_start_time:deviceID:channel + TaskEndTimeCache = "task_end_time:%v:%v" //task_end_time:deviceID:channel ) -func PlcCacheGet(channel int32, key string) (interface{}, bool) { - return defaultCacheStore.Get(fmt.Sprintf(PlcCacheKey, channel, key)) +func PlcCacheGet(deviceId string, channel int32, key string) (interface{}, bool) { + return defaultCacheStore.Get(fmt.Sprintf(PlcCacheKey, deviceId, channel, key)) } -func PlcCacheSet(channel int32, key string, value interface{}) { +func PlcCacheSet(deviceId string, channel int32, key string, value interface{}) { if key == constvar.PlcCacheKeyFinishNumber { - oldFinishNumber, exists := PlcCacheGet(channel, key) + oldFinishNumber, exists := PlcCacheGet(deviceId, channel, key) if !exists || cast.ToInt(oldFinishNumber) != cast.ToInt(value) { //finishNumber鏈変簡鍙樺寲锛岃缃洿鏂版椂闂寸紦瀛� - FinishUpdateTimeSet(channel, time.Now().Unix()) + FinishUpdateTimeSet(deviceId, channel, time.Now().Unix()) } } - defaultCacheStore.Add(fmt.Sprintf(PlcCacheKey, channel, key), value) + defaultCacheStore.Add(fmt.Sprintf(PlcCacheKey, deviceId, channel, key), value) } -func FinishUpdateTimeGet(channel int32) interface{} { - val, ok := defaultCacheStore.Get(fmt.Sprintf(PlcCacheKeyUpdateTime, channel)) +// FinishUpdateTimeGet 鐢ㄤ簬鍒ゆ柇plc鐘舵�侊紝瓒呰繃澶氬皯鏃堕棿鏈洿鏂拌涓哄緟鏈� +func FinishUpdateTimeGet(deviceId string, channel int32) interface{} { + val, ok := defaultCacheStore.Get(fmt.Sprintf(PlcCacheKeyUpdateTime, deviceId, channel)) if ok { return val } return 0 } -func TaskStartTimeSet(channel int32, ts int64) { - defaultCacheStore.Add(fmt.Sprintf(TaskStartTimeCache, channel), ts) +func FinishUpdateTimeSet(deviceId string, channel int32, value interface{}) { + defaultCacheStore.Add(fmt.Sprintf(PlcCacheKeyUpdateTime, deviceId, channel), value) } -func TaskStartTimeGet(channel int32) int64 { - if v, ok := defaultCacheStore.Get(fmt.Sprintf(TaskStartTimeCache, channel)); ok { +func TaskStartTimeSet(deviceID string, channel int32, ts int64) { + defaultCacheStore.Add(fmt.Sprintf(TaskStartTimeCache, deviceID, channel), ts) +} + +// TaskStartTimeGet 鐢ㄤ簬鍓嶇灞曠ず宸ュ簭杩愯鏃堕棿 +func TaskStartTimeGet(deviceId string, channel int32) int64 { + if v, ok := defaultCacheStore.Get(fmt.Sprintf(TaskStartTimeCache, deviceId, channel)); ok { return v.(int64) } procedure, err := model.NewProceduresSearch(nil).SetDeviceId(conf.Conf.CurrentDeviceID).SetStatus(model.ProcedureStatusProcessing).SetChannels([]int32{channel}).First() //杩涜涓换鍔� @@ -91,69 +97,65 @@ procedure, err = model.NewProceduresSearch(nil).SetDeviceId(conf.Conf.CurrentDeviceID). SetStatus(model.ProcedureStatusFinished).SetChannels([]int32{channel}).SetOrder("real_end_time desc").First() //涓婁竴涓粨鏉熺殑浠诲姟 if err == gorm.ErrRecordNotFound { //杩涜涓拰缁撴潫鐨勯兘娌℃湁锛屽紑濮嬫椂闂村拰缁撴潫鏃堕棿閮借缃�0 - TaskStartTimeSet(channel, int64(0)) - TaskEndTimeSet(channel, int64(0)) + TaskStartTimeSet(deviceId, channel, int64(0)) + TaskEndTimeSet(deviceId, channel, int64(0)) return 0 } else { - TaskStartTimeSet(channel, procedure.RealStartTime) - TaskStartTimeSet(channel, procedure.RealEndTime) + TaskStartTimeSet(deviceId, channel, procedure.RealStartTime) + TaskEndTimeSet(deviceId, channel, procedure.RealEndTime) return 0 } } else { - TaskStartTimeSet(channel, procedure.RealStartTime) - TaskStartTimeSet(channel, int64(0)) + TaskStartTimeSet(deviceId, channel, procedure.RealStartTime) + TaskEndTimeSet(deviceId, channel, int64(0)) return procedure.RealStartTime } } -func TaskEndTimeSet(channel int32, ts int64) { - defaultCacheStore.Add(fmt.Sprintf(TaskEndTimeCache, channel), ts) +func TaskEndTimeSet(deviceID string, channel int32, ts int64) { + defaultCacheStore.Add(fmt.Sprintf(TaskEndTimeCache, deviceID, channel), ts) } -func TaskEndTimeGet(channel int32) int64 { - if v, ok := defaultCacheStore.Get(fmt.Sprintf(TaskEndTimeCache, channel)); ok { +func TaskEndTimeGet(deviceID string, channel int32) int64 { + if v, ok := defaultCacheStore.Get(fmt.Sprintf(TaskEndTimeCache, deviceID, channel)); ok { return v.(int64) } return 0 } -func FinishUpdateTimeSet(channel int32, value interface{}) { - defaultCacheStore.Add(fmt.Sprintf(PlcCacheKeyUpdateTime, channel), value) +func TaskFlagSet(deviceID string, channel int32, taskId int) { + defaultCacheStore.Add(fmt.Sprintf(CurrentTaskCacheKey, deviceID, channel), taskId) } -func TaskFlagSet(channel int32, taskId int) { - defaultCacheStore.Add(fmt.Sprintf(CurrentTaskCacheKey, channel), taskId) +func TaskFlagUnset(deviceID string, channel int32) { + defaultCacheStore.Add(fmt.Sprintf(CurrentTaskCacheKey, deviceID, channel), 0) } -func TaskFlagUnset(channel int32) { - defaultCacheStore.Add(fmt.Sprintf(CurrentTaskCacheKey, channel), 0) -} - -func TaskFlagGet(channel int32) bool { - if v, ok := defaultCacheStore.Get(fmt.Sprintf(CurrentTaskCacheKey, channel)); ok { +func TaskFlagGet(deviceID string, channel int32) bool { + if v, ok := defaultCacheStore.Get(fmt.Sprintf(CurrentTaskCacheKey, deviceID, channel)); ok { return v.(int) > 0 } procedure, err := model.NewProceduresSearch(nil).SetDeviceId(conf.Conf.CurrentDeviceID).SetStatus(model.ProcedureStatusProcessing).SetChannels([]int32{channel}).First() if err == gorm.ErrRecordNotFound { - defaultCacheStore.Add(fmt.Sprintf(CurrentTaskCacheKey, channel), 0) + defaultCacheStore.Add(fmt.Sprintf(CurrentTaskCacheKey, deviceID, channel), 0) return false } else { - defaultCacheStore.Add(fmt.Sprintf(CurrentTaskCacheKey, channel), int(procedure.ID)) + defaultCacheStore.Add(fmt.Sprintf(CurrentTaskCacheKey, deviceID, channel), int(procedure.ID)) return true } } -func ProgressCacheGet(channel int32) (*model.ProductionProgress, bool) { - if v, ok := defaultCacheStore.Get(fmt.Sprintf(CurrentProgressCacheKey, channel)); ok { +func ProgressCacheGet(deviceID string, channel int32) (*model.ProductionProgress, bool) { + if v, ok := defaultCacheStore.Get(fmt.Sprintf(CurrentProgressCacheKey, deviceID, channel)); ok { return v.(*model.ProductionProgress), ok } return nil, false } -func ProgressCacheSet(channel int32, value *model.ProductionProgress) { - defaultCacheStore.Add(fmt.Sprintf(CurrentProgressCacheKey, channel), value) +func ProgressCacheSet(deviceID string, channel int32, value *model.ProductionProgress) { + defaultCacheStore.Add(fmt.Sprintf(CurrentProgressCacheKey, deviceID, channel), value) } -func ProgressCacheUnset(channel int32) { - defaultCacheStore.Remove(fmt.Sprintf(CurrentProgressCacheKey, channel)) +func ProgressCacheUnset(deviceID string, channel int32) { + defaultCacheStore.Remove(fmt.Sprintf(CurrentProgressCacheKey, deviceID, channel)) } diff --git a/service/progress.go b/service/progress.go index 4b3dca5..0f18fb9 100644 --- a/service/progress.go +++ b/service/progress.go @@ -1,7 +1,6 @@ package service import ( - "apsClient/conf" "apsClient/model" "errors" "github.com/jinzhu/gorm" @@ -29,14 +28,14 @@ if err != nil { return err } - ProgressCacheSet(procedure.Channel, progress) + ProgressCacheSet(procedure.DeviceID, procedure.Channel, progress) } return nil } -func (slf ProgressService) UpdateProgress(channel int32, finishedQuantity int64) (err error) { - progressCache, err := slf.GetCurrentProgress(channel) +func (slf ProgressService) UpdateProgress(deviceID string, channel int32, finishedQuantity int64) (err error) { + progressCache, err := slf.GetCurrentProgress(deviceID, channel) if err != nil { return err } @@ -45,17 +44,17 @@ } if finishedQuantity > progressCache.FinishedQuantity { //褰撴湁鍙樺寲鏃舵墠鏇存柊 progressCache.FinishedQuantity = finishedQuantity - ProgressCacheSet(channel, progressCache) + ProgressCacheSet(deviceID, channel, progressCache) return model.NewProductionProgressSearch(nil).SetId(progressCache.ID).Save(progressCache) } return nil } -func (slf ProgressService) GetCurrentProgress(channel int32) (progressCache *model.ProductionProgress, err error) { +func (slf ProgressService) GetCurrentProgress(deviceID string, channel int32) (progressCache *model.ProductionProgress, err error) { var ok bool - progressCache, ok = ProgressCacheGet(channel) + progressCache, ok = ProgressCacheGet(deviceID, channel) if !ok { - progressCache, err = model.NewProductionProgressSearch(nil).SetDeviceId(conf.Conf.CurrentDeviceID).SetChannel(channel).SetOrder("id asc").First() + progressCache, err = model.NewProductionProgressSearch(nil).SetDeviceId(deviceID).SetChannel(channel).SetOrder("id asc").First() if err == gorm.ErrRecordNotFound { return nil, errors.New("progress not found") } @@ -67,7 +66,7 @@ progressCache = nil } if progressCache != nil { - ProgressCacheSet(channel, progressCache) + ProgressCacheSet(deviceID, channel, progressCache) } } return diff --git a/service/task.go b/service/task.go index d393b5f..323add1 100644 --- a/service/task.go +++ b/service/task.go @@ -1,6 +1,7 @@ package service import ( + "apsClient/conf" "apsClient/constvar" "apsClient/model" "apsClient/model/common" @@ -133,7 +134,7 @@ func (slf TaskService) UpdateProcedureStatusAndChannel(db *gorm.DB, id uint, status model.ProcedureStatus, channel int32, processModelNumber string) error { if status == model.ProcedureStatusFinished || status == model.ProcedureStatusWaitProcess { - ProgressCacheUnset(channel) + ProgressCacheUnset(conf.Conf.CurrentDeviceID, channel) } upMap := map[string]interface{}{"status": status} -- Gitblit v1.8.0