zhangqian
2023-09-05 49e90e5de2e7166e74e26102dff9064b933fc5fd
工序的生产进度存表
3个文件已添加
9个文件已修改
503 ■■■■ 已修改文件
api/v1/task.go 19 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
crontask/cron_task.go 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
model/index.go 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
model/production_progress.go 257 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
model/work_order.go 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsq/msg_handler.go 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsq/nsq.go 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
service/cache_store.go 88 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service/plc.go 39 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service/progress.go 56 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service/task.go 15 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
api/v1/task.go
@@ -35,7 +35,7 @@
        return
    }
    var resp response.TaskCountdown
    workOrder, err := service.NewTaskService().GetCurrentTask()
    workOrder, err := service.NewTaskService().GetNextTask()
    if err == nil {
        seconds := workOrder.StartTime - time.Now().Unix()
        resp.CountDownHour = seconds / 3600
@@ -180,12 +180,13 @@
        return
    }
    id := convertx.Atoi(idx)
    procedure, code := service.NewTaskService().GetProcedureById(id)
    taskService := service.NewTaskService()
    procedure, code := taskService.GetProcedureById(id)
    if code != ecode.OK {
        ctx.Fail(code)
        return
    }
    order, err := service.NewTaskService().GetOrderByWorkOrderId(procedure.WorkOrderID)
    order, err := taskService.GetOrderByWorkOrderId(procedure.WorkOrderID)
    if err != nil {
        ctx.Fail(ecode.UnknownErr)
        return
@@ -196,22 +197,22 @@
        return
    }
    processModel, err := service.NewTaskService().GetProcessParams(procedure, order)
    processModel, err := taskService.GetProcessParams(procedure, order)
    if err != nil || processModel == nil || processModel.ParamsMap == nil {
        ctx.Fail(ecode.UnknownErr)
        return
        //ctx.Fail(ecode.UnknownErr) //todo
        //return
    }
    err = model.WithTransaction(func(db *gorm.DB) error {
        err = service.NewTaskService().UpdateProcedureStatus(db, id, model.ProcedureStatusProcessing)
        err = taskService.UpdateProcedureStatus(db, id, model.ProcedureStatusProcessing)
        if err != nil {
            return err
        }
        err = service.NewTaskService().UpdateOrderStatus(db, order.ID, model.OrderStatusProcessing)
        err = taskService.UpdateOrderStatus(db, order.ID, model.OrderStatusProcessing)
        if err != nil {
            return err
        }
        return nil
        return service.NewProgressService().AddProgress(db, procedure, order)
    })
    if err != nil {
        logx.Errorf("SendProcessParams update order and procedure status error:%v", err.Error())
crontask/cron_task.go
@@ -7,6 +7,7 @@
    "apsClient/pkg/logx"
    "apsClient/service"
    "github.com/go-co-op/gocron"
    "github.com/spf13/cast"
    "time"
)
@@ -29,6 +30,7 @@
        finishNumber, err := service.PlcRead(plcConfig, constvar.PlcStartAddressTypeFinishNumber)
        if err == nil && finishNumber != 0 {
            service.PlcCacheSet(constvar.PlcCacheKeyFinishNumber, finishNumber)
            _ = service.NewProgressService().UpdateProgress(cast.ToInt64(finishNumber))
        }
        logx.Infof("plc read finish number:%v, err:%v", finishNumber, err)
main.go
@@ -7,6 +7,7 @@
    "apsClient/nsq"
    "apsClient/pkg/logx"
    "apsClient/router"
    "apsClient/service"
    "apsClient/service/plc_address"
    "fmt"
    "net/http"
