zhangqian
2023-10-20 bc7ee359dfe4e66a05c2cca9deb7a945534009f3
删除不再用的topic,增加初始化的时候,设备拉取
7个文件已修改
76 ■■■■ 已修改文件
constvar/const.go 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
crontask/cron_task.go 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
model/common/common.go 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
model/device.go 18 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsq/consumer.go 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsq/msg_handler.go 36 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsq/nsq.go 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
constvar/const.go
@@ -1,11 +1,8 @@
package constvar
const (
    NsqTopicScheduleTask              = "aps.%v.scheduleTask" //排程任务下发
    NsqTopicGetPlcAddress             = "aps.%v.getPlcAddress"
    NsqTopicSendPlcAddress            = "aps.%v.sendPlcAddress"
    NsqTopicProcessParamsRequest      = "aps.%v.processParams.request"
    NsqTopicProcessParamsResponse     = "aps.%v.processParams.response"
    NsqTopicScheduleTask              = "aps.%v.scheduleTask"            //排程任务下发
    NsqTopicSendPlcAddress            = "aps.%v.sendPlcAddress"          //plc address更新
    NsqTopicApsProcessParams          = "aps.%v.aps.processParams"       //有了新的工艺模型
    NsqTopicTaskProcedureStatusUpdate = "aps.%v.task.procedure.status"   //工序状态更新
    NsqTopicSyncTaskProgress          = "aps.%v.task.procedure.progress" //工序生产进度
crontask/cron_task.go
@@ -143,11 +143,12 @@
    caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicPullDataRequest, conf.Conf.NsqConf.NodeId), constvar.NsqTopicPullDataResponse)
    err := caller.Send(msg)
    if err != nil {
        logx.Errorf("send pull data msg error:%v", err.Error())
        logx.Errorf("send pull data msg error:%v, msg:%+v", err.Error(), msg)
    }
    caller = nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicGetPlcAddress, conf.Conf.NsqConf.NodeId), "")
    err = caller.Send(common.RequestPlcAddress{DeviceId: conf.Conf.System.DeviceId})
    msg = &common.MsgPullDataRequest{DataType: common.PullDataTypeDevice}
    caller = nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicPullDataRequest, conf.Conf.NsqConf.NodeId), constvar.NsqTopicPullDataResponse)
    err = caller.Send(msg)
    if err != nil {
        logx.Infof("get plc address err: %v", err.Error())
        logx.Errorf("send pull data msg error:%v, msg:%+v", err.Error(), msg)
    }
}
model/common/common.go
@@ -139,6 +139,7 @@
const (
    PullDataTypeProcessModel = "process_model"
    PullDataTypeDevice       = "device"
)
// MsgPullDataRequest 拉取云端数据
model/device.go
@@ -19,10 +19,11 @@
    DeviceSearch struct {
        Device
        Order    string
        PageNum  int
        PageSize int
        Orm      *gorm.DB
        Order     string
        PageNum   int
        PageSize  int
        Orm       *gorm.DB
        DeviceIDs []string
    }
)
@@ -64,6 +65,11 @@
    return slf
}
func (slf *DeviceSearch) SetDeviceIds(deviceIds []string) *DeviceSearch {
    slf.DeviceIDs = deviceIds
    return slf
}
func (slf *DeviceSearch) build() *gorm.DB {
    var db = slf.Orm.Table(slf.TableName())
@@ -75,6 +81,10 @@
        db = db.Where("device_id = ?", slf.DeviceID)
    }
    if len(slf.DeviceIDs) != 0 {
        db = db.Where("device_id in (?)", slf.DeviceIDs)
    }
    if slf.Order != "" {
        db = db.Order(slf.Order)
    }
nsq/consumer.go
@@ -21,8 +21,6 @@
        handler = new(ScheduleTask)
    case fmt.Sprintf(constvar.NsqTopicSendPlcAddress, conf.Conf.NsqConf.NodeId):
        handler = &PlcAddress{Topic: topic}
    case fmt.Sprintf(constvar.NsqTopicProcessParamsResponse, conf.Conf.NsqConf.NodeId):
        handler = &ProcessParams{Topic: topic}
    case fmt.Sprintf(constvar.NsqTopicApsProcessParams, conf.Conf.NsqConf.NodeId):
        handler = &ProcessParamsSync{Topic: topic}
    case fmt.Sprintf(constvar.NsqTopicDeviceUpdate, conf.Conf.NsqConf.NodeId):
nsq/msg_handler.go
@@ -299,7 +299,8 @@
    switch pullDataResponse.DataType {
    case common.PullDataTypeProcessModel:
        err = slf.DealProcessModelData(pullDataResponse.Data)
    case common.PullDataTypeDevice:
        err = slf.DealDeviceData(pullDataResponse.Data)
    }
    if err != nil {
        logx.Infof("process pull data  err :%s", err)
@@ -346,3 +347,36 @@
    }
    return nil
}
func (slf *PullDataResponse) DealDeviceData(data interface{}) error {
    var devices []*model.Device
    err := mapstructure.Decode(data, &devices)
    if err != nil {
        return err
    }
    numbers := make([]string, 0, len(devices))
    for _, item := range devices {
        numbers = append(numbers, item.DeviceID)
    }
    existsDevices, err := model.NewDeviceSearch().SetDeviceIds(numbers).FindNotTotal()
    if err != nil {
        return err
    }
    existsDeviceMap := make(map[string]*model.Device, len(existsDevices))
    for _, device := range existsDevices {
        existsDeviceMap[device.DeviceID] = device
    }
    for _, device := range devices {
        if oldDevice, exists := existsDeviceMap[device.DeviceID]; exists {
            if oldDevice.ExtChannelAmount != device.ExtChannelAmount || //todo to be continued
                oldDevice.Procedures != device.Procedures {
                err = model.NewDeviceSearch().SetDeviceId(device.DeviceID).Save(device)
            }
        } else {
            err = model.NewDeviceSearch().Create(device)
        }
    }
    return err
}
nsq/nsq.go
@@ -34,7 +34,6 @@
    var topics = []string{
        constvar.NsqTopicScheduleTask,
        constvar.NsqTopicSendPlcAddress,
        constvar.NsqTopicProcessParamsResponse,
        constvar.NsqTopicApsProcessParams,
        constvar.NsqTopicDeviceUpdate,
        constvar.NsqTopicPullDataResponse,
@@ -73,7 +72,9 @@
        if consumer, ok := value.(*nsqclient.NsqConsumer); ok {
            nsqclient.DestroyNsqConsumer(consumer)
            logx.Infof("try stop consumer, topic : %v", key)
            consumer = nil
        }
        return true
    })
}