zhangqian
2023-09-14 040cd381f2e8475e9b4eb336b704ad878f56f4fa
适应性改动
9个文件已修改
176 ■■■■ 已修改文件
api/v1/plc.go 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
api/v1/task.go 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
crontask/cron_task.go 59 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
model/production_progress.go 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsq/msg_handler.go 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service/cache_store.go 24 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service/plc.go 47 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service/progress.go 17 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service/task.go 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
api/v1/plc.go
@@ -29,8 +29,8 @@
    if !ok {
        return
    }
    finishNumber, _ := service.PlcCacheGet(constvar.PlcCacheKeyFinishNumber)
    totalNumber, _ := service.PlcCacheGet(constvar.PlcCacheKeyTotalNumber)
    finishNumber, _ := service.PlcCacheGet(params.Position, constvar.PlcCacheKeyFinishNumber)
    totalNumber, _ := service.PlcCacheGet(params.Position, constvar.PlcCacheKeyTotalNumber)
    resp := new(response.ProductProgress)
    resp.FinishNumber = cast.ToInt(finishNumber)
    resp.TotalNumber = cast.ToInt(totalNumber)
api/v1/task.go
@@ -146,7 +146,7 @@
        ctx.Fail(code)
        return
    }
    err := service.NewTaskService().UpdateProcedureStatus(nil, id, model.ProcedureStatusFinished)
    err := service.NewTaskService().UpdateProcedureStatus(nil, id, model.ProcedureStatusFinished, procedure.Position)
    if err != nil {
        logx.Errorf("UpdateProcedureStatus err: %v", err.Error())
        ctx.Fail(ecode.UnknownErr)
@@ -211,6 +211,7 @@
        if err != nil {
            return err
        }
        procedure.Position = params.Position
        err = taskService.UpdateOrderStatus(db, order.ID, model.OrderStatusProcessing)
        if err != nil {
            return err
@@ -239,7 +240,7 @@
        return
    }
    plcConfig.CurrentTryTimes = 0
    err = service.PlcWrite(plcConfig, constvar.PlcStartAddressTypeTotalNumber, order.Amount.IntPart())
    err = service.PlcWrite(plcConfig, constvar.PlcStartAddressTypeTotalNumber, params.Position, order.Amount.IntPart())
    if err != nil {
        ctx.FailWithMsg(ecode.NeedConfirmedErr, "糟糕,工艺下发失败。")
        return
crontask/cron_task.go
@@ -29,12 +29,16 @@
        if code != ecode.OK {
            return
        }
        finishNumber, err := service.PlcRead(plcConfig, constvar.PlcStartAddressTypeFinishNumber)
        if err == nil && finishNumber != 0 {
            service.PlcCacheSet(constvar.PlcCacheKeyFinishNumber, finishNumber)
            _ = service.NewProgressService().UpdateProgress(cast.ToInt64(finishNumber))
        for _, addressItem := range plcConfig.Details {
            if addressItem.FieldName == constvar.PlcStartAddressTypeFinishNumber {
                finishNumber, err := service.PlcReadDirect(plcConfig, addressItem.StartAddress, addressItem.Position)
                if err == nil && finishNumber != 0 {
                    service.PlcCacheSet(addressItem.Position, constvar.PlcCacheKeyFinishNumber, finishNumber)
                    _ = service.NewProgressService().UpdateProgress(addressItem.Position, cast.ToInt64(finishNumber))
                }
                logx.Infof("plc read finish number:%v, err:%v", finishNumber, err)
            }
        }
        logx.Infof("plc read finish number:%v, err:%v", finishNumber, err)
    })
    if err != nil {
@@ -46,11 +50,17 @@
        if code != ecode.OK {
            return
        }
        totalNumber, err := service.PlcRead(plcConfig, constvar.PlcStartAddressTypeTotalNumber)
        if err == nil && totalNumber != 0 {
            service.PlcCacheSet(constvar.PlcCacheKeyTotalNumber, totalNumber)
        for _, addressItem := range plcConfig.Details {
            if addressItem.FieldName == constvar.PlcStartAddressTypeTotalNumber {
                totalNumber, err := service.PlcReadDirect(plcConfig, addressItem.StartAddress, addressItem.Position)
                if err == nil && totalNumber != 0 {
                    service.PlcCacheSet(addressItem.Position, constvar.PlcCacheKeyTotalNumber, totalNumber)
                    _ = service.NewProgressService().UpdateProgress(addressItem.Position, cast.ToInt64(totalNumber))
                }
                logx.Infof("plc read total number:%v, err:%v", totalNumber, err)
            }
        }
        logx.Infof("plc read total number:%v, err:%v", totalNumber, err)
    })
    s.Every(60).Seconds().StartImmediately().Do(SyncProductionProgress) //同步生产数据
@@ -59,17 +69,30 @@
}
func SyncProductionProgress() {
    progress, err := service.NewProgressService().GetCurrentProgress()
    if err != nil {
    plcConfig, code := service.NewDevicePlcService().GetDevicePlc()
    if code != ecode.OK {
        return
    }
    if progress == nil {
        return
    var positions []int
    for _, item := range plcConfig.Details {
        if item.FieldName == constvar.PlcStartAddressTypeFinishNumber {
            positions = append(positions, item.Position)
        }
    }
    for _, position := range positions {
        progress, err := service.NewProgressService().GetCurrentProgress(position)
        if err != nil {
            return
        }
        if progress == nil {
            return
        }
        caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicSyncTaskProgress, conf.Conf.NsqConf.NodeId), "")
        err = caller.Send(progress)
        if err != nil {
            logx.Errorf("SyncProductionProgress error:%v", err.Error())
        }
    }
    caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicSyncTaskProgress, conf.Conf.NsqConf.NodeId), "")
    err = caller.Send(progress)
    if err != nil {
        logx.Errorf("SyncProductionProgress error:%v", err.Error())
    }
}
model/production_progress.go
@@ -15,6 +15,7 @@
        ProcedureID      string `gorm:"uniqueIndex:idx_product_procedure;type:varchar(191);comment:工序ID" json:"procedureId"`
        DeviceID         string `gorm:"type:varchar(191);not null;comment:设备ID" json:"deviceId"`
        FinishedQuantity int64  `gorm:"type:int;not null;comment:完成数量" json:"finishedQuantity"`
        Position         int    `gorm:"type:int;comment:工作位置" json:"position"` //每个设备可能有多个机位同时生产,用position表示位置
        TotalQuantity    int64  `gorm:"type:int;not null;comment:总量" json:"totalQuantity"`
    }
