zhangqian
2023-11-03 f008bf77342a93c13d1e42399dd175c9c941d3ef
nsq/msg_handler.go
@@ -345,6 +345,8 @@
      err = slf.DealProcessModelData(pullDataResponse.Data)
   case common.PullDataTypeDevice:
      err = slf.DealDeviceData(pullDataResponse.Data)
   case common.PullDataTypeProcessModelPlcAddress:
      err = slf.DealProcessModelPlcAddressData(pullDataResponse.Data)
   }
   if err != nil {
      logx.Infof("process pull data  err :%s", err)
@@ -393,5 +395,38 @@
}
func (slf *PullDataResponse) DealDeviceData(data interface{}) error {
   //已在别的topic处理
   return nil
}
func (slf *PullDataResponse) DealProcessModelPlcAddressData(data interface{}) error {
   var addressList []*model.ProcessModelPlcAddress
   err := mapstructure.Decode(data, &addressList)
   if err != nil {
      return err
   }
   deviceIDs := make([]string, 0, len(addressList))
   for _, item := range addressList {
      deviceIDs = append(deviceIDs, item.DeviceID)
   }
   existsRecords, err := model.NewProcessModelPlcAddressSearch().SetDeviceIDs(deviceIDs).FindNotTotal()
   if err != nil {
      return err
   }
   existsRecordsMap := make(map[string]*model.ProcessModelPlcAddress, len(existsRecords))
   for _, item := range existsRecords {
      existsRecordsMap[item.DeviceID] = item
   }
   for _, item := range addressList {
      if v, exists := existsRecordsMap[item.DeviceID]; !exists {
         item.ID = 0
         err = model.NewProcessModelPlcAddressSearch().Create(item)
      } else if v.Address != item.Address {
         v.Address = item.Address
         err = model.NewProcessModelPlcAddressSearch().Save(v)
      }
   }
   return nil
}