删除不再使用的 topic,并在初始化时增加设备数据的拉取
| | |
| | | package constvar |
| | | |
| | | const ( |
| | | NsqTopicScheduleTask = "aps.%v.scheduleTask" //排程任务下发 |
| | | NsqTopicGetPlcAddress = "aps.%v.getPlcAddress" |
| | | NsqTopicSendPlcAddress = "aps.%v.sendPlcAddress" |
| | | NsqTopicProcessParamsRequest = "aps.%v.processParams.request" |
| | | NsqTopicProcessParamsResponse = "aps.%v.processParams.response" |
| | | NsqTopicScheduleTask = "aps.%v.scheduleTask" //排程任务下发 |
| | | NsqTopicSendPlcAddress = "aps.%v.sendPlcAddress" //plc address更新 |
| | | NsqTopicApsProcessParams = "aps.%v.aps.processParams" //有了新的工艺模型 |
| | | NsqTopicTaskProcedureStatusUpdate = "aps.%v.task.procedure.status" //工序状态更新 |
| | | NsqTopicSyncTaskProgress = "aps.%v.task.procedure.progress" //工序生产进度 |
| | |
| | | caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicPullDataRequest, conf.Conf.NsqConf.NodeId), constvar.NsqTopicPullDataResponse) |
| | | err := caller.Send(msg) |
| | | if err != nil { |
| | | logx.Errorf("send pull data msg error:%v", err.Error()) |
| | | logx.Errorf("send pull data msg error:%v, msg:%+v", err.Error(), msg) |
| | | } |
| | | caller = nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicGetPlcAddress, conf.Conf.NsqConf.NodeId), "") |
| | | err = caller.Send(common.RequestPlcAddress{DeviceId: conf.Conf.System.DeviceId}) |
| | | msg = &common.MsgPullDataRequest{DataType: common.PullDataTypeDevice} |
| | | caller = nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicPullDataRequest, conf.Conf.NsqConf.NodeId), constvar.NsqTopicPullDataResponse) |
| | | err = caller.Send(msg) |
| | | if err != nil { |
| | | logx.Infof("get plc address err: %v", err.Error()) |
| | | logx.Errorf("send pull data msg error:%v, msg:%+v", err.Error(), msg) |
| | | } |
| | | } |
| | |
| | | |
// Values for the DataType field of a pull-data request/response; they tell
// the receiver which kind of payload is being exchanged.
const (
	// PullDataTypeProcessModel marks a payload carrying process model data.
	PullDataTypeProcessModel = "process_model"
	// PullDataTypeDevice marks a payload carrying device data.
	PullDataTypeDevice = "device"
)
| | | |
| | | // MsgPullDataRequest 拉取云端数据 |
| | |
| | | |
| | | DeviceSearch struct { |
| | | Device |
| | | Order string |
| | | PageNum int |
| | | PageSize int |
| | | Orm *gorm.DB |
| | | Order string |
| | | PageNum int |
| | | PageSize int |
| | | Orm *gorm.DB |
| | | DeviceIDs []string |
| | | } |
| | | ) |
| | | |
| | |
| | | return slf |
| | | } |
| | | |
| | | func (slf *DeviceSearch) SetDeviceIds(deviceIds []string) *DeviceSearch { |
| | | slf.DeviceIDs = deviceIds |
| | | return slf |
| | | } |
| | | |
| | | func (slf *DeviceSearch) build() *gorm.DB { |
| | | var db = slf.Orm.Table(slf.TableName()) |
| | | |
| | |
| | | db = db.Where("device_id = ?", slf.DeviceID) |
| | | } |
| | | |
| | | if len(slf.DeviceIDs) != 0 { |
| | | db = db.Where("device_id in (?)", slf.DeviceIDs) |
| | | } |
| | | |
| | | if slf.Order != "" { |
| | | db = db.Order(slf.Order) |
| | | } |
| | |
| | | handler = new(ScheduleTask) |
| | | case fmt.Sprintf(constvar.NsqTopicSendPlcAddress, conf.Conf.NsqConf.NodeId): |
| | | handler = &PlcAddress{Topic: topic} |
| | | case fmt.Sprintf(constvar.NsqTopicProcessParamsResponse, conf.Conf.NsqConf.NodeId): |
| | | handler = &ProcessParams{Topic: topic} |
| | | case fmt.Sprintf(constvar.NsqTopicApsProcessParams, conf.Conf.NsqConf.NodeId): |
| | | handler = &ProcessParamsSync{Topic: topic} |
| | | case fmt.Sprintf(constvar.NsqTopicDeviceUpdate, conf.Conf.NsqConf.NodeId): |
| | |
| | | switch pullDataResponse.DataType { |
| | | case common.PullDataTypeProcessModel: |
| | | err = slf.DealProcessModelData(pullDataResponse.Data) |
| | | |
| | | case common.PullDataTypeDevice: |
| | | err = slf.DealDeviceData(pullDataResponse.Data) |
| | | } |
| | | if err != nil { |
| | | logx.Infof("process pull data err :%s", err) |
| | |
| | | } |
| | | return nil |
| | | } |
| | | |
| | | func (slf *PullDataResponse) DealDeviceData(data interface{}) error { |
| | | var devices []*model.Device |
| | | err := mapstructure.Decode(data, &devices) |
| | | if err != nil { |
| | | return err |
| | | } |
| | | numbers := make([]string, 0, len(devices)) |
| | | for _, item := range devices { |
| | | numbers = append(numbers, item.DeviceID) |
| | | } |
| | | existsDevices, err := model.NewDeviceSearch().SetDeviceIds(numbers).FindNotTotal() |
| | | if err != nil { |
| | | return err |
| | | } |
| | | |
| | | existsDeviceMap := make(map[string]*model.Device, len(existsDevices)) |
| | | for _, device := range existsDevices { |
| | | existsDeviceMap[device.DeviceID] = device |
| | | } |
| | | |
| | | for _, device := range devices { |
| | | if oldDevice, exists := existsDeviceMap[device.DeviceID]; exists { |
| | | if oldDevice.ExtChannelAmount != device.ExtChannelAmount || //todo to be continued |
| | | oldDevice.Procedures != device.Procedures { |
| | | err = model.NewDeviceSearch().SetDeviceId(device.DeviceID).Save(device) |
| | | } |
| | | } else { |
| | | err = model.NewDeviceSearch().Create(device) |
| | | } |
| | | } |
| | | return err |
| | | } |
| | |
| | | var topics = []string{ |
| | | constvar.NsqTopicScheduleTask, |
| | | constvar.NsqTopicSendPlcAddress, |
| | | constvar.NsqTopicProcessParamsResponse, |
| | | constvar.NsqTopicApsProcessParams, |
| | | constvar.NsqTopicDeviceUpdate, |
| | | constvar.NsqTopicPullDataResponse, |
| | |
| | | if consumer, ok := value.(*nsqclient.NsqConsumer); ok { |
| | | nsqclient.DestroyNsqConsumer(consumer) |
| | | logx.Infof("try stop consumer, topic : %v", key) |
| | | consumer = nil |
| | | } |
| | | |
| | | return true |
| | | }) |
| | | } |