zhangqian
2023-10-13 f838162ed0ee7f2832924c2399eddd461760135a
plc地址和工艺参数不再单独拉取,只serf主启动时拉取
10个文件已修改
220 ■■■■■ 已修改文件
api/v1/task.go 41 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
constvar/const.go 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
crontask/cron_task.go 33 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
model/common/common.go 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
model/process_model.go 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsq/consumer.go 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsq/msg_handler.go 63 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsq/nsq.go 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service/task.go 26 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
api/v1/task.go
@@ -4,18 +4,14 @@
    "apsClient/conf"
    "apsClient/constvar"
    "apsClient/model"
    "apsClient/model/common"
    "apsClient/model/request"
    "apsClient/model/response"
    "apsClient/nsq"
    "apsClient/pkg/contextx"
    "apsClient/pkg/ecode"
    "apsClient/pkg/logx"
    "apsClient/pkg/safe"
    "apsClient/service"
    "apsClient/service/plc_address"
    "errors"
    "fmt"
    "github.com/gin-gonic/gin"
    "github.com/jinzhu/gorm"
    "github.com/spf13/cast"
@@ -179,15 +175,6 @@
        })
    }
    safe.Go(func() {
        caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicGetPlcAddress, conf.Conf.NsqConf.NodeId), fmt.Sprintf(constvar.NsqTopicSendPlcAddress, conf.Conf.NsqConf.NodeId))
        var addressResult common.ResponsePlcAddress
        err := caller.Call(common.RequestPlcAddress{DeviceId: conf.Conf.System.DeviceId}, &addressResult, time.Second*3)
        if err != nil {
            logx.Infof("get plc address err: %v", err.Error())
        }
    })
    resp := response.ProcessParamsResponse{
        Number: processModel.Number,
        Params: processParamsArr,
@@ -231,20 +218,6 @@
        logx.Errorf("UpdateProcedureStatus err: %v", err.Error())
        ctx.Fail(ecode.UnknownErr)
        return
    }
    msg := &common.MsgTaskStatusUpdate{
        WorkOrderId:  procedure.WorkOrderID,
        ProcedureID:  procedure.ProceduresInfo.ProcedureID,
        DeviceId:     procedure.ProceduresInfo.DeviceID,
        IsProcessing: false,
        IsFinish:     true,
    }
    caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicTaskProcedureStatusUpdate, conf.Conf.NsqConf.NodeId), "")
    err = caller.Send(msg)
    if err != nil {
        logx.Errorf("send task status update msg error:%v", err.Error())
    }
    service.TaskFlagUnset(procedure.Channel)
@@ -331,20 +304,6 @@
    if err != nil {
        ctx.FailWithMsg(ecode.NeedConfirmedErr, "PLC请求失败,请检查PLC配置")
        return
    }
    msg := &common.MsgTaskStatusUpdate{
        WorkOrderId:  procedure.WorkOrderID,
        ProcedureID:  procedure.ProceduresInfo.ProcedureID,
        DeviceId:     procedure.ProceduresInfo.DeviceID,
        IsProcessing: true,
        IsFinish:     false,
    }
    caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicTaskProcedureStatusUpdate, conf.Conf.NsqConf.NodeId), "")
    err = caller.Send(msg)
    if err != nil {
        logx.Errorf("send task status update msg error:%v", err.Error())
    }
    service.TaskFlagSet(procedure.Channel)
    ctx.Ok()
