package nsq import ( "apsClient/conf" "apsClient/constvar" "apsClient/model" "apsClient/model/common" "apsClient/pkg/logx" "apsClient/pkg/structx" "apsClient/service/plc_address" "apsClient/utils/file" "encoding/json" "fmt" "github.com/spf13/cast" "gorm.io/gorm" "strings" ) type ReceivedMessage struct { Topic string Message []byte } var ReceivedMessageChan chan *ReceivedMessage func init() { ReceivedMessageChan = make(chan *ReceivedMessage, 1000) } type MsgHandler interface { HandleMessage(data []byte) (err error) } type ScheduleTask struct { } func (slf *ScheduleTask) HandleMessage(data []byte) (err error) { logx.Infof("get an message :%s", data) var tasks = make([]*common.DeliverScheduleTask, 0) err = json.Unmarshal(data, &tasks) if err != nil { logx.Errorf("ScheduleTask HandleMessage Unmarshal json err: %v", err.Error()) return err } for _, task := range tasks { oldWorkOrder, err := model.NewOrderSearch(nil).SetWorkOrderId(task.WorkOrder.WorkOrderID).First() if oldWorkOrder != nil && oldWorkOrder.WorkOrderID != "" && oldWorkOrder.Status != model.OrderStatusWaitProcess { //已开始的工序不可以修改 continue } procedureRecords := make([]*model.Procedures, 0, len(task.Procedures)) for _, procedure := range task.Procedures { procedureRecord := model.Procedures{ WorkOrderID: task.WorkOrder.WorkOrderID, OrderID: task.WorkOrder.OrderID, DeviceID: procedure.DeviceID, StartTime: procedure.StartTime, EndTime: procedure.EndTime, Status: model.ProcedureStatusWaitProcess, ProcedureData: "", ProceduresInfo: common.ProductProcedure{}, } procedureData, err := json.Marshal(procedure) if err != nil { return err } procedureRecord.ProcedureData = string(procedureData) procedureRecords = append(procedureRecords, &procedureRecord) } var orderRecord model.Order err = structx.AssignTo(task.WorkOrder, &orderRecord) orderRecord.Status = model.OrderStatusWaitProcess if err != nil { logx.Errorf(" structx.Assign task.Order to orderRecord err: %v", err.Error()) return err } err = model.WithTransaction(func(db *gorm.DB) error { if err = model.NewOrderSearch(db).Save(&orderRecord); err != nil { return err } return model.NewProceduresSearch(db).CreateBatch(procedureRecords) }) if err != nil { logx.Errorf(" save task message error err: %v", err.Error()) return err } } return nil } type PlcAddress struct { Topic string } func (slf *PlcAddress) HandleMessage(data []byte) (err error) { logx.Infof("get an PlcAddress message :%s", data) var resp = new(common.ResponsePlcAddress) err = json.Unmarshal(data, &resp) if err != nil { logx.Errorf("ScheduleTask HandleMessage Unmarshal json err: %v", err.Error()) return nil } if resp.DeviceId != conf.Conf.System.DeviceId { return nil } //通知回复收到 ReceivedMessageChan <- &ReceivedMessage{ Topic: slf.Topic, Message: data, } if len(resp.KeyData) == 0 || len(resp.AddressData) == 0 { return nil } //写入到文件 err = file.WriteFile(fmt.Sprintf("%s%s", constvar.PlcAddressDataPath, constvar.PlcAddressDataKeyFileName), resp.KeyData) if err != nil { return err } err = file.WriteFile(fmt.Sprintf("%s%s", constvar.PlcAddressDataPath, constvar.PlcAddressDataValueFileName), resp.AddressData) if err != nil { return err } //写入到内存 keyString := string(resp.KeyData) addressString := string(resp.AddressData) keys := strings.Split(keyString, "\n") addresses := strings.Split(addressString, "\n") if len(keys) != len(addresses) { logx.Error("plc address message error: key length not equal address length") return nil } for i := 0; i < len(keys); i++ { key := strings.ReplaceAll(keys[i], "\r", "") address := cast.ToInt(strings.ReplaceAll(addresses[i], "\r", "")) plc_address.Set(key, address) logx.Infof("plc address set ok: key:%v, address:%v", key, address) } return nil } type ProcessParams struct { Topic string } func (slf *ProcessParams) HandleMessage(data []byte) (err error) { logx.Infof("get an process params message :%s", data) var processModel model.ProcessModel err = json.Unmarshal(data, &processModel) if err != nil { logx.Infof("unmarshal process params sync err :%s", err) return err } if processModel.DeviceId != conf.Conf.System.DeviceId { //不是发给本设备的消息 return nil } if processModel.Params != "" { err = model.NewProcessModelSearch().Create(&processModel) if err != nil { logx.Infof("save process params err :%s", err) } } //通知回复收到 ReceivedMessageChan <- &ReceivedMessage{ Topic: slf.Topic, Message: data, } return nil } type ProcessParamsSync struct { Topic string } func (slf *ProcessParamsSync) HandleMessage(data []byte) (err error) { logx.Infof("get an process params sync message :%s", data) var processModel model.ProcessModel err = json.Unmarshal(data, &processModel) if err != nil { logx.Infof("unmarshal process params sync err :%s", err) return err } err = model.NewProcessModelSearch().Save(&processModel) if err != nil { logx.Infof("save process params sync err :%s", err) return err } return nil }