From 5a9e2e97e78a05209a57a75a75678d67c32c58d5 Mon Sep 17 00:00:00 2001 From: zhangqian <zhangqian@123.com> Date: 星期六, 21 十月 2023 11:25:20 +0800 Subject: [PATCH] debug over --- nsq/msg_handler.go | 180 ++++++++++++++++++++++++++++++++++++++++++++++++++++++------ 1 files changed, 162 insertions(+), 18 deletions(-) diff --git a/nsq/msg_handler.go b/nsq/msg_handler.go index 7b62157..335c143 100644 --- a/nsq/msg_handler.go +++ b/nsq/msg_handler.go @@ -11,9 +11,9 @@ "apsClient/utils/file" "encoding/json" "fmt" + "github.com/jinzhu/gorm" "github.com/mitchellh/mapstructure" "github.com/spf13/cast" - "gorm.io/gorm" "strings" ) @@ -79,6 +79,7 @@ } err = model.WithTransaction(func(db *gorm.DB) error { + var err error if oldWorkOrder != nil && oldWorkOrder.WorkOrderID != "" { if err = model.NewOrderSearch(db).SetId(oldWorkOrder.ID).Updates(&orderRecord); err != nil { return err @@ -93,12 +94,14 @@ if err = model.NewOrderSearch(db).Create(&orderRecord); err != nil { return err } - return model.NewProceduresSearch(db).CreateBatch(procedureRecords) + if err = model.NewProceduresSearch(db).CreateBatch(procedureRecords); err != nil { + return err + } } return nil }) if err != nil { - logx.Errorf(" save task message error err: %v", err.Error()) + logx.Errorf(" save task message error err: %v", err) return err } } @@ -170,8 +173,8 @@ } detail, err := json.Marshal(record.Details) record.Detail = string(detail) - record.Id = 1 - err = model.NewDevicePlcSearch().SetId(record.Id).Save(&record) + record.ID = 1 + err = model.NewDevicePlcSearch().SetId(record.ID).Save(&record) if err != nil { return err } @@ -213,13 +216,28 @@ func (slf *ProcessParamsSync) HandleMessage(data []byte) (err error) { logx.Infof("get an process params sync message :%s", data) - var processModel model.ProcessModel - err = json.Unmarshal(data, &processModel) + var processModelMsg common.ResponseProcessParams + err = json.Unmarshal(data, &processModelMsg) if err != nil { logx.Infof("unmarshal process params sync err :%s", err) return err } - err = model.NewProcessModelSearch().Save(&processModel) + processModel := model.ProcessModel{ + Number: processModelMsg.Number, + Product: processModelMsg.Product, + Procedure: processModelMsg.Procedure, + Params: processModelMsg.Params, + IsNew: true, + } + err = model.WithTransaction(func(db *gorm.DB) error { + err = model.NewProcessModelSearch().SetOrm(db).SetProduct(processModel.Product).SetProcedure(processModel.Procedure). + UpdateByMap(map[string]interface{}{"is_new": 0}) + if err != nil { + return err + } + return model.NewProcessModelSearch().SetOrm(db).Create(&processModel) + }) + if err != nil { logx.Infof("save process params sync err :%s", err) return err @@ -233,30 +251,29 @@ func (slf *DeviceUpdate) HandleMessage(data []byte) (err error) { logx.Infof("get a device update message :%s", data) - var device common.Device - err = json.Unmarshal(data, &device) + var msg common.DeviceMsg + err = json.Unmarshal(data, &msg) if err != nil { logx.Infof("unmarshal device update msg err :%s", err) return err } - procedures := make([]string, 0, len(device.DeviceProcedureAttr)) - for _, attr := range device.DeviceProcedureAttr { + procedures := make([]string, 0, len(msg.DeviceProcedureAttr)) + for _, attr := range msg.DeviceProcedureAttr { procedures = append(procedures, attr.ProcedureName) } deviceRecord := &model.Device{ - DeviceID: device.ID, + DeviceID: msg.ID, Procedures: strings.Join(procedures, ","), - ExtChannelAmount: device.ExtChannelAmount, + ExtChannelAmount: msg.ExtChannelAmount, } - oldRecord, err := model.NewDeviceSearch().SetDeviceId(device.ID).First() + oldRecord, err := model.NewDeviceSearch().SetDeviceId(msg.ID).First() if err == gorm.ErrRecordNotFound { err = model.NewDeviceSearch().Create(deviceRecord) - } else { - deviceRecord.ID = oldRecord.ID - err = model.NewDeviceSearch().Save(deviceRecord) + } else if oldRecord.ExtChannelAmount != deviceRecord.ExtChannelAmount || oldRecord.Procedures != deviceRecord.Procedures { + err = model.NewDeviceSearch().SetDeviceId(msg.ID).Save(deviceRecord) } if err != nil { @@ -264,5 +281,132 @@ return err } + if msg.PlcConfig == nil || msg.PlcConfig.Method == "" { + return nil + } + + msg.PlcConfig.Method.ConvertToEnglish() + var record model.DevicePlc + err = mapstructure.Decode(msg.PlcConfig, &record) + if err != nil { + logx.Infof("decode from PlcAddressList to DevicePlc Details err:%v", err) + } + err = mapstructure.Decode(msg.PlcAddressList, &record.Details) + if err != nil { + logx.Infof("decode from PlcAddressList to DevicePlc Details err:%v", err) + } + detail, err := json.Marshal(record.Details) + record.Detail = string(detail) + record.DeviceID = msg.ID + oldPlcRecord, err := model.NewDevicePlcSearch().SetDeviceId(msg.ID).First() + if err == gorm.ErrRecordNotFound { + err = model.NewDevicePlcSearch().Create(&record) + } else if record.Port != oldPlcRecord.Port || + record.Method != oldPlcRecord.Method || + record.Address != oldPlcRecord.Address || + record.Detail != oldPlcRecord.Detail { + err = model.NewDevicePlcSearch().SetDeviceId(msg.ID).Save(&record) + } + if err != nil { + return err + } + return nil } + +type PullDataResponse struct { + Topic string +} + +func (slf *PullDataResponse) HandleMessage(data []byte) (err error) { + logx.Infof("get a pull data response message :%s", data) + var pullDataResponse common.MsgPullDataResponse + err = json.Unmarshal(data, &pullDataResponse) + if err != nil { + logx.Infof("unmarshal msg err :%s", err) + return err + } + switch pullDataResponse.DataType { + case common.PullDataTypeProcessModel: + err = slf.DealProcessModelData(pullDataResponse.Data) + case common.PullDataTypeDevice: + err = slf.DealDeviceData(pullDataResponse.Data) + } + if err != nil { + logx.Infof("process pull data err :%s", err) + return err + } + return nil +} + +func (slf *PullDataResponse) DealProcessModelData(data interface{}) error { + var processModels []*model.ProcessModel + err := mapstructure.Decode(data, &processModels) + if err != nil { + return err + } + numbers := make([]string, 0, len(processModels)) + for _, processModel := range processModels { + numbers = append(numbers, processModel.Number) + } + existsProcessModels, err := model.NewProcessModelSearch().SetIsNew(true).SetNumbers(numbers).FindNotTotal() + if err != nil { + return err + } + + existsProcessModelsMap := make(map[string]struct{}, len(existsProcessModels)) + for _, processModel := range existsProcessModels { + existsProcessModelsMap[processModel.Number] = struct{}{} + } + + for _, processModel := range processModels { + if _, exists := existsProcessModelsMap[processModel.Number]; exists { + continue + } + err = model.WithTransaction(func(db *gorm.DB) error { + err = model.NewProcessModelSearch().SetOrm(db).SetProcedure(processModel.Procedure).SetProduct(processModel.Product).SetIsNew(true).UpdateByMap(map[string]interface{}{"is_new": 0}) + if err != nil { + return err + } + processModel.IsNew = true + return model.NewProcessModelSearch().SetOrm(db).Create(processModel) + }) + if err != nil { + return err + } + } + return nil +} + +func (slf *PullDataResponse) DealDeviceData(data interface{}) error { + var devices []*model.Device + err := mapstructure.Decode(data, &devices) + if err != nil { + return err + } + numbers := make([]string, 0, len(devices)) + for _, item := range devices { + numbers = append(numbers, item.DeviceID) + } + existsDevices, err := model.NewDeviceSearch().SetDeviceIds(numbers).FindNotTotal() + if err != nil { + return err + } + + existsDeviceMap := make(map[string]*model.Device, len(existsDevices)) + for _, device := range existsDevices { + existsDeviceMap[device.DeviceID] = device + } + + for _, device := range devices { + if oldDevice, exists := existsDeviceMap[device.DeviceID]; exists { + if oldDevice.ExtChannelAmount != device.ExtChannelAmount || //todo to be continued + oldDevice.Procedures != device.Procedures { + err = model.NewDeviceSearch().SetDeviceId(device.DeviceID).Save(device) + } + } else { + err = model.NewDeviceSearch().Create(device) + } + } + return err +} -- Gitblit v1.8.0