From 49e90e5de2e7166e74e26102dff9064b933fc5fd Mon Sep 17 00:00:00 2001 From: zhangqian <zhangqian@123.com> Date: 星期二, 05 九月 2023 16:00:27 +0800 Subject: [PATCH] 工序的生产进度存表 --- service/plc.go | 39 ---- service/task.go | 15 + nsq/msg_handler.go | 17 + service/cache_store.go | 88 +++++++++ service/progress.go | 56 ++++++ model/index.go | 1 model/production_progress.go | 257 ++++++++++++++++++++++++++++ main.go | 5 model/work_order.go | 2 crontask/cron_task.go | 2 api/v1/task.go | 19 +- nsq/nsq.go | 2 12 files changed, 443 insertions(+), 60 deletions(-) diff --git a/api/v1/task.go b/api/v1/task.go index 1729bc7..cfd6ce1 100644 --- a/api/v1/task.go +++ b/api/v1/task.go @@ -35,7 +35,7 @@ return } var resp response.TaskCountdown - workOrder, err := service.NewTaskService().GetCurrentTask() + workOrder, err := service.NewTaskService().GetNextTask() if err == nil { seconds := workOrder.StartTime - time.Now().Unix() resp.CountDownHour = seconds / 3600 @@ -180,12 +180,13 @@ return } id := convertx.Atoi(idx) - procedure, code := service.NewTaskService().GetProcedureById(id) + taskService := service.NewTaskService() + procedure, code := taskService.GetProcedureById(id) if code != ecode.OK { ctx.Fail(code) return } - order, err := service.NewTaskService().GetOrderByWorkOrderId(procedure.WorkOrderID) + order, err := taskService.GetOrderByWorkOrderId(procedure.WorkOrderID) if err != nil { ctx.Fail(ecode.UnknownErr) return @@ -196,22 +197,22 @@ return } - processModel, err := service.NewTaskService().GetProcessParams(procedure, order) + processModel, err := taskService.GetProcessParams(procedure, order) if err != nil || processModel == nil || processModel.ParamsMap == nil { - ctx.Fail(ecode.UnknownErr) - return + //ctx.Fail(ecode.UnknownErr) //todo + //return } err = model.WithTransaction(func(db *gorm.DB) error { - err = service.NewTaskService().UpdateProcedureStatus(db, id, model.ProcedureStatusProcessing) + err = taskService.UpdateProcedureStatus(db, id, model.ProcedureStatusProcessing) if err != nil { return err } - err = service.NewTaskService().UpdateOrderStatus(db, order.ID, model.OrderStatusProcessing) + err = taskService.UpdateOrderStatus(db, order.ID, model.OrderStatusProcessing) if err != nil { return err } - return nil + return service.NewProgressService().AddProgress(db, procedure, order) }) if err != nil { logx.Errorf("SendProcessParams update order and procedure status error:%v", err.Error()) diff --git a/crontask/cron_task.go b/crontask/cron_task.go index 0ad2210..35fde2b 100644 --- a/crontask/cron_task.go +++ b/crontask/cron_task.go @@ -7,6 +7,7 @@ "apsClient/pkg/logx" "apsClient/service" "github.com/go-co-op/gocron" + "github.com/spf13/cast" "time" ) @@ -29,6 +30,7 @@ finishNumber, err := service.PlcRead(plcConfig, constvar.PlcStartAddressTypeFinishNumber) if err == nil && finishNumber != 0 { service.PlcCacheSet(constvar.PlcCacheKeyFinishNumber, finishNumber) + _ = service.NewProgressService().UpdateProgress(cast.ToInt64(finishNumber)) } logx.Infof("plc read finish number:%v, err:%v", finishNumber, err) diff --git a/main.go b/main.go index 927364e..386843c 100644 --- a/main.go +++ b/main.go @@ -7,6 +7,7 @@ "apsClient/nsq" "apsClient/pkg/logx" "apsClient/router" + "apsClient/service" "apsClient/service/plc_address" "fmt" "net/http" @@ -37,6 +38,10 @@ } //鍔犺浇plc鍐欏叆鍦板潃 plc_address.LoadAddressFromFile() + + //鎻愬墠鍔犺浇浠诲姟 + service.NewTaskService().GetTask() + go shutdown() logx.Infof("apsClient start serve...") server := &http.Server{ diff --git a/model/index.go b/model/index.go index 7828517..738e2fc 100644 --- a/model/index.go +++ b/model/index.go @@ -28,6 +28,7 @@ PlcBrand{}, DevicePlc{}, ProcessModel{}, + ProductionProgress{}, ) return err } diff --git a/model/production_progress.go b/model/production_progress.go new file mode 100644 index 0000000..8305442 --- /dev/null +++ b/model/production_progress.go @@ -0,0 +1,257 @@ +package model + +import ( + "apsClient/pkg/sqlitex" + "fmt" + "gorm.io/gorm" +) + +type ( + ProductionProgress struct { + gorm.Model `json:"-"` + ID int `gorm:"primarykey"` + WorkOrderID string `gorm:"index;type:varchar(191);not null;comment:宸ュ崟ID" json:"workOrderID"` + OrderID string `gorm:"index;type:varchar(191);not null;comment:璁㈠崟ID" json:"orderID"` + ProcedureID string `gorm:"uniqueIndex:idx_product_procedure;type:varchar(191);comment:宸ュ簭ID" json:"procedureId"` + DeviceID string `gorm:"type:varchar(191);not null;comment:璁惧ID" json:"deviceId"` + FinishedQuantity int64 `gorm:"type:int;not null;comment:瀹屾垚鏁伴噺" json:"finishedQuantity"` + TotalQuantity int64 `gorm:"type:int;not null;comment:鎬婚噺" json:"totalQuantity"` + } + + ProductionProgressSearch struct { + ProductionProgress + Order string + PageNum int + PageSize int + Orm *gorm.DB + } +) + +func (slf *ProductionProgress) TableName() string { + return "production_progress" +} + +func NewProductionProgressSearch(db *gorm.DB) *ProductionProgressSearch { + if db == nil { + db = sqlitex.GetDB() + } + return &ProductionProgressSearch{Orm: db} +} + +func (slf *ProductionProgressSearch) SetOrm(tx *gorm.DB) *ProductionProgressSearch { + slf.Orm = tx + return slf +} + +func (slf *ProductionProgressSearch) SetPage(page, size int) *ProductionProgressSearch { + slf.PageNum, slf.PageSize = page, size + return slf +} + +func (slf *ProductionProgressSearch) SetOrder(order string) *ProductionProgressSearch { + slf.Order = order + return slf +} + +func (slf *ProductionProgressSearch) SetWorkOrderId(orderId string) *ProductionProgressSearch { + slf.WorkOrderID = orderId + return slf +} + +func (slf *ProductionProgressSearch) SetProcedureId(procedureId string) *ProductionProgressSearch { + slf.ProcedureID = procedureId + return slf +} + +func (slf *ProductionProgressSearch) SetDeviceId(id string) *ProductionProgressSearch { + slf.DeviceID = id + return slf +} + +func (slf *ProductionProgressSearch) SetId(id int) *ProductionProgressSearch { + slf.ID = id + return slf +} + +func (slf *ProductionProgressSearch) build() *gorm.DB { + var db = slf.Orm.Model(&ProductionProgress{}) + + if slf.Order != "" { + db = db.Order(slf.Order) + } + + if slf.ID != 0 { + db = db.Where("id = ?", slf.ID) + } + + if slf.WorkOrderID != "" { + db = db.Where("work_order_id = ?", slf.WorkOrderID) + } + + if slf.OrderID != "" { + db = db.Where("order_id = ?", slf.OrderID) + } + + if slf.ProcedureID != "" { + db = db.Where("procedure_id = ?", slf.ProcedureID) + } + + if slf.DeviceID != "" { + db = db.Where("device_id = ?", slf.DeviceID) + } + + return db +} + +// Create 鍗曟潯鎻掑叆 +func (slf *ProductionProgressSearch) Create(record *ProductionProgress) error { + var db = slf.build() + + if err := db.Create(record).Error; err != nil { + return fmt.Errorf("create err: %v, record: %+v", err, record) + } + + return nil +} + +// CreateBatch 鎵归噺鎻掑叆 +func (slf *ProductionProgressSearch) CreateBatch(records []*ProductionProgress) error { + var db = slf.build() + + if err := db.Create(&records).Error; err != nil { + return fmt.Errorf("create batch err: %v, records: %+v", err, records) + } + + return nil +} + +func (slf *ProductionProgressSearch) Save(record *ProductionProgress) error { + var db = slf.build() + + if err := db.Save(record).Error; err != nil { + return fmt.Errorf("save err: %v, record: %+v", err, record) + } + + return nil +} + +func (slf *ProductionProgressSearch) UpdateByMap(upMap map[string]interface{}) error { + var ( + db = slf.build() + ) + + if err := db.Updates(upMap).Error; err != nil { + return fmt.Errorf("update by map err: %v, upMap: %+v", err, upMap) + } + + return nil +} + +func (slf *ProductionProgressSearch) UpdateByQuery(query string, args []interface{}, upMap map[string]interface{}) error { + var ( + db = slf.Orm.Table(slf.TableName()).Where(query, args...) + ) + + if err := db.Updates(upMap).Error; err != nil { + return fmt.Errorf("update by query err: %v, query: %s, args: %+v, upMap: %+v", err, query, args, upMap) + } + + return nil +} + +func (slf *ProductionProgressSearch) Delete() error { + var db = slf.build() + + if err := db.Unscoped().Delete(&ProductionProgress{}).Error; err != nil { + return err + } + + return nil +} + +func (slf *ProductionProgressSearch) First() (*ProductionProgress, error) { + var ( + record = new(ProductionProgress) + db = slf.build() + ) + + if err := db.First(record).Error; err != nil { + return record, err + } + + return record, nil +} + +func (slf *ProductionProgressSearch) Find() ([]*ProductionProgress, int64, error) { + var ( + records = make([]*ProductionProgress, 0) + total int64 + db = slf.build() + ) + + if err := db.Count(&total).Error; err != nil { + return records, total, fmt.Errorf("find count err: %v", err) + } + if slf.PageNum*slf.PageSize > 0 { + db = db.Offset((slf.PageNum - 1) * slf.PageSize).Limit(slf.PageSize) + } + if err := db.Find(&records).Error; err != nil { + return records, total, fmt.Errorf("find records err: %v", err) + } + + return records, total, nil +} + +func (slf *ProductionProgressSearch) FindNotTotal() ([]*ProductionProgress, error) { + var ( + records = make([]*ProductionProgress, 0) + db = slf.build() + ) + + if slf.PageNum*slf.PageSize > 0 { + db = db.Offset((slf.PageNum - 1) * slf.PageSize).Limit(slf.PageSize) + } + if err := db.Find(&records).Error; err != nil { + return records, fmt.Errorf("find records err: %v", err) + } + + return records, nil +} + +// FindByQuery 鎸囧畾鏉′欢鏌ヨ. +func (slf *ProductionProgressSearch) FindByQuery(query string, args []interface{}) ([]*ProductionProgress, int64, error) { + var ( + records = make([]*ProductionProgress, 0) + total int64 + db = slf.Orm.Table(slf.TableName()).Where(query, args...) + ) + + if err := db.Count(&total).Error; err != nil { + return records, total, fmt.Errorf("find by query count err: %v", err) + } + if slf.PageNum*slf.PageSize > 0 { + db = db.Offset((slf.PageNum - 1) * slf.PageSize).Limit(slf.PageSize) + } + if err := db.Find(&records).Error; err != nil { + return records, total, fmt.Errorf("find by query records err: %v, query: %s, args: %+v", err, query, args) + } + + return records, total, nil +} + +// FindByQueryNotTotal 鎸囧畾鏉′欢鏌ヨ&涓嶆煡璇㈡�绘潯鏁�. +func (slf *ProductionProgressSearch) FindByQueryNotTotal(query string, args []interface{}) ([]*ProductionProgress, error) { + var ( + records = make([]*ProductionProgress, 0) + db = slf.Orm.Table(slf.TableName()).Where(query, args...) + ) + + if slf.PageNum*slf.PageSize > 0 { + db = db.Offset((slf.PageNum - 1) * slf.PageSize).Limit(slf.PageSize) + } + if err := db.Find(&records).Error; err != nil { + return records, fmt.Errorf("find by query records err: %v, query: %s, args: %+v", err, query, args) + } + + return records, nil +} diff --git a/model/work_order.go b/model/work_order.go index b453d81..c6567b3 100644 --- a/model/work_order.go +++ b/model/work_order.go @@ -47,7 +47,7 @@ ) func (slf *Order) TableName() string { - return "Order" + return "work_order" } func NewOrderSearch(db *gorm.DB) *OrderSearch { diff --git a/nsq/msg_handler.go b/nsq/msg_handler.go index 129d390..c5d5bf9 100644 --- a/nsq/msg_handler.go +++ b/nsq/msg_handler.go @@ -102,7 +102,15 @@ logx.Errorf("ScheduleTask HandleMessage Unmarshal json err: %v", err.Error()) return nil } - if len(resp.KeyData) == 0 || len(resp.AddressData) == 0 || resp.DeviceId != conf.Conf.System.DeviceId { + if resp.DeviceId != conf.Conf.System.DeviceId { + return nil + } + //閫氱煡鍥炲鏀跺埌 + ReceivedMessageChan <- &ReceivedMessage{ + Topic: slf.Topic, + Message: data, + } + if len(resp.KeyData) == 0 || len(resp.AddressData) == 0 { return nil } //鍐欏叆鍒版枃浠� @@ -129,11 +137,6 @@ address := cast.ToInt(strings.ReplaceAll(addresses[i], "\r", "")) plc_address.Set(key, address) logx.Infof("plc address set ok: key:%v, address:%v", key, address) - } - //閫氱煡鍥炲鏀跺埌 - ReceivedMessageChan <- &ReceivedMessage{ - Topic: slf.Topic, - Message: data, } return nil } @@ -179,7 +182,7 @@ logx.Infof("unmarshal process params sync err :%s", err) return err } - err = model.NewProcessModelSearch().Create(&processModel) + err = model.NewProcessModelSearch().Save(&processModel) if err != nil { logx.Infof("save process params sync err :%s", err) return err diff --git a/nsq/nsq.go b/nsq/nsq.go index 79d792b..0bd6bf6 100644 --- a/nsq/nsq.go +++ b/nsq/nsq.go @@ -23,7 +23,7 @@ safe.Go(func() { caller := NewCaller(fmt.Sprintf(constvar.NsqTopicGetPlcAddress, conf.Conf.NsqConf.NodeId), fmt.Sprintf(constvar.NsqTopicSendPlcAddress, conf.Conf.NsqConf.NodeId)) var addressResult common.ResponsePlcAddress - err := caller.Call(common.RequestPlcAddress{DeviceId: conf.Conf.System.DeviceId}, &addressResult, time.Second*2) + err := caller.Call(common.RequestPlcAddress{DeviceId: conf.Conf.System.DeviceId}, &addressResult, time.Second*3) if err != nil { logx.Infof("get plc address err: %v", err.Error()) } diff --git a/service/cache_store.go b/service/cache_store.go new file mode 100644 index 0000000..4c3148b --- /dev/null +++ b/service/cache_store.go @@ -0,0 +1,88 @@ +package service + +import ( + "apsClient/model" + "apsClient/model/response" + "fmt" + "sync" +) + +type CacheStore struct { + cache map[string]interface{} + mu sync.Mutex +} + +var defaultCacheStore *CacheStore + +func init() { + defaultCacheStore = newCacheManager() +} +func newCacheManager() *CacheStore { + return &CacheStore{ + cache: make(map[string]interface{}), + } +} + +func (cm *CacheStore) Get(key string) (interface{}, bool) { + cm.mu.Lock() + defer cm.mu.Unlock() + + conn, ok := cm.cache[key] + return conn, ok +} + +func (cm *CacheStore) Add(key string, value interface{}) { + cm.mu.Lock() + defer cm.mu.Unlock() + cm.cache[key] = value +} + +func (cm *CacheStore) Remove(key string) { + cm.mu.Lock() + defer cm.mu.Unlock() + delete(cm.cache, key) +} + +const ( + PlcCacheKey = "plc:%v" + CurrentTaskCacheKey = "current_task" + CurrentProgressCacheKey = "current_progress" +) + +func PlcCacheGet(key string) (interface{}, bool) { + return defaultCacheStore.Get(fmt.Sprintf(PlcCacheKey, key)) +} + +func PlcCacheSet(key string, value interface{}) { + defaultCacheStore.Add(fmt.Sprintf(PlcCacheKey, key), value) +} + +func TaskCacheSet(value *response.TaskData) { + defaultCacheStore.Add(CurrentTaskCacheKey, value) +} + +func TaskCacheUnset() { + defaultCacheStore.Remove(CurrentTaskCacheKey) +} + +func TaskCacheGet() (*response.TaskData, bool) { + if v, ok := defaultCacheStore.Get(CurrentTaskCacheKey); ok { + return v.(*response.TaskData), ok + } + return nil, false +} + +func ProgressCacheGet() (*model.ProductionProgress, bool) { + if v, ok := defaultCacheStore.Get(CurrentProgressCacheKey); ok { + return v.(*model.ProductionProgress), ok + } + return nil, false +} + +func ProgressCacheSet(value *model.ProductionProgress) { + defaultCacheStore.Add(CurrentProgressCacheKey, value) +} + +func ProgressCacheUnset() { + defaultCacheStore.Remove(CurrentProgressCacheKey) +} diff --git a/service/plc.go b/service/plc.go index 74ed5dc..090b4bc 100644 --- a/service/plc.go +++ b/service/plc.go @@ -8,7 +8,6 @@ "encoding/binary" "errors" "fmt" - "sync" ) func PlcRead(plcConfig *model.DevicePlc, fieldType constvar.PlcStartAddressType) (val interface{}, err error) { @@ -85,42 +84,4 @@ } logx.Infof("plc write ok, address: %v, value: %v, result: %v", startAddress, value, result) return -} - -type CacheStore struct { - cache map[string]interface{} - mu sync.Mutex -} - -var defaultCacheStore *CacheStore - -func init() { - defaultCacheStore = newCacheManager() -} -func newCacheManager() *CacheStore { - return &CacheStore{ - cache: make(map[string]interface{}), - } -} - -func (cm *CacheStore) Get(key string) (interface{}, bool) { - cm.mu.Lock() - defer cm.mu.Unlock() - - conn, ok := cm.cache[key] - return conn, ok -} - -func (cm *CacheStore) Add(key string, value interface{}) { - cm.mu.Lock() - defer cm.mu.Unlock() - cm.cache[key] = value -} - -func PlcCacheGet(key string) (interface{}, bool) { - return defaultCacheStore.Get(key) -} - -func PlcCacheSet(key string, value interface{}) { - defaultCacheStore.Add(key, value) } diff --git a/service/progress.go b/service/progress.go new file mode 100644 index 0000000..bb0d859 --- /dev/null +++ b/service/progress.go @@ -0,0 +1,56 @@ +package service + +import ( + "apsClient/model" + "errors" + "gorm.io/gorm" +) + +type ProgressService struct { +} + +func NewProgressService() *ProgressService { + return &ProgressService{} +} + +func (slf ProgressService) AddProgress(db *gorm.DB, procedure *model.Procedures, order *model.Order) error { + progress := &model.ProductionProgress{ + WorkOrderID: procedure.WorkOrderID, + OrderID: procedure.OrderID, + ProcedureID: procedure.ProceduresInfo.ProcedureID, + DeviceID: procedure.DeviceID, + TotalQuantity: order.Amount.IntPart(), + } + err := model.NewProductionProgressSearch(db).Create(progress) + if err != nil { + return err + } + ProgressCacheSet(progress) + return nil +} + +func (slf ProgressService) UpdateProgress(finishedQuantity int64) (err error) { + var progressCache *model.ProductionProgress + progressCache, ok := ProgressCacheGet() + if !ok { + progressCache, err = model.NewProductionProgressSearch(nil).SetOrder("id desc").First() + if err == gorm.ErrRecordNotFound { + return errors.New("progress cache not found") + } + if err != nil { + return err + } + if progressCache.FinishedQuantity < progressCache.TotalQuantity { + ProgressCacheSet(progressCache) + } + } + if progressCache == nil { + return errors.New("progress cache not found") + } + if finishedQuantity > progressCache.FinishedQuantity { //褰撴湁鍙樺寲鏃舵墠鏇存柊 + progressCache.FinishedQuantity = finishedQuantity + ProgressCacheSet(progressCache) + return model.NewProductionProgressSearch(nil).SetId(progressCache.ID).Save(progressCache) + } + return nil +} diff --git a/service/task.go b/service/task.go index 69cba48..4def207 100644 --- a/service/task.go +++ b/service/task.go @@ -25,6 +25,10 @@ // GetTask 鑾峰彇浠诲姟锛屾湭瀹屾垚鐨勫紑濮嬫椂闂存渶鏃╃殑浠诲姟 func (slf TaskService) GetTask() (taskData *response.TaskData, code int) { + if v, ok := TaskCacheGet(); ok { + return v, ecode.OK + } + nowTs := time.Now().Unix() var ( err error @@ -51,6 +55,7 @@ taskData = new(response.TaskData) taskData.Order = order taskData.Procedure = procedure + TaskCacheSet(taskData) return taskData, ecode.OK } @@ -63,6 +68,10 @@ } func (slf TaskService) UpdateProcedureStatus(db *gorm.DB, id int, status model.ProcedureStatus) error { + if status == model.ProcedureStatusFinished { + TaskCacheUnset() + ProgressCacheUnset() + } return model.NewProceduresSearch(db).SetId(id).UpdateByMap(map[string]interface{}{ "status": status, }) @@ -125,11 +134,11 @@ return } -// GetCurrentTask 鑾峰彇寰呭畬鎴愭垨杩涜涓殑浠诲姟锛� 姣忎釜宸ュ崟鍙湁涓�涓伐搴� -func (slf TaskService) GetCurrentTask() (workOrder *model.Order, err error) { +// GetNextTask 鑾峰彇鏈紑濮嬬殑浠诲姟 +func (slf TaskService) GetNextTask() (workOrder *model.Order, err error) { nowTs := time.Now().Unix() orderSearch := model.NewOrderSearch(nil) return orderSearch.SetOrder("start_time asc"). SetStartTimeMin(nowTs). - SetStatusList([]model.OrderStatus{model.OrderStatusWaitProcess, model.OrderStatusProcessing}).First() + SetStatus(model.OrderStatusWaitProcess).First() } -- Gitblit v1.8.0