zhangqian
2023-10-21 30f35de47284269b475c3aa307d06c033b67aa27
精简topic
5个文件已修改
44 ■■■■ 已修改文件
constvar/const.go 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsq/consumer.go 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsq/msg_handler.go 31 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsq/nsq.go 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
constvar/const.go
@@ -2,7 +2,6 @@
const (
    NsqTopicScheduleTask              = "aps.%v.scheduleTask"            //排程任务下发
    NsqTopicSendPlcAddress            = "aps.%v.sendPlcAddress"          //plc address更新
    NsqTopicApsProcessParams          = "aps.%v.aps.processParams"       //有了新的工艺模型
    NsqTopicTaskProcedureStatusUpdate = "aps.%v.task.procedure.status"   //工序状态更新
    NsqTopicSyncTaskProgress          = "aps.%v.task.procedure.progress" //工序生产进度
main.go
@@ -47,11 +47,10 @@
    agent.RegisterClusterEvent(serfClusterEvent)
    go agent.Serve(serfStartChan)
    <-serfStartChan
    //if !<-serfStartChan {
    //    logx.Errorf("serf Init err, exit")
    //    return
    //}
    if !<-serfStartChan {
        logx.Errorf("serf Init err, exit")
        return
    }
    // 判断当前集群状态
    logx.Infof("current agent.ClusterStatus:%v", agent.ClusterStatus)
nsq/consumer.go
@@ -19,8 +19,6 @@
    switch topic {
    case fmt.Sprintf(constvar.NsqTopicScheduleTask, conf.Conf.NsqConf.NodeId):
        handler = new(ScheduleTask)
    case fmt.Sprintf(constvar.NsqTopicSendPlcAddress, conf.Conf.NsqConf.NodeId):
        handler = &PlcAddress{Topic: topic}
    case fmt.Sprintf(constvar.NsqTopicApsProcessParams, conf.Conf.NsqConf.NodeId):
        handler = &ProcessParamsSync{Topic: topic}
    case fmt.Sprintf(constvar.NsqTopicDeviceUpdate, conf.Conf.NsqConf.NodeId):
nsq/msg_handler.go
@@ -384,34 +384,5 @@
}
func (slf *PullDataResponse) DealDeviceData(data interface{}) error {
    var devices []*model.Device
    err := mapstructure.Decode(data, &devices)
    if err != nil {
        return err
    }
    numbers := make([]string, 0, len(devices))
    for _, item := range devices {
        numbers = append(numbers, item.DeviceID)
    }
    existsDevices, err := model.NewDeviceSearch().SetDeviceIds(numbers).FindNotTotal()
    if err != nil {
        return err
    }
    existsDeviceMap := make(map[string]*model.Device, len(existsDevices))
    for _, device := range existsDevices {
        existsDeviceMap[device.DeviceID] = device
    }
    for _, device := range devices {
        if oldDevice, exists := existsDeviceMap[device.DeviceID]; exists {
            if oldDevice.ExtChannelAmount != device.ExtChannelAmount || //todo to be continued
                oldDevice.Procedures != device.Procedures {
                err = model.NewDeviceSearch().SetDeviceId(device.DeviceID).Save(device)
            }
        } else {
            err = model.NewDeviceSearch().Create(device)
        }
    }
    return err
    return nil
}
nsq/nsq.go
@@ -37,7 +37,6 @@
    }
    var topics = []string{
        constvar.NsqTopicScheduleTask,
        constvar.NsqTopicSendPlcAddress,
        constvar.NsqTopicApsProcessParams,
        constvar.NsqTopicDeviceUpdate,
        constvar.NsqTopicPullDataResponse,