constvar/const.go
@@ -10,6 +10,8 @@
    NsqTopicTaskProcedureStatusUpdate = "aps.%v.task.procedure.status"   //工序状态更新
    NsqTopicSyncTaskProgress          = "aps.%v.task.procedure.progress" //工序生产进度
    NsqTopicDeviceUpdate              = "aps.%v.device.update"           //设备信息更改
    NsqTopicPullDataRequest           = "aps.%v.pull.data.request"       //拉取数据请求
    NsqTopicPullDataResponse          = "aps.%v.pull.data.response"      //拉取数据响应
)
type PlcStartAddressType int
crontask/cron_task.go
@@ -3,6 +3,7 @@
import (
    "apsClient/conf"
    "apsClient/constvar"
    "apsClient/model/common"
    "apsClient/nsq"
    "apsClient/pkg/ecode"
    "apsClient/pkg/logx"
@@ -13,8 +14,9 @@
    "time"
)
func InitTask() error {
var s *gocron.Scheduler
func StartTask() error {
    finishNumberTimeInterval := conf.Conf.PLC.FinishNumberTimeInterval
    totalNumberTimeInterval := conf.Conf.PLC.TotalNumberTimeInterval
    if finishNumberTimeInterval == 0 {
@@ -23,7 +25,7 @@
    if totalNumberTimeInterval == 0 {
        totalNumberTimeInterval = 60
    }
    s := gocron.NewScheduler(time.UTC)
    s = gocron.NewScheduler(time.UTC)
    _, err := s.Every(finishNumberTimeInterval).Seconds().Do(func() {
        plcConfig, code := service.NewDevicePlcService().GetDevicePlc()
        if code != ecode.OK {
@@ -73,7 +75,8 @@
    })
    s.Every(180).Seconds().Do(SyncProductionProgress) //同步生产数据
    s.Every(60).Seconds().Do(SyncProductionProgress) //同步生产数据
    s.Every(60).Seconds().Do(SyncTaskStatus)         //同步任务状态
    s.StartAsync()
    return nil
}
@@ -104,5 +107,29 @@
            logx.Errorf("SyncProductionProgress error:%v", err.Error())
        }
    }
}
func SyncTaskStatus() {
    //todo
}
func StopTask() {
    if s != nil {
        s.Stop()
    }
}
// Once 一次性任务
func Once() {
    msg := &common.MsgPullDataRequest{DataType: common.PullDataTypeProcessModel}
    caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicPullDataRequest, conf.Conf.NsqConf.NodeId), constvar.NsqTopicPullDataResponse)
    err := caller.Send(msg)
    if err != nil {
        logx.Errorf("send pull data msg error:%v", err.Error())
    }
    caller = nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicGetPlcAddress, conf.Conf.NsqConf.NodeId), "")
    err = caller.Send(common.RequestPlcAddress{DeviceId: conf.Conf.System.DeviceId})
    if err != nil {
        logx.Infof("get plc address err: %v", err.Error())
    }
}
main.go
@@ -25,11 +25,6 @@
        return
    }
    if err := crontask.InitTask(); err != nil {
        logx.Errorf("crontab task Init err:%v", err)
        return
    }
    //加载plc写入地址
    plc_address.LoadAddressFromFile()
@@ -54,6 +49,11 @@
            logx.Errorf("nsq Init err:%v", err)
            return
        }
        crontask.Once()
        if err := crontask.StartTask(); err != nil {
            logx.Errorf("crontab task Init err:%v", err)
            return
        }
    }
    logx.Infof("apsClient start serve...")
@@ -73,8 +73,14 @@
            logx.Errorf("nsq Init err:%v", err)
            return
        }
        crontask.Once()
        if err := crontask.StartTask(); err != nil {
            logx.Errorf("crontab task Init err:%v", err)
            return
        }
    case serf.EventJoinCluster, serf.EventMaster2Slave:
        nsq.Stop()
        crontask.StopTask()
    }
    logx.Infof("serf cluster event: %v", stat)
model/common/common.go
@@ -137,3 +137,19 @@
    ProcedureName string `gorm:"type:varchar(191);comment:工序名称" json:"procedureName"`
    DeviceID      string `gorm:"index;type:varchar(191);not null;comment:设备ID" json:"deviceId"`
}
type PullDataType string
const (
    PullDataTypeProcessModel = "process_model"
)
// MsgPullDataRequest 拉取云端数据
type MsgPullDataRequest struct {
    DataType PullDataType `json:"dataType"` //要拉取的数据类型
}
type MsgPullDataResponse struct {
    DataType PullDataType `json:"dataType"` //要拉取的数据类型
    Data     interface{}  //返回的数据
}
model/process_model.go
@@ -32,6 +32,7 @@
        Orm           *gorm.DB
        Procedures    []string
        CurrentNumber string
        Numbers       []string
    }
)
@@ -74,6 +75,11 @@
func (slf *ProcessModelSearch) SetNumber(number string) *ProcessModelSearch {
    slf.Number = number
    return slf
}
func (slf *ProcessModelSearch) SetNumbers(numbers []string) *ProcessModelSearch {
    slf.Numbers = numbers
    return slf
}
@@ -137,6 +143,10 @@
        db = db.Where("number != ?", slf.CurrentNumber)
    }
    if len(slf.Numbers) != 0 {
        db = db.Where("number in ?", slf.Numbers)
    }
    return db
}
nsq/consumer.go
@@ -27,6 +27,8 @@
        handler = &ProcessParamsSync{Topic: topic}
    case fmt.Sprintf(constvar.NsqTopicDeviceUpdate, conf.Conf.NsqConf.NodeId):
        handler = &DeviceUpdate{Topic: topic}
    case fmt.Sprintf(constvar.NsqTopicPullDataResponse, conf.Conf.NsqConf.NodeId):
        handler = &PullDataResponse{Topic: topic}
    }
    c.AddHandler(handler.HandleMessage)
