From 49e90e5de2e7166e74e26102dff9064b933fc5fd Mon Sep 17 00:00:00 2001
From: zhangqian <zhangqian@123.com>
Date: 星期二, 05 九月 2023 16:00:27 +0800
Subject: [PATCH] 工序的生产进度存表
---
service/plc.go | 39 ----
service/task.go | 15 +
nsq/msg_handler.go | 17 +
service/cache_store.go | 88 +++++++++
service/progress.go | 56 ++++++
model/index.go | 1
model/production_progress.go | 257 ++++++++++++++++++++++++++++
main.go | 5
model/work_order.go | 2
crontask/cron_task.go | 2
api/v1/task.go | 19 +-
nsq/nsq.go | 2
12 files changed, 443 insertions(+), 60 deletions(-)
diff --git a/api/v1/task.go b/api/v1/task.go
index 1729bc7..cfd6ce1 100644
--- a/api/v1/task.go
+++ b/api/v1/task.go
@@ -35,7 +35,7 @@
return
}
var resp response.TaskCountdown
- workOrder, err := service.NewTaskService().GetCurrentTask()
+ workOrder, err := service.NewTaskService().GetNextTask()
if err == nil {
seconds := workOrder.StartTime - time.Now().Unix()
resp.CountDownHour = seconds / 3600
@@ -180,12 +180,13 @@
return
}
id := convertx.Atoi(idx)
- procedure, code := service.NewTaskService().GetProcedureById(id)
+ taskService := service.NewTaskService()
+ procedure, code := taskService.GetProcedureById(id)
if code != ecode.OK {
ctx.Fail(code)
return
}
- order, err := service.NewTaskService().GetOrderByWorkOrderId(procedure.WorkOrderID)
+ order, err := taskService.GetOrderByWorkOrderId(procedure.WorkOrderID)
if err != nil {
ctx.Fail(ecode.UnknownErr)
return
@@ -196,22 +197,22 @@
return
}
- processModel, err := service.NewTaskService().GetProcessParams(procedure, order)
+ processModel, err := taskService.GetProcessParams(procedure, order)
if err != nil || processModel == nil || processModel.ParamsMap == nil {
- ctx.Fail(ecode.UnknownErr)
- return
+ //ctx.Fail(ecode.UnknownErr) //todo
+ //return
}
err = model.WithTransaction(func(db *gorm.DB) error {
- err = service.NewTaskService().UpdateProcedureStatus(db, id, model.ProcedureStatusProcessing)
+ err = taskService.UpdateProcedureStatus(db, id, model.ProcedureStatusProcessing)
if err != nil {
return err
}
- err = service.NewTaskService().UpdateOrderStatus(db, order.ID, model.OrderStatusProcessing)
+ err = taskService.UpdateOrderStatus(db, order.ID, model.OrderStatusProcessing)
if err != nil {
return err
}
- return nil
+ return service.NewProgressService().AddProgress(db, procedure, order)
})
if err != nil {
logx.Errorf("SendProcessParams update order and procedure status error:%v", err.Error())
diff --git a/crontask/cron_task.go b/crontask/cron_task.go
index 0ad2210..35fde2b 100644
--- a/crontask/cron_task.go
+++ b/crontask/cron_task.go
@@ -7,6 +7,7 @@
"apsClient/pkg/logx"
"apsClient/service"
"github.com/go-co-op/gocron"
+ "github.com/spf13/cast"
"time"
)
@@ -29,6 +30,7 @@
finishNumber, err := service.PlcRead(plcConfig, constvar.PlcStartAddressTypeFinishNumber)
if err == nil && finishNumber != 0 {
service.PlcCacheSet(constvar.PlcCacheKeyFinishNumber, finishNumber)
+ _ = service.NewProgressService().UpdateProgress(cast.ToInt64(finishNumber))
}
logx.Infof("plc read finish number:%v, err:%v", finishNumber, err)
diff --git a/main.go b/main.go
index 927364e..386843c 100644
--- a/main.go
+++ b/main.go
@@ -7,6 +7,7 @@
"apsClient/nsq"
"apsClient/pkg/logx"
"apsClient/router"
+ "apsClient/service"
"apsClient/service/plc_address"
"fmt"
"net/http"
@@ -37,6 +38,10 @@
}
//鍔犺浇plc鍐欏叆鍦板潃
plc_address.LoadAddressFromFile()
+
+ //鎻愬墠鍔犺浇浠诲姟
+ service.NewTaskService().GetTask()
+
go shutdown()
logx.Infof("apsClient start serve...")
server := &http.Server{
diff --git a/model/index.go b/model/index.go
index 7828517..738e2fc 100644
--- a/model/index.go
+++ b/model/index.go
@@ -28,6 +28,7 @@
PlcBrand{},
DevicePlc{},
ProcessModel{},
+ ProductionProgress{},
)
return err
}
diff --git a/model/production_progress.go b/model/production_progress.go
new file mode 100644
index 0000000..8305442
--- /dev/null
+++ b/model/production_progress.go
@@ -0,0 +1,257 @@
+package model
+
+import (
+ "apsClient/pkg/sqlitex"
+ "fmt"
+ "gorm.io/gorm"
+)
+
+type (
+ ProductionProgress struct {
+ gorm.Model `json:"-"`
+ ID int `gorm:"primarykey"`
+ WorkOrderID string `gorm:"index;type:varchar(191);not null;comment:宸ュ崟ID" json:"workOrderID"`
+ OrderID string `gorm:"index;type:varchar(191);not null;comment:璁㈠崟ID" json:"orderID"`
+ ProcedureID string `gorm:"uniqueIndex:idx_product_procedure;type:varchar(191);comment:宸ュ簭ID" json:"procedureId"`
+ DeviceID string `gorm:"type:varchar(191);not null;comment:璁惧ID" json:"deviceId"`
+ FinishedQuantity int64 `gorm:"type:int;not null;comment:瀹屾垚鏁伴噺" json:"finishedQuantity"`
+ TotalQuantity int64 `gorm:"type:int;not null;comment:鎬婚噺" json:"totalQuantity"`
+ }
+
+ ProductionProgressSearch struct {
+ ProductionProgress
+ Order string
+ PageNum int
+ PageSize int
+ Orm *gorm.DB
+ }
+)
+
+func (slf *ProductionProgress) TableName() string {
+ return "production_progress"
+}
+
+func NewProductionProgressSearch(db *gorm.DB) *ProductionProgressSearch {
+ if db == nil {
+ db = sqlitex.GetDB()
+ }
+ return &ProductionProgressSearch{Orm: db}
+}
+
+func (slf *ProductionProgressSearch) SetOrm(tx *gorm.DB) *ProductionProgressSearch {
+ slf.Orm = tx
+ return slf
+}
+
+func (slf *ProductionProgressSearch) SetPage(page, size int) *ProductionProgressSearch {
+ slf.PageNum, slf.PageSize = page, size
+ return slf
+}
+
+func (slf *ProductionProgressSearch) SetOrder(order string) *ProductionProgressSearch {
+ slf.Order = order
+ return slf
+}
+
+func (slf *ProductionProgressSearch) SetWorkOrderId(orderId string) *ProductionProgressSearch {
+ slf.WorkOrderID = orderId
+ return slf
+}
+
+func (slf *ProductionProgressSearch) SetProcedureId(procedureId string) *ProductionProgressSearch {
+ slf.ProcedureID = procedureId
+ return slf
+}
+
+func (slf *ProductionProgressSearch) SetDeviceId(id string) *ProductionProgressSearch {
+ slf.DeviceID = id
+ return slf
+}
+
+func (slf *ProductionProgressSearch) SetId(id int) *ProductionProgressSearch {
+ slf.ID = id
+ return slf
+}
+
+func (slf *ProductionProgressSearch) build() *gorm.DB {
+ var db = slf.Orm.Model(&ProductionProgress{})
+
+ if slf.Order != "" {
+ db = db.Order(slf.Order)
+ }
+
+ if slf.ID != 0 {
+ db = db.Where("id = ?", slf.ID)
+ }
+
+ if slf.WorkOrderID != "" {
+ db = db.Where("work_order_id = ?", slf.WorkOrderID)
+ }
+
+ if slf.OrderID != "" {
+ db = db.Where("order_id = ?", slf.OrderID)
+ }
+
+ if slf.ProcedureID != "" {
+ db = db.Where("procedure_id = ?", slf.ProcedureID)
+ }
+
+ if slf.DeviceID != "" {
+ db = db.Where("device_id = ?", slf.DeviceID)
+ }
+
+ return db
+}
+
+// Create 鍗曟潯鎻掑叆
+func (slf *ProductionProgressSearch) Create(record *ProductionProgress) error {
+ var db = slf.build()
+
+ if err := db.Create(record).Error; err != nil {
+ return fmt.Errorf("create err: %v, record: %+v", err, record)
+ }
+
+ return nil
+}
+
+// CreateBatch 鎵归噺鎻掑叆
+func (slf *ProductionProgressSearch) CreateBatch(records []*ProductionProgress) error {
+ var db = slf.build()
+
+ if err := db.Create(&records).Error; err != nil {
+ return fmt.Errorf("create batch err: %v, records: %+v", err, records)
+ }
+
+ return nil
+}
+
+func (slf *ProductionProgressSearch) Save(record *ProductionProgress) error {
+ var db = slf.build()
+
+ if err := db.Save(record).Error; err != nil {
+ return fmt.Errorf("save err: %v, record: %+v", err, record)
+ }
+
+ return nil
+}
+
+func (slf *ProductionProgressSearch) UpdateByMap(upMap map[string]interface{}) error {
+ var (
+ db = slf.build()
+ )
+
+ if err := db.Updates(upMap).Error; err != nil {
+ return fmt.Errorf("update by map err: %v, upMap: %+v", err, upMap)
+ }
+
+ return nil
+}
+
+func (slf *ProductionProgressSearch) UpdateByQuery(query string, args []interface{}, upMap map[string]interface{}) error {
+ var (
+ db = slf.Orm.Table(slf.TableName()).Where(query, args...)
+ )
+
+ if err := db.Updates(upMap).Error; err != nil {
+ return fmt.Errorf("update by query err: %v, query: %s, args: %+v, upMap: %+v", err, query, args, upMap)
+ }
+
+ return nil
+}
+
+func (slf *ProductionProgressSearch) Delete() error {
+ var db = slf.build()
+
+ if err := db.Unscoped().Delete(&ProductionProgress{}).Error; err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (slf *ProductionProgressSearch) First() (*ProductionProgress, error) {
+ var (
+ record = new(ProductionProgress)
+ db = slf.build()
+ )
+
+ if err := db.First(record).Error; err != nil {
+ return record, err
+ }
+
+ return record, nil
+}
+
+func (slf *ProductionProgressSearch) Find() ([]*ProductionProgress, int64, error) {
+ var (
+ records = make([]*ProductionProgress, 0)
+ total int64
+ db = slf.build()
+ )
+
+ if err := db.Count(&total).Error; err != nil {
+ return records, total, fmt.Errorf("find count err: %v", err)
+ }
+ if slf.PageNum*slf.PageSize > 0 {
+ db = db.Offset((slf.PageNum - 1) * slf.PageSize).Limit(slf.PageSize)
+ }
+ if err := db.Find(&records).Error; err != nil {
+ return records, total, fmt.Errorf("find records err: %v", err)
+ }
+
+ return records, total, nil
+}
+
+func (slf *ProductionProgressSearch) FindNotTotal() ([]*ProductionProgress, error) {
+ var (
+ records = make([]*ProductionProgress, 0)
+ db = slf.build()
+ )
+
+ if slf.PageNum*slf.PageSize > 0 {
+ db = db.Offset((slf.PageNum - 1) * slf.PageSize).Limit(slf.PageSize)
+ }
+ if err := db.Find(&records).Error; err != nil {
+ return records, fmt.Errorf("find records err: %v", err)
+ }
+
+ return records, nil
+}
+
+// FindByQuery 鎸囧畾鏉′欢鏌ヨ.
+func (slf *ProductionProgressSearch) FindByQuery(query string, args []interface{}) ([]*ProductionProgress, int64, error) {
+ var (
+ records = make([]*ProductionProgress, 0)
+ total int64
+ db = slf.Orm.Table(slf.TableName()).Where(query, args...)
+ )
+
+ if err := db.Count(&total).Error; err != nil {
+ return records, total, fmt.Errorf("find by query count err: %v", err)
+ }
+ if slf.PageNum*slf.PageSize > 0 {
+ db = db.Offset((slf.PageNum - 1) * slf.PageSize).Limit(slf.PageSize)
+ }
+ if err := db.Find(&records).Error; err != nil {
+ return records, total, fmt.Errorf("find by query records err: %v, query: %s, args: %+v", err, query, args)
+ }
+
+ return records, total, nil
+}
+
+// FindByQueryNotTotal 鎸囧畾鏉′欢鏌ヨ&涓嶆煡璇㈡�绘潯鏁�.
+func (slf *ProductionProgressSearch) FindByQueryNotTotal(query string, args []interface{}) ([]*ProductionProgress, error) {
+ var (
+ records = make([]*ProductionProgress, 0)
+ db = slf.Orm.Table(slf.TableName()).Where(query, args...)
+ )
+
+ if slf.PageNum*slf.PageSize > 0 {
+ db = db.Offset((slf.PageNum - 1) * slf.PageSize).Limit(slf.PageSize)
+ }
+ if err := db.Find(&records).Error; err != nil {
+ return records, fmt.Errorf("find by query records err: %v, query: %s, args: %+v", err, query, args)
+ }
+
+ return records, nil
+}
diff --git a/model/work_order.go b/model/work_order.go
index b453d81..c6567b3 100644
--- a/model/work_order.go
+++ b/model/work_order.go
@@ -47,7 +47,7 @@
)
func (slf *Order) TableName() string {
- return "Order"
+ return "work_order"
}
func NewOrderSearch(db *gorm.DB) *OrderSearch {
diff --git a/nsq/msg_handler.go b/nsq/msg_handler.go
index 129d390..c5d5bf9 100644
--- a/nsq/msg_handler.go
+++ b/nsq/msg_handler.go
@@ -102,7 +102,15 @@
logx.Errorf("ScheduleTask HandleMessage Unmarshal json err: %v", err.Error())
return nil
}
- if len(resp.KeyData) == 0 || len(resp.AddressData) == 0 || resp.DeviceId != conf.Conf.System.DeviceId {
+ if resp.DeviceId != conf.Conf.System.DeviceId {
+ return nil
+ }
+ //閫氱煡鍥炲鏀跺埌
+ ReceivedMessageChan <- &ReceivedMessage{
+ Topic: slf.Topic,
+ Message: data,
+ }
+ if len(resp.KeyData) == 0 || len(resp.AddressData) == 0 {
return nil
}
//鍐欏叆鍒版枃浠�
@@ -129,11 +137,6 @@
address := cast.ToInt(strings.ReplaceAll(addresses[i], "\r", ""))
plc_address.Set(key, address)
logx.Infof("plc address set ok: key:%v, address:%v", key, address)
- }
- //閫氱煡鍥炲鏀跺埌
- ReceivedMessageChan <- &ReceivedMessage{
- Topic: slf.Topic,
- Message: data,
}
return nil
}
@@ -179,7 +182,7 @@
logx.Infof("unmarshal process params sync err :%s", err)
return err
}
- err = model.NewProcessModelSearch().Create(&processModel)
+ err = model.NewProcessModelSearch().Save(&processModel)
if err != nil {
logx.Infof("save process params sync err :%s", err)
return err
diff --git a/nsq/nsq.go b/nsq/nsq.go
index 79d792b..0bd6bf6 100644
--- a/nsq/nsq.go
+++ b/nsq/nsq.go
@@ -23,7 +23,7 @@
safe.Go(func() {
caller := NewCaller(fmt.Sprintf(constvar.NsqTopicGetPlcAddress, conf.Conf.NsqConf.NodeId), fmt.Sprintf(constvar.NsqTopicSendPlcAddress, conf.Conf.NsqConf.NodeId))
var addressResult common.ResponsePlcAddress
- err := caller.Call(common.RequestPlcAddress{DeviceId: conf.Conf.System.DeviceId}, &addressResult, time.Second*2)
+ err := caller.Call(common.RequestPlcAddress{DeviceId: conf.Conf.System.DeviceId}, &addressResult, time.Second*3)
if err != nil {
logx.Infof("get plc address err: %v", err.Error())
}
diff --git a/service/cache_store.go b/service/cache_store.go
new file mode 100644
index 0000000..4c3148b
--- /dev/null
+++ b/service/cache_store.go
@@ -0,0 +1,88 @@
+package service
+
+import (
+ "apsClient/model"
+ "apsClient/model/response"
+ "fmt"
+ "sync"
+)
+
+type CacheStore struct {
+ cache map[string]interface{}
+ mu sync.Mutex
+}
+
+var defaultCacheStore *CacheStore
+
+func init() {
+ defaultCacheStore = newCacheManager()
+}
+func newCacheManager() *CacheStore {
+ return &CacheStore{
+ cache: make(map[string]interface{}),
+ }
+}
+
+func (cm *CacheStore) Get(key string) (interface{}, bool) {
+ cm.mu.Lock()
+ defer cm.mu.Unlock()
+
+ conn, ok := cm.cache[key]
+ return conn, ok
+}
+
+func (cm *CacheStore) Add(key string, value interface{}) {
+ cm.mu.Lock()
+ defer cm.mu.Unlock()
+ cm.cache[key] = value
+}
+
+func (cm *CacheStore) Remove(key string) {
+ cm.mu.Lock()
+ defer cm.mu.Unlock()
+ delete(cm.cache, key)
+}
+
+const (
+ PlcCacheKey = "plc:%v"
+ CurrentTaskCacheKey = "current_task"
+ CurrentProgressCacheKey = "current_progress"
+)
+
+func PlcCacheGet(key string) (interface{}, bool) {
+ return defaultCacheStore.Get(fmt.Sprintf(PlcCacheKey, key))
+}
+
+func PlcCacheSet(key string, value interface{}) {
+ defaultCacheStore.Add(fmt.Sprintf(PlcCacheKey, key), value)
+}
+
+func TaskCacheSet(value *response.TaskData) {
+ defaultCacheStore.Add(CurrentTaskCacheKey, value)
+}
+
+func TaskCacheUnset() {
+ defaultCacheStore.Remove(CurrentTaskCacheKey)
+}
+
+func TaskCacheGet() (*response.TaskData, bool) {
+ if v, ok := defaultCacheStore.Get(CurrentTaskCacheKey); ok {
+ return v.(*response.TaskData), ok
+ }
+ return nil, false
+}
+
+func ProgressCacheGet() (*model.ProductionProgress, bool) {
+ if v, ok := defaultCacheStore.Get(CurrentProgressCacheKey); ok {
+ return v.(*model.ProductionProgress), ok
+ }
+ return nil, false
+}
+
+func ProgressCacheSet(value *model.ProductionProgress) {
+ defaultCacheStore.Add(CurrentProgressCacheKey, value)
+}
+
+func ProgressCacheUnset() {
+ defaultCacheStore.Remove(CurrentProgressCacheKey)
+}
diff --git a/service/plc.go b/service/plc.go
index 74ed5dc..090b4bc 100644
--- a/service/plc.go
+++ b/service/plc.go
@@ -8,7 +8,6 @@
"encoding/binary"
"errors"
"fmt"
- "sync"
)
func PlcRead(plcConfig *model.DevicePlc, fieldType constvar.PlcStartAddressType) (val interface{}, err error) {
@@ -85,42 +84,4 @@
}
logx.Infof("plc write ok, address: %v, value: %v, result: %v", startAddress, value, result)
return
-}
-
-type CacheStore struct {
- cache map[string]interface{}
- mu sync.Mutex
-}
-
-var defaultCacheStore *CacheStore
-
-func init() {
- defaultCacheStore = newCacheManager()
-}
-func newCacheManager() *CacheStore {
- return &CacheStore{
- cache: make(map[string]interface{}),
- }
-}
-
-func (cm *CacheStore) Get(key string) (interface{}, bool) {
- cm.mu.Lock()
- defer cm.mu.Unlock()
-
- conn, ok := cm.cache[key]
- return conn, ok
-}
-
-func (cm *CacheStore) Add(key string, value interface{}) {
- cm.mu.Lock()
- defer cm.mu.Unlock()
- cm.cache[key] = value
-}
-
-func PlcCacheGet(key string) (interface{}, bool) {
- return defaultCacheStore.Get(key)
-}
-
-func PlcCacheSet(key string, value interface{}) {
- defaultCacheStore.Add(key, value)
}
diff --git a/service/progress.go b/service/progress.go
new file mode 100644
index 0000000..bb0d859
--- /dev/null
+++ b/service/progress.go
@@ -0,0 +1,56 @@
+package service
+
+import (
+ "apsClient/model"
+ "errors"
+ "gorm.io/gorm"
+)
+
+type ProgressService struct {
+}
+
+func NewProgressService() *ProgressService {
+ return &ProgressService{}
+}
+
+func (slf ProgressService) AddProgress(db *gorm.DB, procedure *model.Procedures, order *model.Order) error {
+ progress := &model.ProductionProgress{
+ WorkOrderID: procedure.WorkOrderID,
+ OrderID: procedure.OrderID,
+ ProcedureID: procedure.ProceduresInfo.ProcedureID,
+ DeviceID: procedure.DeviceID,
+ TotalQuantity: order.Amount.IntPart(),
+ }
+ err := model.NewProductionProgressSearch(db).Create(progress)
+ if err != nil {
+ return err
+ }
+ ProgressCacheSet(progress)
+ return nil
+}
+
+func (slf ProgressService) UpdateProgress(finishedQuantity int64) (err error) {
+ var progressCache *model.ProductionProgress
+ progressCache, ok := ProgressCacheGet()
+ if !ok {
+ progressCache, err = model.NewProductionProgressSearch(nil).SetOrder("id desc").First()
+ if err == gorm.ErrRecordNotFound {
+ return errors.New("progress cache not found")
+ }
+ if err != nil {
+ return err
+ }
+ if progressCache.FinishedQuantity < progressCache.TotalQuantity {
+ ProgressCacheSet(progressCache)
+ }
+ }
+ if progressCache == nil {
+ return errors.New("progress cache not found")
+ }
+ if finishedQuantity > progressCache.FinishedQuantity { //褰撴湁鍙樺寲鏃舵墠鏇存柊
+ progressCache.FinishedQuantity = finishedQuantity
+ ProgressCacheSet(progressCache)
+ return model.NewProductionProgressSearch(nil).SetId(progressCache.ID).Save(progressCache)
+ }
+ return nil
+}
diff --git a/service/task.go b/service/task.go
index 69cba48..4def207 100644
--- a/service/task.go
+++ b/service/task.go
@@ -25,6 +25,10 @@
// GetTask 鑾峰彇浠诲姟锛屾湭瀹屾垚鐨勫紑濮嬫椂闂存渶鏃╃殑浠诲姟
func (slf TaskService) GetTask() (taskData *response.TaskData, code int) {
+ if v, ok := TaskCacheGet(); ok {
+ return v, ecode.OK
+ }
+
nowTs := time.Now().Unix()
var (
err error
@@ -51,6 +55,7 @@
taskData = new(response.TaskData)
taskData.Order = order
taskData.Procedure = procedure
+ TaskCacheSet(taskData)
return taskData, ecode.OK
}
@@ -63,6 +68,10 @@
}
func (slf TaskService) UpdateProcedureStatus(db *gorm.DB, id int, status model.ProcedureStatus) error {
+ if status == model.ProcedureStatusFinished {
+ TaskCacheUnset()
+ ProgressCacheUnset()
+ }
return model.NewProceduresSearch(db).SetId(id).UpdateByMap(map[string]interface{}{
"status": status,
})
@@ -125,11 +134,11 @@
return
}
-// GetCurrentTask 鑾峰彇寰呭畬鎴愭垨杩涜涓殑浠诲姟锛� 姣忎釜宸ュ崟鍙湁涓�涓伐搴�
-func (slf TaskService) GetCurrentTask() (workOrder *model.Order, err error) {
+// GetNextTask 鑾峰彇鏈紑濮嬬殑浠诲姟
+func (slf TaskService) GetNextTask() (workOrder *model.Order, err error) {
nowTs := time.Now().Unix()
orderSearch := model.NewOrderSearch(nil)
return orderSearch.SetOrder("start_time asc").
SetStartTimeMin(nowTs).
- SetStatusList([]model.OrderStatus{model.OrderStatusWaitProcess, model.OrderStatusProcessing}).First()
+ SetStatus(model.OrderStatusWaitProcess).First()
}
--
Gitblit v1.8.0