@@ -73,6 +74,11 @@
    return slf
}
func (slf *ProductionProgressSearch) SetPosition(position int) *ProductionProgressSearch {
    slf.Position = position
    return slf
}
func (slf *ProductionProgressSearch) build() *gorm.DB {
    var db = slf.Orm.Model(&ProductionProgress{})
@@ -100,6 +106,10 @@
        db = db.Where("device_id = ?", slf.DeviceID)
    }
    if slf.Position != 0 {
        db = db.Where("position = ?", slf.Position)
    }
    return db
}
nsq/msg_handler.go
@@ -152,6 +152,10 @@
        }
    }
    if resp.PlcConfig.Method == "" {
        return nil
    }
    var record model.DevicePlc
    err = mapstructure.Decode(resp.PlcConfig, &record)
    if err != nil {
service/cache_store.go
@@ -44,17 +44,17 @@
}
const (
    PlcCacheKey             = "plc:%v"
    PlcCacheKey             = "plc:%v:%v"
    CurrentTaskCacheKey     = "current_task"
    CurrentProgressCacheKey = "current_progress"
    CurrentProgressCacheKey = "current_progress:%v"
)
func PlcCacheGet(key string) (interface{}, bool) {
    return defaultCacheStore.Get(fmt.Sprintf(PlcCacheKey, key))
func PlcCacheGet(position int, key string) (interface{}, bool) {
    return defaultCacheStore.Get(fmt.Sprintf(PlcCacheKey, position, key))
}
func PlcCacheSet(key string, value interface{}) {
    defaultCacheStore.Add(fmt.Sprintf(PlcCacheKey, key), value)
func PlcCacheSet(position int, key string, value interface{}) {
    defaultCacheStore.Add(fmt.Sprintf(PlcCacheKey, position, key), value)
}
func TaskCacheSet(value *response.TaskData) {
@@ -72,17 +72,17 @@
    return nil, false
}
func ProgressCacheGet() (*model.ProductionProgress, bool) {
    if v, ok := defaultCacheStore.Get(CurrentProgressCacheKey); ok {
func ProgressCacheGet(position int) (*model.ProductionProgress, bool) {
    if v, ok := defaultCacheStore.Get(fmt.Sprintf(CurrentProgressCacheKey, position)); ok {
        return v.(*model.ProductionProgress), ok
    }
    return nil, false
}
func ProgressCacheSet(value *model.ProductionProgress) {
    defaultCacheStore.Add(CurrentProgressCacheKey, value)
func ProgressCacheSet(position int, value *model.ProductionProgress) {
    defaultCacheStore.Add(fmt.Sprintf(CurrentProgressCacheKey, position), value)
}
func ProgressCacheUnset() {
    defaultCacheStore.Remove(CurrentProgressCacheKey)
func ProgressCacheUnset(position int) {
    defaultCacheStore.Remove(fmt.Sprintf(CurrentProgressCacheKey, position))
}
service/plc.go
@@ -13,7 +13,7 @@
    "github.com/spf13/cast"
)
func PlcRead(plcConfig *model.DevicePlc, fieldType constvar.PlcStartAddressType) (val interface{}, err error) {
func PlcRead(plcConfig *model.DevicePlc, fieldType constvar.PlcStartAddressType, position int) (val interface{}, err error) {
    var (
        startAddress int
        valueType    constvar.PlcStartAddressValueType
@@ -22,7 +22,7 @@
    )
    for _, pc := range plcConfig.Details {
        if pc.FieldName == fieldType {
        if pc.FieldName == fieldType && pc.Position == position {
            startAddress = pc.StartAddress
            valueType = pc.Type
            dataLength = pc.Length
@@ -66,7 +66,7 @@
    return nil, errors.New("interface type not support")
}
func PlcWrite(plcConfig *model.DevicePlc, fieldType constvar.PlcStartAddressType, value interface{}) (err error) {
func PlcWrite(plcConfig *model.DevicePlc, fieldType constvar.PlcStartAddressType, position int, value interface{}) (err error) {
    var (
        startAddress int
        ipAddr       string
@@ -78,7 +78,7 @@
    plcConfig.CurrentTryTimes++
    for _, pc := range plcConfig.Details {
        if pc.FieldName == fieldType {
        if pc.FieldName == fieldType && pc.Position == position {
            startAddress = pc.StartAddress
        }
    }
@@ -89,14 +89,14 @@
        if err != nil {
            logx.Errorf("plc write failed, 连接plc失败: %v", err.Error())
            plcConfig.CurrentErr = err
            return PlcWrite(plcConfig, fieldType, value)
            return PlcWrite(plcConfig, fieldType, position, value)
        }
        result, err := plc.WriteHoldingRegister(conn, startAddress, value)
        if err != nil {
            logx.Errorf("plc write failed, address: %v, value: %v, err: %v", startAddress, value, err.Error())
            plcConfig.CurrentErr = err
            return PlcWrite(plcConfig, fieldType, value)
            return PlcWrite(plcConfig, fieldType, position, value)
        }
        logx.Infof("plc write ok, address: %v, value: %v, result: %v", startAddress, value, result)
    } else if plcConfig.Method == constvar.PlcMethodSerial {
@@ -144,3 +144,38 @@
    }
    return
}
func PlcReadDirect(plcConfig *model.DevicePlc, address int, dataLength int) (val interface{}, err error) {
    var (
        ipAddr string
    )
    if plcConfig.CurrentTryTimes > plcConfig.MaxTryTimes {
        return nil, plcConfig.CurrentErr
    }
    plcConfig.CurrentTryTimes++
    if plcConfig.Method == constvar.PlcMethodModbusTCP {
        ipAddr = fmt.Sprintf("%s:%v", plcConfig.Address, plcConfig.Port)
        conn, err := plc.GetModbusConnection(ipAddr)
        if err != nil {
            logx.Errorf("plc write failed, 连接plc失败: %v", err.Error())
            plcConfig.CurrentErr = err
            return PlcReadDirect(plcConfig, address, dataLength)
        }
        value, err := plc.ReadHoldingRegister(conn, address, dataLength)
        if err != nil {
            logx.Errorf("plc read failed, address: %v, err: %v", address, err.Error())
            plcConfig.CurrentErr = err
            return PlcReadDirect(plcConfig, address, dataLength)
        }
        logx.Infof("plc read ok, address: %v, result: %v", address, value)
    } else if plcConfig.Method == constvar.PlcMethodSerial {
        ipAddr = conf.Conf.Services.Serial
        if ipAddr == "" {
            return nil, errors.New("conf.Conf.Services.Serial config not set yet")
        }
        label := fmt.Sprintf("D%d", address)
        return plccom.ReadPLC(plccom.DeviceTypeMitsubishi, ipAddr, label)
    }
    return
}
service/progress.go
@@ -20,17 +20,18 @@
        ProcedureID:   procedure.ProceduresInfo.ProcedureID,
        DeviceID:      procedure.DeviceID,
        TotalQuantity: order.Amount.IntPart(),
        Position:      procedure.Position,
    }
    err := model.NewProductionProgressSearch(db).Create(progress)
    if err != nil {
        return err
    }
    ProgressCacheSet(progress)
    ProgressCacheSet(procedure.Position, progress)
    return nil
}
func (slf ProgressService) UpdateProgress(finishedQuantity int64) (err error) {
    progressCache, err := slf.GetCurrentProgress()
func (slf ProgressService) UpdateProgress(position int, finishedQuantity int64) (err error) {
    progressCache, err := slf.GetCurrentProgress(position)
    if err != nil {
        return err
    }
@@ -39,17 +40,17 @@
    }
    if finishedQuantity > progressCache.FinishedQuantity { //当有变化时才更新
        progressCache.FinishedQuantity = finishedQuantity
        ProgressCacheSet(progressCache)
        ProgressCacheSet(position, progressCache)
        return model.NewProductionProgressSearch(nil).SetId(progressCache.ID).Save(progressCache)
    }
    return nil
}
func (slf ProgressService) GetCurrentProgress() (progressCache *model.ProductionProgress, err error) {
func (slf ProgressService) GetCurrentProgress(position int) (progressCache *model.ProductionProgress, err error) {
    var ok bool
    progressCache, ok = ProgressCacheGet()
    progressCache, ok = ProgressCacheGet(position)
    if !ok {
        progressCache, err = model.NewProductionProgressSearch(nil).SetOrder("id desc").First()
        progressCache, err = model.NewProductionProgressSearch(nil).SetPosition(position).SetOrder("id desc").First()
        if err == gorm.ErrRecordNotFound {
            return nil, errors.New("progress not found")
        }
@@ -61,7 +62,7 @@
            progressCache = nil
        }
        if progressCache != nil {
            ProgressCacheSet(progressCache)
            ProgressCacheSet(position, progressCache)
        }
    }
    return
service/task.go
@@ -116,7 +116,7 @@
func (slf TaskService) UpdateProcedureStatusAndPosition(db *gorm.DB, id int, status model.ProcedureStatus, position int) error {
    if status == model.ProcedureStatusFinished {
        ProgressCacheUnset()
        ProgressCacheUnset(position)
    }
    return model.NewProceduresSearch(db).SetId(id).UpdateByMap(map[string]interface{}{
        "status":   status,
@@ -124,9 +124,9 @@
    })
}
func (slf TaskService) UpdateProcedureStatus(db *gorm.DB, id int, status model.ProcedureStatus) error {
func (slf TaskService) UpdateProcedureStatus(db *gorm.DB, id int, status model.ProcedureStatus, position int) error {
    if status == model.ProcedureStatusFinished {
        ProgressCacheUnset()
        ProgressCacheUnset(position)
    }
    return model.NewProceduresSearch(db).SetId(id).UpdateByMap(map[string]interface{}{
        "status": status,