zhangqian
2023-10-19 eba4eb850f0ecfb5839395aa125955ceaa2a454f
nsq/msg_handler.go
@@ -11,9 +11,9 @@
   "apsClient/utils/file"
   "encoding/json"
   "fmt"
   "github.com/jinzhu/gorm"
   "github.com/mitchellh/mapstructure"
   "github.com/spf13/cast"
   "gorm.io/gorm"
   "strings"
)
@@ -98,7 +98,7 @@
         return nil
      })
      if err != nil {
         logx.Errorf(" save task message error err: %v", err.Error())
         logx.Errorf(" save task message error err: %v", err)
         return err
      }
   }
@@ -170,8 +170,8 @@
   }
   detail, err := json.Marshal(record.Details)
   record.Detail = string(detail)
   record.Id = 1
   err = model.NewDevicePlcSearch().SetId(record.Id).Save(&record)
   record.ID = 1
   err = model.NewDevicePlcSearch().SetId(record.ID).Save(&record)
   if err != nil {
      return err
   }
@@ -275,3 +275,66 @@
   return nil
}
type PullDataResponse struct {
   Topic string
}
func (slf *PullDataResponse) HandleMessage(data []byte) (err error) {
   logx.Infof("get a pull data response message :%s", data)
   var pullDataResponse common.MsgPullDataResponse
   err = json.Unmarshal(data, &pullDataResponse)
   if err != nil {
      logx.Infof("unmarshal msg err :%s", err)
      return err
   }
   switch pullDataResponse.DataType {
   case common.PullDataTypeProcessModel:
      err = slf.DealProcessModelData(pullDataResponse.Data)
   }
   if err != nil {
      logx.Infof("process pull data  err :%s", err)
      return err
   }
   return nil
}
func (slf *PullDataResponse) DealProcessModelData(data interface{}) error {
   var processModels []*model.ProcessModel
   err := mapstructure.Decode(data, &processModels)
   if err != nil {
      return err
   }
   numbers := make([]string, 0, len(processModels))
   for _, processModel := range processModels {
      numbers = append(numbers, processModel.Number)
   }
   existsProcessModels, err := model.NewProcessModelSearch().SetIsNew(true).SetNumbers(numbers).FindNotTotal()
   if err != nil {
      return err
   }
   existsProcessModelsMap := make(map[string]struct{}, len(existsProcessModels))
   for _, processModel := range existsProcessModels {
      existsProcessModelsMap[processModel.Number] = struct{}{}
   }
   for _, processModel := range processModels {
      if _, exists := existsProcessModelsMap[processModel.Number]; exists {
         continue
      }
      err = model.WithTransaction(func(db *gorm.DB) error {
         err = model.NewProcessModelSearch().SetOrm(db).SetProcedure(processModel.Procedure).SetProduct(processModel.Product).SetIsNew(true).UpdateByMap(map[string]interface{}{"is_new": 0})
         if err != nil {
            return err
         }
         processModel.IsNew = true
         return model.NewProcessModelSearch().SetOrm(db).Create(processModel)
      })
      if err != nil {
         return err
      }
   }
   return nil
}