@@ -37,6 +38,10 @@
    }
    //加载plc写入地址
    plc_address.LoadAddressFromFile()
    //提前加载任务
    service.NewTaskService().GetTask()
    go shutdown()
    logx.Infof("apsClient start serve...")
    server := &http.Server{
model/index.go
@@ -28,6 +28,7 @@
        PlcBrand{},
        DevicePlc{},
        ProcessModel{},
        ProductionProgress{},
    )
    return err
}
model/production_progress.go
New file
@@ -0,0 +1,257 @@
package model
import (
    "apsClient/pkg/sqlitex"
    "fmt"
    "gorm.io/gorm"
)
type (
    ProductionProgress struct {
        gorm.Model       `json:"-"`
        ID               int    `gorm:"primarykey"`
        WorkOrderID      string `gorm:"index;type:varchar(191);not null;comment:工单ID" json:"workOrderID"`
        OrderID          string `gorm:"index;type:varchar(191);not null;comment:订单ID" json:"orderID"`
        ProcedureID      string `gorm:"uniqueIndex:idx_product_procedure;type:varchar(191);comment:工序ID" json:"procedureId"`
        DeviceID         string `gorm:"type:varchar(191);not null;comment:设备ID" json:"deviceId"`
        FinishedQuantity int64  `gorm:"type:int;not null;comment:完成数量" json:"finishedQuantity"`
        TotalQuantity    int64  `gorm:"type:int;not null;comment:总量" json:"totalQuantity"`
    }
    ProductionProgressSearch struct {
        ProductionProgress
        Order    string
        PageNum  int
        PageSize int
        Orm      *gorm.DB
    }
)
func (slf *ProductionProgress) TableName() string {
    return "production_progress"
}
func NewProductionProgressSearch(db *gorm.DB) *ProductionProgressSearch {
    if db == nil {
        db = sqlitex.GetDB()
    }
    return &ProductionProgressSearch{Orm: db}
}
func (slf *ProductionProgressSearch) SetOrm(tx *gorm.DB) *ProductionProgressSearch {
    slf.Orm = tx
    return slf
}
func (slf *ProductionProgressSearch) SetPage(page, size int) *ProductionProgressSearch {
    slf.PageNum, slf.PageSize = page, size
    return slf
}
func (slf *ProductionProgressSearch) SetOrder(order string) *ProductionProgressSearch {
    slf.Order = order
    return slf
}
func (slf *ProductionProgressSearch) SetWorkOrderId(orderId string) *ProductionProgressSearch {
    slf.WorkOrderID = orderId
    return slf
}
func (slf *ProductionProgressSearch) SetProcedureId(procedureId string) *ProductionProgressSearch {
    slf.ProcedureID = procedureId
    return slf
}
func (slf *ProductionProgressSearch) SetDeviceId(id string) *ProductionProgressSearch {
    slf.DeviceID = id
    return slf
}
func (slf *ProductionProgressSearch) SetId(id int) *ProductionProgressSearch {
    slf.ID = id
    return slf
}
func (slf *ProductionProgressSearch) build() *gorm.DB {
    var db = slf.Orm.Model(&ProductionProgress{})
    if slf.Order != "" {
        db = db.Order(slf.Order)
    }
    if slf.ID != 0 {
        db = db.Where("id = ?", slf.ID)
    }
    if slf.WorkOrderID != "" {
        db = db.Where("work_order_id = ?", slf.WorkOrderID)
    }
    if slf.OrderID != "" {
        db = db.Where("order_id = ?", slf.OrderID)
    }
    if slf.ProcedureID != "" {
        db = db.Where("procedure_id = ?", slf.ProcedureID)
    }
    if slf.DeviceID != "" {
        db = db.Where("device_id = ?", slf.DeviceID)
    }
    return db
}
// Create 单条插入
func (slf *ProductionProgressSearch) Create(record *ProductionProgress) error {
    var db = slf.build()
    if err := db.Create(record).Error; err != nil {
        return fmt.Errorf("create err: %v, record: %+v", err, record)
    }
    return nil
}
// CreateBatch 批量插入
func (slf *ProductionProgressSearch) CreateBatch(records []*ProductionProgress) error {
    var db = slf.build()
    if err := db.Create(&records).Error; err != nil {
        return fmt.Errorf("create batch err: %v, records: %+v", err, records)
    }
    return nil
}
func (slf *ProductionProgressSearch) Save(record *ProductionProgress) error {
    var db = slf.build()
    if err := db.Save(record).Error; err != nil {
        return fmt.Errorf("save err: %v, record: %+v", err, record)
    }
    return nil
}
func (slf *ProductionProgressSearch) UpdateByMap(upMap map[string]interface{}) error {
    var (
        db = slf.build()
    )
    if err := db.Updates(upMap).Error; err != nil {
        return fmt.Errorf("update by map err: %v, upMap: %+v", err, upMap)
    }
    return nil
}
func (slf *ProductionProgressSearch) UpdateByQuery(query string, args []interface{}, upMap map[string]interface{}) error {
    var (
        db = slf.Orm.Table(slf.TableName()).Where(query, args...)
    )
    if err := db.Updates(upMap).Error; err != nil {
        return fmt.Errorf("update by query err: %v, query: %s, args: %+v, upMap: %+v", err, query, args, upMap)
    }
    return nil
}
func (slf *ProductionProgressSearch) Delete() error {
    var db = slf.build()
    if err := db.Unscoped().Delete(&ProductionProgress{}).Error; err != nil {
        return err
    }
    return nil
}
func (slf *ProductionProgressSearch) First() (*ProductionProgress, error) {
    var (
        record = new(ProductionProgress)
        db     = slf.build()
    )
    if err := db.First(record).Error; err != nil {
        return record, err
    }
    return record, nil
}
func (slf *ProductionProgressSearch) Find() ([]*ProductionProgress, int64, error) {
    var (
        records = make([]*ProductionProgress, 0)
        total   int64
        db      = slf.build()
    )
    if err := db.Count(&total).Error; err != nil {
        return records, total, fmt.Errorf("find count err: %v", err)
    }
    if slf.PageNum*slf.PageSize > 0 {
        db = db.Offset((slf.PageNum - 1) * slf.PageSize).Limit(slf.PageSize)
    }
    if err := db.Find(&records).Error; err != nil {
        return records, total, fmt.Errorf("find records err: %v", err)
    }
    return records, total, nil
}
func (slf *ProductionProgressSearch) FindNotTotal() ([]*ProductionProgress, error) {
    var (
        records = make([]*ProductionProgress, 0)
        db      = slf.build()
    )
    if slf.PageNum*slf.PageSize > 0 {
        db = db.Offset((slf.PageNum - 1) * slf.PageSize).Limit(slf.PageSize)
    }
    if err := db.Find(&records).Error; err != nil {
        return records, fmt.Errorf("find records err: %v", err)
    }
    return records, nil
}
// FindByQuery 指定条件查询.
func (slf *ProductionProgressSearch) FindByQuery(query string, args []interface{}) ([]*ProductionProgress, int64, error) {
    var (
        records = make([]*ProductionProgress, 0)
        total   int64
        db      = slf.Orm.Table(slf.TableName()).Where(query, args...)
    )
    if err := db.Count(&total).Error; err != nil {
        return records, total, fmt.Errorf("find by query count err: %v", err)
    }
    if slf.PageNum*slf.PageSize > 0 {
        db = db.Offset((slf.PageNum - 1) * slf.PageSize).Limit(slf.PageSize)
    }
    if err := db.Find(&records).Error; err != nil {
        return records, total, fmt.Errorf("find by query records err: %v, query: %s, args: %+v", err, query, args)
    }
    return records, total, nil
}
// FindByQueryNotTotal 指定条件查询&不查询总条数.
func (slf *ProductionProgressSearch) FindByQueryNotTotal(query string, args []interface{}) ([]*ProductionProgress, error) {
    var (
        records = make([]*ProductionProgress, 0)
        db      = slf.Orm.Table(slf.TableName()).Where(query, args...)
    )
    if slf.PageNum*slf.PageSize > 0 {
        db = db.Offset((slf.PageNum - 1) * slf.PageSize).Limit(slf.PageSize)
    }
    if err := db.Find(&records).Error; err != nil {
        return records, fmt.Errorf("find by query records err: %v, query: %s, args: %+v", err, query, args)
    }
    return records, nil
}
model/work_order.go
@@ -47,7 +47,7 @@
)
func (slf *Order) TableName() string {
    return "Order"
    return "work_order"
}
func NewOrderSearch(db *gorm.DB) *OrderSearch {
nsq/msg_handler.go
@@ -102,7 +102,15 @@
        logx.Errorf("ScheduleTask HandleMessage Unmarshal json err: %v", err.Error())
        return nil
    }
    if len(resp.KeyData) == 0 || len(resp.AddressData) == 0 || resp.DeviceId != conf.Conf.System.DeviceId {
    if resp.DeviceId != conf.Conf.System.DeviceId {
        return nil
    }
    //通知回复收到
    ReceivedMessageChan <- &ReceivedMessage{
        Topic:   slf.Topic,
        Message: data,
    }
    if len(resp.KeyData) == 0 || len(resp.AddressData) == 0 {
        return nil
    }
    //写入到文件
@@ -129,11 +137,6 @@
        address := cast.ToInt(strings.ReplaceAll(addresses[i], "\r", ""))
        plc_address.Set(key, address)
        logx.Infof("plc address set ok: key:%v, address:%v", key, address)
    }
    //通知回复收到
    ReceivedMessageChan <- &ReceivedMessage{
        Topic:   slf.Topic,
        Message: data,
    }
    return nil
}
@@ -179,7 +182,7 @@
        logx.Infof("unmarshal process params sync err :%s", err)
        return err
    }
    err = model.NewProcessModelSearch().Create(&processModel)
    err = model.NewProcessModelSearch().Save(&processModel)
    if err != nil {
        logx.Infof("save process params sync err :%s", err)
        return err
nsq/nsq.go
@@ -23,7 +23,7 @@
    safe.Go(func() {
        caller := NewCaller(fmt.Sprintf(constvar.NsqTopicGetPlcAddress, conf.Conf.NsqConf.NodeId), fmt.Sprintf(constvar.NsqTopicSendPlcAddress, conf.Conf.NsqConf.NodeId))
        var addressResult common.ResponsePlcAddress
        err := caller.Call(common.RequestPlcAddress{DeviceId: conf.Conf.System.DeviceId}, &addressResult, time.Second*2)
        err := caller.Call(common.RequestPlcAddress{DeviceId: conf.Conf.System.DeviceId}, &addressResult, time.Second*3)
        if err != nil {
            logx.Infof("get plc address err: %v", err.Error())
        }
service/cache_store.go
New file
@@ -0,0 +1,88 @@
package service
import (
    "apsClient/model"
    "apsClient/model/response"
    "fmt"
    "sync"
)
type CacheStore struct {
    cache map[string]interface{}
    mu    sync.Mutex
}
var defaultCacheStore *CacheStore
func init() {
    defaultCacheStore = newCacheManager()
}
func newCacheManager() *CacheStore {
    return &CacheStore{
        cache: make(map[string]interface{}),
    }
}
func (cm *CacheStore) Get(key string) (interface{}, bool) {
    cm.mu.Lock()
    defer cm.mu.Unlock()
    conn, ok := cm.cache[key]
    return conn, ok
}
func (cm *CacheStore) Add(key string, value interface{}) {
    cm.mu.Lock()
    defer cm.mu.Unlock()
    cm.cache[key] = value
}
func (cm *CacheStore) Remove(key string) {
    cm.mu.Lock()
    defer cm.mu.Unlock()
    delete(cm.cache, key)
}
const (
    PlcCacheKey             = "plc:%v"
    CurrentTaskCacheKey     = "current_task"
    CurrentProgressCacheKey = "current_progress"
)
func PlcCacheGet(key string) (interface{}, bool) {
    return defaultCacheStore.Get(fmt.Sprintf(PlcCacheKey, key))
}
func PlcCacheSet(key string, value interface{}) {
    defaultCacheStore.Add(fmt.Sprintf(PlcCacheKey, key), value)
}
func TaskCacheSet(value *response.TaskData) {
    defaultCacheStore.Add(CurrentTaskCacheKey, value)
}
func TaskCacheUnset() {
    defaultCacheStore.Remove(CurrentTaskCacheKey)
}
func TaskCacheGet() (*response.TaskData, bool) {
    if v, ok := defaultCacheStore.Get(CurrentTaskCacheKey); ok {
        return v.(*response.TaskData), ok
    }
    return nil, false
}
func ProgressCacheGet() (*model.ProductionProgress, bool) {
    if v, ok := defaultCacheStore.Get(CurrentProgressCacheKey); ok {
        return v.(*model.ProductionProgress), ok
    }
    return nil, false
}
func ProgressCacheSet(value *model.ProductionProgress) {
    defaultCacheStore.Add(CurrentProgressCacheKey, value)
}
func ProgressCacheUnset() {
    defaultCacheStore.Remove(CurrentProgressCacheKey)
}
service/plc.go
@@ -8,7 +8,6 @@
    "encoding/binary"
    "errors"
    "fmt"
    "sync"
)
func PlcRead(plcConfig *model.DevicePlc, fieldType constvar.PlcStartAddressType) (val interface{}, err error) {
@@ -85,42 +84,4 @@
    }
    logx.Infof("plc write ok, address: %v, value: %v, result: %v", startAddress, value, result)
    return
}
type CacheStore struct {
    cache map[string]interface{}
    mu    sync.Mutex
}
var defaultCacheStore *CacheStore
func init() {
    defaultCacheStore = newCacheManager()
}
func newCacheManager() *CacheStore {
    return &CacheStore{
        cache: make(map[string]interface{}),
    }
}
func (cm *CacheStore) Get(key string) (interface{}, bool) {
    cm.mu.Lock()
    defer cm.mu.Unlock()
    conn, ok := cm.cache[key]
    return conn, ok
}
func (cm *CacheStore) Add(key string, value interface{}) {
    cm.mu.Lock()
    defer cm.mu.Unlock()
    cm.cache[key] = value
}
func PlcCacheGet(key string) (interface{}, bool) {
    return defaultCacheStore.Get(key)
}
func PlcCacheSet(key string, value interface{}) {
    defaultCacheStore.Add(key, value)
}
service/progress.go
New file
@@ -0,0 +1,56 @@
package service
import (
    "apsClient/model"
    "errors"
    "gorm.io/gorm"
)
type ProgressService struct {
}
func NewProgressService() *ProgressService {
    return &ProgressService{}
}
func (slf ProgressService) AddProgress(db *gorm.DB, procedure *model.Procedures, order *model.Order) error {
    progress := &model.ProductionProgress{
        WorkOrderID:   procedure.WorkOrderID,
        OrderID:       procedure.OrderID,
        ProcedureID:   procedure.ProceduresInfo.ProcedureID,
        DeviceID:      procedure.DeviceID,
        TotalQuantity: order.Amount.IntPart(),
    }
    err := model.NewProductionProgressSearch(db).Create(progress)
    if err != nil {
        return err
    }
    ProgressCacheSet(progress)
    return nil
}
func (slf ProgressService) UpdateProgress(finishedQuantity int64) (err error) {
    var progressCache *model.ProductionProgress
    progressCache, ok := ProgressCacheGet()
    if !ok {
        progressCache, err = model.NewProductionProgressSearch(nil).SetOrder("id desc").First()
        if err == gorm.ErrRecordNotFound {
            return errors.New("progress cache not found")
        }
        if err != nil {
            return err
        }
        if progressCache.FinishedQuantity < progressCache.TotalQuantity {
            ProgressCacheSet(progressCache)
        }
    }
    if progressCache == nil {
        return errors.New("progress cache not found")
    }
    if finishedQuantity > progressCache.FinishedQuantity { //当有变化时才更新
        progressCache.FinishedQuantity = finishedQuantity
        ProgressCacheSet(progressCache)
        return model.NewProductionProgressSearch(nil).SetId(progressCache.ID).Save(progressCache)
    }
    return nil
}
service/task.go
@@ -25,6 +25,10 @@
// GetTask 获取任务,未完成的开始时间最早的任务
func (slf TaskService) GetTask() (taskData *response.TaskData, code int) {
    if v, ok := TaskCacheGet(); ok {
        return v, ecode.OK
    }
    nowTs := time.Now().Unix()
    var (
        err       error
@@ -51,6 +55,7 @@
    taskData = new(response.TaskData)
    taskData.Order = order
    taskData.Procedure = procedure
    TaskCacheSet(taskData)
    return taskData, ecode.OK
}
@@ -63,6 +68,10 @@
}
func (slf TaskService) UpdateProcedureStatus(db *gorm.DB, id int, status model.ProcedureStatus) error {
    if status == model.ProcedureStatusFinished {
        TaskCacheUnset()
        ProgressCacheUnset()
    }
    return model.NewProceduresSearch(db).SetId(id).UpdateByMap(map[string]interface{}{
        "status": status,
    })
@@ -125,11 +134,11 @@
    return
}
// GetCurrentTask 获取待完成或进行中的任务, 每个工单只有一个工序
func (slf TaskService) GetCurrentTask() (workOrder *model.Order, err error) {
// GetNextTask 获取未开始的任务
func (slf TaskService) GetNextTask() (workOrder *model.Order, err error) {
    nowTs := time.Now().Unix()
    orderSearch := model.NewOrderSearch(nil)
    return orderSearch.SetOrder("start_time asc").
        SetStartTimeMin(nowTs).
        SetStatusList([]model.OrderStatus{model.OrderStatusWaitProcess, model.OrderStatusProcessing}).First()
        SetStatus(model.OrderStatusWaitProcess).First()
}