// Package nsq contains the handlers for messages received by this client.
package nsq

import (
	"encoding/json"
	"errors"
	"fmt"
	"strings"

	"apsClient/conf"
	"apsClient/constvar"
	"apsClient/model"
	"apsClient/model/common"
	"apsClient/pkg/logx"
	"apsClient/pkg/structx"
	"apsClient/service/plc_address"
	"apsClient/utils/file"

	"github.com/spf13/cast"
	"gorm.io/gorm"
)

// ReceivedMessage pairs the raw payload of a handled message with the topic
// of the handler that processed it.
type ReceivedMessage struct {
	Topic   string
	Message []byte
}

// ReceivedMessageChan carries messages that handlers have finished processing.
// It is buffered (1000 entries); a send blocks once the buffer is full.
var ReceivedMessageChan = make(chan *ReceivedMessage, 1000)

// MsgHandler is implemented by every message handler in this package.
type MsgHandler interface {
	HandleMessage(data []byte) (err error)
}

// ScheduleTask handles delivered schedule tasks.
type ScheduleTask struct {
}

// HandleMessage decodes data as a JSON array of schedule tasks and stores, for
// every procedure whose DeviceID matches the configured device id, the work
// order (if it is not stored yet) and the procedure itself. Each procedure is
// saved in its own transaction.
//
// It returns an error only when data cannot be decoded. A failure to save an
// individual procedure is logged and does not stop the remaining procedures.
func (slf *ScheduleTask) HandleMessage(data []byte) (err error) {
	logx.Infof("get an message :%s", data)

	var tasks = make([]*common.DeliverScheduleTask, 0)
	if err = json.Unmarshal(data, &tasks); err != nil {
		logx.Errorf("ScheduleTask HandleMessage Unmarshal json err: %v", err.Error())
		return err
	}

	for _, task := range tasks {
		// A JSON null element decodes to a nil pointer; skip it instead of panicking.
		if task == nil {
			continue
		}
		for _, procedure := range task.Procedures {
			// Only store procedures that belong to this device.
			if procedure.DeviceID != conf.Conf.System.DeviceId {
				continue
			}

			txErr := model.WithTransaction(func(db *gorm.DB) error {
				_, err := model.NewOrderSearch(db).SetWorkOrderId(task.WorkOrder.WorkOrderID).First()
				switch {
				case err == nil:
					// The work order is already stored; only the procedure is added.
				case errors.Is(err, gorm.ErrRecordNotFound):
					var orderRecord model.Order
					if err = structx.AssignTo(task.WorkOrder, &orderRecord); err != nil {
						logx.Errorf(" structx.Assign task.Order to orderRecord err: %v", err.Error())
						return err
					}
					orderRecord.Status = model.OrderStatusWaitProcess
					if err = model.NewOrderSearch(db).Create(&orderRecord); err != nil {
						return err
					}
				default:
					// Any other lookup failure aborts the transaction so that no
					// procedure is stored without knowing whether its order exists.
					return err
				}

				procedureData, err := json.Marshal(procedure)
				if err != nil {
					logx.Errorf(" json.Marshal(procedure) err: %v", err.Error())
					return err
				}

				procedureRecord := model.Procedures{
					StartTime:     procedure.StartTime,
					EndTime:       procedure.EndTime,
					WorkOrderID:   task.WorkOrder.WorkOrderID,
					OrderID:       task.WorkOrder.OrderID,
					Status:        model.ProcedureStatusWaitProcess,
					ProcedureData: string(procedureData),
				}
				return model.NewProceduresSearch(db).Create(&procedureRecord)
			})
			if txErr != nil {
				logx.Errorf(" save procedure err: %v", txErr.Error())
			}
		}
	}
	return nil
}

// PlcAddress handles PLC address messages. Topic is attached to the
// ReceivedMessage published after a message has been handled.
type PlcAddress struct {
	Topic string
}

// HandleMessage decodes data as a common.ResponsePlcAddress, writes its key and
// address data to the PLC address data files, loads the key/address pairs into
// plc_address, and finally publishes the message on ReceivedMessageChan.
//
// Key and address data are split on "\n" and paired by line index; "\r" is
// stripped from every line. When the two line counts differ the mismatch is
// logged and nil is returned without loading anything or publishing.
//
// It returns an error when data cannot be decoded or a file cannot be written.
func (slf *PlcAddress) HandleMessage(data []byte) (err error) {
	logx.Infof("get an PlcAddress message :%s", data)

	// Decode into a value: decoding JSON null through a pointer-to-pointer would
	// leave a nil pointer behind and panic on the first field access.
	var resp common.ResponsePlcAddress
	if err = json.Unmarshal(data, &resp); err != nil {
		logx.Errorf("PlcAddress HandleMessage Unmarshal json err: %v", err.Error())
		return err
	}

	// Write to file.
	keyFile := fmt.Sprintf("%s%s", constvar.PlcAddressDataPath, constvar.PlcAddressDataKeyFileName)
	if err = file.WriteFile(keyFile, resp.KeyData); err != nil {
		return err
	}
	valueFile := fmt.Sprintf("%s%s", constvar.PlcAddressDataPath, constvar.PlcAddressDataValueFileName)
	if err = file.WriteFile(valueFile, resp.AddressData); err != nil {
		return err
	}

	// Write to memory.
	keys := strings.Split(string(resp.KeyData), "\n")
	addresses := strings.Split(string(resp.AddressData), "\n")
	if len(keys) != len(addresses) {
		logx.Error("plc address message error: key length not equal address length")
		return nil
	}
	for i := range keys {
		key := strings.ReplaceAll(keys[i], "\r", "")
		address := cast.ToInt(strings.ReplaceAll(addresses[i], "\r", ""))
		plc_address.Set(key, address)
		logx.Infof("plc address set ok: key:%v, address:%v", key, address)
	}

	// Publish the handled message (original note: notify that it was received).
	ReceivedMessageChan <- &ReceivedMessage{
		Topic:   slf.Topic,
		Message: data,
	}
	return nil
}

// ProcessParams handles process parameter messages. Topic is attached to the
// ReceivedMessage published for every handled message.
type ProcessParams struct {
	Topic string
}

// HandleMessage logs data and publishes it, unparsed, on ReceivedMessageChan.
// It always returns nil.
func (slf *ProcessParams) HandleMessage(data []byte) (err error) {
	logx.Infof("get an process params message :%s", data)

	// Publish the handled message (original note: notify that it was received).
	ReceivedMessageChan <- &ReceivedMessage{
		Topic:   slf.Topic,
		Message: data,
	}
	return nil
}

// ProcessParamsSync handles process parameter sync messages.
type ProcessParamsSync struct {
	Topic string
}

// HandleMessage decodes data as a model.ProcessModel and stores it.
// It returns the decode or store error, if any.
func (slf *ProcessParamsSync) HandleMessage(data []byte) (err error) {
	logx.Infof("get an process params sync message :%s", data)

	var processModel model.ProcessModel
	if err = json.Unmarshal(data, &processModel); err != nil {
		logx.Errorf("unmarshal process params sync err :%s", err)
		return err
	}

	if err = model.NewProcessModelSearch().Create(&processModel); err != nil {
		logx.Errorf("save process params sync err :%s", err)
		return err
	}
	return nil
}