From eba4eb850f0ecfb5839395aa125955ceaa2a454f Mon Sep 17 00:00:00 2001
From: zhangqian <zhangqian@123.com>
Date: 星期四, 19 十月 2023 16:39:07 +0800
Subject: [PATCH] Merge branch 'feat-serf'

---
 nsq/msg_handler.go |   71 +++++++++++++++++++++++++++++++++--
 1 files changed, 67 insertions(+), 4 deletions(-)

diff --git a/nsq/msg_handler.go b/nsq/msg_handler.go
index 175f3e7..1db651e 100644
--- a/nsq/msg_handler.go
+++ b/nsq/msg_handler.go
@@ -11,9 +11,9 @@
 	"apsClient/utils/file"
 	"encoding/json"
 	"fmt"
+	"github.com/jinzhu/gorm"
 	"github.com/mitchellh/mapstructure"
 	"github.com/spf13/cast"
-	"gorm.io/gorm"
 	"strings"
 )
 
@@ -98,7 +98,7 @@
 			return nil
 		})
 		if err != nil {
-			logx.Errorf(" save task message error err: %v", err.Error())
+			logx.Errorf(" save task message error err: %v", err)
 			return err
 		}
 	}
@@ -170,8 +170,8 @@
 	}
 	detail, err := json.Marshal(record.Details)
 	record.Detail = string(detail)
-	record.Id = 1
-	err = model.NewDevicePlcSearch().SetId(record.Id).Save(&record)
+	record.ID = 1
+	err = model.NewDevicePlcSearch().SetId(record.ID).Save(&record)
 	if err != nil {
 		return err
 	}
@@ -275,3 +275,66 @@
 
 	return nil
 }
+
+type PullDataResponse struct {
+	Topic string
+}
+
+func (slf *PullDataResponse) HandleMessage(data []byte) (err error) {
+	logx.Infof("get a pull data response message :%s", data)
+	var pullDataResponse common.MsgPullDataResponse
+	err = json.Unmarshal(data, &pullDataResponse)
+	if err != nil {
+		logx.Infof("unmarshal msg err :%s", err)
+		return err
+	}
+	switch pullDataResponse.DataType {
+	case common.PullDataTypeProcessModel:
+		err = slf.DealProcessModelData(pullDataResponse.Data)
+
+	}
+	if err != nil {
+		logx.Infof("process pull data  err :%s", err)
+		return err
+	}
+	return nil
+}
+
+func (slf *PullDataResponse) DealProcessModelData(data interface{}) error {
+	var processModels []*model.ProcessModel
+	err := mapstructure.Decode(data, &processModels)
+	if err != nil {
+		return err
+	}
+	numbers := make([]string, 0, len(processModels))
+	for _, processModel := range processModels {
+		numbers = append(numbers, processModel.Number)
+	}
+	existsProcessModels, err := model.NewProcessModelSearch().SetIsNew(true).SetNumbers(numbers).FindNotTotal()
+	if err != nil {
+		return err
+	}
+
+	existsProcessModelsMap := make(map[string]struct{}, len(existsProcessModels))
+	for _, processModel := range existsProcessModels {
+		existsProcessModelsMap[processModel.Number] = struct{}{}
+	}
+
+	for _, processModel := range processModels {
+		if _, exists := existsProcessModelsMap[processModel.Number]; exists {
+			continue
+		}
+		err = model.WithTransaction(func(db *gorm.DB) error {
+			err = model.NewProcessModelSearch().SetOrm(db).SetProcedure(processModel.Procedure).SetProduct(processModel.Product).SetIsNew(true).UpdateByMap(map[string]interface{}{"is_new": 0})
+			if err != nil {
+				return err
+			}
+			processModel.IsNew = true
+			return model.NewProcessModelSearch().SetOrm(db).Create(processModel)
+		})
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}

--
Gitblit v1.8.0