package nsq import ( "apsClient/conf" "apsClient/constvar" "apsClient/model" "apsClient/model/common" "apsClient/pkg/logx" "apsClient/pkg/structx" "apsClient/service/plc_address" "apsClient/utils/file" "encoding/json" "fmt" "github.com/jinzhu/gorm" "github.com/mitchellh/mapstructure" "github.com/spf13/cast" "strings" ) type ReceivedMessage struct { Topic string Message []byte } var ReceivedMessageChan chan *ReceivedMessage func init() { ReceivedMessageChan = make(chan *ReceivedMessage, 1000) } type MsgHandler interface { HandleMessage(data []byte) (err error) } type ScheduleTask struct { } func (slf *ScheduleTask) HandleMessage(data []byte) (err error) { logx.Infof("get an message :%s", data) var tasks = make([]*common.DeliverScheduleTask, 0) err = json.Unmarshal(data, &tasks) if err != nil { logx.Errorf("ScheduleTask HandleMessage Unmarshal json err: %v", err.Error()) return err } for _, task := range tasks { oldWorkOrder, err := model.NewOrderSearch(nil).SetWorkOrderId(task.WorkOrder.WorkOrderID).First() if oldWorkOrder != nil && oldWorkOrder.WorkOrderID != "" && oldWorkOrder.Status != model.OrderStatusWaitProcess { //已开始的工序不可以修改 continue } procedureRecords := make([]*model.Procedures, 0, len(task.Procedures)) for _, procedure := range task.Procedures { procedureRecord := model.Procedures{ ProductProcedureID: procedure.ProductProcedureID, WorkOrderID: task.WorkOrder.WorkOrderID, OrderID: task.WorkOrder.OrderID, DeviceID: procedure.DeviceID, ProcedureID: procedure.ProcedureID, StartTime: procedure.StartTime, EndTime: procedure.EndTime, Status: model.ProcedureStatusWaitProcess, Channel: procedure.Channel, ProceduresInfo: common.ProductProcedure{}, } procedureData, err := json.Marshal(procedure) if err != nil { return err } procedureRecord.ProcedureData = string(procedureData) procedureRecords = append(procedureRecords, &procedureRecord) } var orderRecord model.Order err = structx.AssignTo(task.WorkOrder, &orderRecord) orderRecord.Status = model.OrderStatusWaitProcess if err != nil { logx.Errorf(" structx.Assign task.Order to orderRecord err: %v", err.Error()) return err } err = model.WithTransaction(func(db *gorm.DB) error { var err error if oldWorkOrder != nil && oldWorkOrder.WorkOrderID != "" { if err = model.NewOrderSearch(db).SetId(oldWorkOrder.ID).Updates(&orderRecord); err != nil { return err } for _, procedure := range procedureRecords { err = model.NewProceduresSearch(db).SetWorkOrderId(procedure.WorkOrderID).SetProcedureId(procedure.ProcedureID).Upsert(procedure) if err != nil { return err } } } else { if err = model.NewOrderSearch(db).Create(&orderRecord); err != nil { return err } if err = model.NewProceduresSearch(db).CreateBatch(procedureRecords); err != nil { return err } } return nil }) if err != nil { logx.Errorf(" save task message error err: %v", err) return err } } return nil } type PlcAddress struct { Topic string } func (slf *PlcAddress) HandleMessage(data []byte) (err error) { logx.Infof("get an PlcAddress message :%s", data) var resp = new(common.ResponsePlcAddress) err = json.Unmarshal(data, &resp) if err != nil { logx.Errorf("ScheduleTask HandleMessage Unmarshal json err: %v", err.Error()) return nil } if resp.DeviceId != conf.Conf.System.DeviceId { return nil } //通知回复收到 ReceivedMessageChan <- &ReceivedMessage{ Topic: slf.Topic, Message: data, } if len(resp.KeyData) != 0 && len(resp.AddressData) != 0 { //写入到文件 err = file.WriteFile(fmt.Sprintf("%s%s", constvar.PlcAddressDataPath, constvar.PlcAddressDataKeyFileName), resp.KeyData) if err != nil { return err } err = file.WriteFile(fmt.Sprintf("%s%s", constvar.PlcAddressDataPath, constvar.PlcAddressDataValueFileName), resp.AddressData) if err != nil { return err } //写入到内存 keyString := string(resp.KeyData) addressString := string(resp.AddressData) keys := strings.Split(keyString, "\n") addresses := strings.Split(addressString, "\n") if len(keys) != len(addresses) { logx.Error("plc address message error: key length not equal address length") return nil } for i := 0; i < len(keys); i++ { key := strings.ReplaceAll(keys[i], "\r", "") address := cast.ToInt(strings.ReplaceAll(addresses[i], "\r", "")) plc_address.Set(key, address) logx.Infof("plc address set ok: key:%v, address:%v", key, address) } } if resp.PlcConfig.Method == "" { return nil } resp.PlcConfig.Method.ConvertToEnglish() var record model.DevicePlc err = mapstructure.Decode(resp.PlcConfig, &record) if err != nil { logx.Infof("decode from PlcAddressList to DevicePlc Details err:%v", err) } err = mapstructure.Decode(resp.PlcAddressList, &record.Details) if err != nil { logx.Infof("decode from PlcAddressList to DevicePlc Details err:%v", err) } detail, err := json.Marshal(record.Details) record.Detail = string(detail) record.ID = 1 err = model.NewDevicePlcSearch().SetId(record.ID).Save(&record) if err != nil { return err } return nil } type ProcessParams struct { Topic string } func (slf *ProcessParams) HandleMessage(data []byte) (err error) { logx.Infof("get an process params message :%s", data) var processModel model.ProcessModel err = json.Unmarshal(data, &processModel) if err != nil { logx.Infof("unmarshal process params sync err :%s", err) return err } if processModel.DeviceId != conf.Conf.System.DeviceId { //不是发给本设备的消息 return nil } if processModel.Params != "" { err = model.NewProcessModelSearch().Create(&processModel) if err != nil { logx.Infof("save process params err :%s", err) } } //通知回复收到 ReceivedMessageChan <- &ReceivedMessage{ Topic: slf.Topic, Message: data, } return nil } type ProcessParamsSync struct { Topic string } func (slf *ProcessParamsSync) HandleMessage(data []byte) (err error) { logx.Infof("get an process params sync message :%s", data) var processModelMsg common.ResponseProcessParams err = json.Unmarshal(data, &processModelMsg) if err != nil { logx.Infof("unmarshal process params sync err :%s", err) return err } processModel := model.ProcessModel{ Number: processModelMsg.Number, Product: processModelMsg.Product, Procedure: processModelMsg.Procedure, Params: processModelMsg.Params, IsNew: true, } err = model.WithTransaction(func(db *gorm.DB) error { err = model.NewProcessModelSearch().SetOrm(db).SetProduct(processModel.Product).SetProcedure(processModel.Procedure). UpdateByMap(map[string]interface{}{"is_new": 0}) if err != nil { return err } return model.NewProcessModelSearch().SetOrm(db).Create(&processModel) }) if err != nil { logx.Infof("save process params sync err :%s", err) return err } return nil } type DeviceUpdate struct { Topic string } func (slf *DeviceUpdate) HandleMessage(data []byte) (err error) { logx.Infof("get a device update message :%s", data) var msg common.DeviceMsg err = json.Unmarshal(data, &msg) if err != nil { logx.Infof("unmarshal device update msg err :%s", err) return err } procedures := make([]string, 0, len(msg.DeviceProcedureAttr)) for _, attr := range msg.DeviceProcedureAttr { procedures = append(procedures, attr.ProcedureName) } deviceRecord := &model.Device{ DeviceID: msg.ID, Procedures: strings.Join(procedures, ","), ExtChannelAmount: msg.ExtChannelAmount, DeviceMac: msg.DeviceMac, DeviceName: msg.DeviceName, } oldRecord, err := model.NewDeviceSearch().SetDeviceId(msg.ID).First() if err == gorm.ErrRecordNotFound { err = model.NewDeviceSearch().Create(deviceRecord) } else if oldRecord.ExtChannelAmount != deviceRecord.ExtChannelAmount || oldRecord.Procedures != deviceRecord.Procedures || oldRecord.DeviceMac != deviceRecord.DeviceMac || oldRecord.DeviceName != deviceRecord.DeviceName { err = model.NewDeviceSearch().SetDeviceId(msg.ID).Save(deviceRecord) } if err != nil { logx.Infof("save device record err :%s", err) return err } if msg.PlcConfig == nil || msg.PlcConfig.Method == "" { return nil } msg.PlcConfig.Method.ConvertToEnglish() var record model.DevicePlc err = mapstructure.Decode(msg.PlcConfig, &record) if err != nil { logx.Infof("decode from PlcAddressList to DevicePlc Details err:%v", err) } detail, err := json.Marshal(record.Details) record.Detail = string(detail) record.DeviceID = msg.ID oldPlcRecord, err := model.NewDevicePlcSearch().SetDeviceId(msg.ID).First() if err == gorm.ErrRecordNotFound { err = model.NewDevicePlcSearch().Create(&record) } else if record.Port != oldPlcRecord.Port || record.Method != oldPlcRecord.Method || record.Address != oldPlcRecord.Address || record.Detail != oldPlcRecord.Detail || record.BaudRate != oldPlcRecord.BaudRate || record.SerialName != oldPlcRecord.SerialName || record.DataBit != oldPlcRecord.DataBit || record.StopBit != oldPlcRecord.StopBit || record.Parity != oldPlcRecord.Parity { oldPlcRecord.Method = record.Method oldPlcRecord.Address = record.Address oldPlcRecord.Port = record.Port oldPlcRecord.Detail = record.Detail oldPlcRecord.BaudRate = record.BaudRate oldPlcRecord.SerialName = record.SerialName oldPlcRecord.DataBit = record.DataBit oldPlcRecord.StopBit = record.StopBit oldPlcRecord.Parity = record.Parity err = model.NewDevicePlcSearch().Save(oldPlcRecord) } if err != nil { return err } return nil } type PullDataResponse struct { Topic string } func (slf *PullDataResponse) HandleMessage(data []byte) (err error) { logx.Infof("get a pull data response message :%s", data) var pullDataResponse common.MsgPullDataResponse err = json.Unmarshal(data, &pullDataResponse) if err != nil { logx.Infof("unmarshal msg err :%s", err) return err } switch pullDataResponse.DataType { case common.PullDataTypeProcessModel: err = slf.DealProcessModelData(pullDataResponse.Data) case common.PullDataTypeDevice: err = slf.DealDeviceData(pullDataResponse.Data) case common.PullDataTypeProcessModelPlcAddress: err = slf.DealProcessModelPlcAddressData(pullDataResponse.Data) } if err != nil { logx.Infof("process pull data err :%s", err) return err } return nil } func (slf *PullDataResponse) DealProcessModelData(data interface{}) error { var processModels []*model.ProcessModel err := mapstructure.Decode(data, &processModels) if err != nil { return err } numbers := make([]string, 0, len(processModels)) for _, processModel := range processModels { numbers = append(numbers, processModel.Number) } existsProcessModels, err := model.NewProcessModelSearch().SetIsNew(true).SetNumbers(numbers).FindNotTotal() if err != nil { return err } existsProcessModelsMap := make(map[string]struct{}, len(existsProcessModels)) for _, processModel := range existsProcessModels { existsProcessModelsMap[processModel.Number] = struct{}{} } for _, processModel := range processModels { if _, exists := existsProcessModelsMap[processModel.Number]; exists { continue } err = model.WithTransaction(func(db *gorm.DB) error { err = model.NewProcessModelSearch().SetOrm(db).SetProcedure(processModel.Procedure).SetProduct(processModel.Product).SetIsNew(true).UpdateByMap(map[string]interface{}{"is_new": 0}) if err != nil { return err } processModel.IsNew = true return model.NewProcessModelSearch().SetOrm(db).Create(processModel) }) if err != nil { return err } } return nil } func (slf *PullDataResponse) DealDeviceData(data interface{}) error { //已在别的topic处理 return nil } func (slf *PullDataResponse) DealProcessModelPlcAddressData(data interface{}) error { var addressList []*model.ProcessModelPlcAddress err := mapstructure.Decode(data, &addressList) if err != nil { return err } deviceIDs := make([]string, 0, len(addressList)) for _, item := range addressList { deviceIDs = append(deviceIDs, item.DeviceID) } existsRecords, err := model.NewProcessModelPlcAddressSearch().SetDeviceIDs(deviceIDs).FindNotTotal() if err != nil { return err } existsRecordsMap := make(map[string]*model.ProcessModelPlcAddress, len(existsRecords)) for _, item := range existsRecords { existsRecordsMap[item.DeviceID] = item } for _, item := range addressList { if v, exists := existsRecordsMap[item.DeviceID]; !exists { item.ID = 0 err = model.NewProcessModelPlcAddressSearch().Create(item) } else if v.Address != item.Address { v.Address = item.Address err = model.NewProcessModelPlcAddressSearch().Save(v) } } return nil } type Dashboard struct { Topic string } func (slf *Dashboard) HandleMessage(data []byte) (err error) { logx.Infof("dashboard message: %s", data) var dashboard model.Dashboard err = json.Unmarshal(data, &dashboard) if err != nil { logx.Infof("unmarshal msg err :%s", err) return err } old, err := model.NewDashboardSearch(nil).SetVersion(dashboard.Version).First() if err == gorm.ErrRecordNotFound { err = model.NewDashboardSearch(nil).SetVersion(dashboard.Version).Create(&dashboard) } else { err = model.NewDashboardSearch(nil).SetId(old.ID).Updates(&dashboard) } if err != nil { logx.Infof("process pull data err :%s", err) return err } return nil }