nsq/msg_handler.go
@@ -275,3 +275,66 @@
    return nil
}
type PullDataResponse struct {
    Topic string
}
func (slf *PullDataResponse) HandleMessage(data []byte) (err error) {
    logx.Infof("get a pull data response message :%s", data)
    var pullDataResponse common.MsgPullDataResponse
    err = json.Unmarshal(data, &pullDataResponse)
    if err != nil {
        logx.Infof("unmarshal msg err :%s", err)
        return err
    }
    switch pullDataResponse.DataType {
    case common.PullDataTypeProcessModel:
        err = slf.DealProcessModelData(pullDataResponse.Data)
    }
    if err != nil {
        logx.Infof("process pull data  err :%s", err)
        return err
    }
    return nil
}
func (slf *PullDataResponse) DealProcessModelData(data interface{}) error {
    var processModels []*model.ProcessModel
    err := mapstructure.Decode(data, &processModels)
    if err != nil {
        return err
    }
    numbers := make([]string, 0, len(processModels))
    for _, processModel := range processModels {
        numbers = append(numbers, processModel.Number)
    }
    existsProcessModels, err := model.NewProcessModelSearch().SetIsNew(true).SetNumbers(numbers).FindNotTotal()
    if err != nil {
        return err
    }
    existsProcessModelsMap := make(map[string]struct{}, len(existsProcessModels))
    for _, processModel := range existsProcessModels {
        existsProcessModelsMap[processModel.Number] = struct{}{}
    }
    for _, processModel := range processModels {
        if _, exists := existsProcessModelsMap[processModel.Number]; exists {
            continue
        }
        err = model.WithTransaction(func(db *gorm.DB) error {
            err = model.NewProcessModelSearch().SetOrm(db).SetProcedure(processModel.Procedure).SetProduct(processModel.Product).SetIsNew(true).UpdateByMap(map[string]interface{}{"is_new": 0})
            if err != nil {
                return err
            }
            processModel.IsNew = true
            return model.NewProcessModelSearch().SetOrm(db).Create(processModel)
        })
        if err != nil {
            return err
        }
    }
    return nil
}
nsq/nsq.go
@@ -3,7 +3,6 @@
import (
    "apsClient/conf"
    "apsClient/constvar"
    "apsClient/model/common"
    "apsClient/pkg/logx"
    "apsClient/pkg/safe"
    "basic.com/aps/nsqclient.git"
@@ -11,7 +10,6 @@
    "errors"
    "fmt"
    "sync"
    "time"
)
type consumerManager struct {
@@ -32,14 +30,6 @@
    if err := initProducer(); err != nil {
        return err
    }
    safe.Go(func() {
        caller := NewCaller(fmt.Sprintf(constvar.NsqTopicGetPlcAddress, conf.Conf.NsqConf.NodeId), fmt.Sprintf(constvar.NsqTopicSendPlcAddress, conf.Conf.NsqConf.NodeId))
        var addressResult common.ResponsePlcAddress
        err := caller.Call(common.RequestPlcAddress{DeviceId: conf.Conf.System.DeviceId}, &addressResult, time.Second*3)
        if err != nil {
            logx.Infof("get plc address err: %v", err.Error())
        }
    })
    var topics = []string{
        constvar.NsqTopicScheduleTask,
@@ -47,6 +37,7 @@
        constvar.NsqTopicProcessParamsResponse,
        constvar.NsqTopicApsProcessParams,
        constvar.NsqTopicDeviceUpdate,
        constvar.NsqTopicPullDataResponse,
    }
    for _, t := range topics {
        topic := fmt.Sprintf(t, conf.Conf.NsqConf.NodeId)
service/task.go
@@ -6,10 +6,8 @@
    "apsClient/model"
    "apsClient/model/common"
    "apsClient/model/response"
    "apsClient/nsq"
    "apsClient/pkg/ecode"
    "apsClient/pkg/logx"
    "apsClient/pkg/structx"
    "fmt"
    "github.com/jinzhu/gorm"
    "time"
@@ -176,30 +174,6 @@
    }
    if err == nil {
        return data, nil
    }
    if err == gorm.ErrRecordNotFound { //如果数据库没有从云端获取
        caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicProcessParamsRequest, conf.Conf.NsqConf.NodeId), fmt.Sprintf(constvar.NsqTopicProcessParamsResponse, conf.Conf.NsqConf.NodeId))
        var result common.ResponseProcessParams
        err = caller.Call(common.RequestProcessParams{
            WorkOrder: procedure.WorkOrderID,
            OrderId:   procedure.OrderID,
            Product:   order.ProductName,
            Procedure: procedure.ProceduresInfo.ProcedureName,
            Device:    procedure.ProceduresInfo.DeviceName,
            DeviceId:  conf.Conf.System.DeviceId,
        }, &result, time.Second*3)
        if err != nil {
            logx.Errorf("TaskStart GetProcessModel error:%v", err.Error())
            return
        }
        if result.ParamsMap == nil {
            logx.Errorf("TaskStart GetProcessModel response miss process params:%v", result)
            return
        }
        processModel = new(model.ProcessModel)
        err = structx.AssignTo(result, &processModel)
        return processModel, err
    }
    return
}