From 9b1536674ac1814f6416273a6e9129925b984332 Mon Sep 17 00:00:00 2001
From: zhangqian <zhangqian@123.com>
Date: 星期五, 20 十月 2023 11:03:30 +0800
Subject: [PATCH] 加日志
---
nsq/msg_handler.go | 63 +++++++++++++++++++++++++++++++
1 files changed, 63 insertions(+), 0 deletions(-)
diff --git a/nsq/msg_handler.go b/nsq/msg_handler.go
index a01d9ea..1db651e 100644
--- a/nsq/msg_handler.go
+++ b/nsq/msg_handler.go
@@ -275,3 +275,66 @@
return nil
}
+
+type PullDataResponse struct {
+ Topic string
+}
+
+func (slf *PullDataResponse) HandleMessage(data []byte) (err error) {
+ logx.Infof("get a pull data response message :%s", data)
+ var pullDataResponse common.MsgPullDataResponse
+ err = json.Unmarshal(data, &pullDataResponse)
+ if err != nil {
+ logx.Infof("unmarshal msg err :%s", err)
+ return err
+ }
+ switch pullDataResponse.DataType {
+ case common.PullDataTypeProcessModel:
+ err = slf.DealProcessModelData(pullDataResponse.Data)
+
+ }
+ if err != nil {
+ logx.Infof("process pull data err :%s", err)
+ return err
+ }
+ return nil
+}
+
+func (slf *PullDataResponse) DealProcessModelData(data interface{}) error {
+ var processModels []*model.ProcessModel
+ err := mapstructure.Decode(data, &processModels)
+ if err != nil {
+ return err
+ }
+ numbers := make([]string, 0, len(processModels))
+ for _, processModel := range processModels {
+ numbers = append(numbers, processModel.Number)
+ }
+ existsProcessModels, err := model.NewProcessModelSearch().SetIsNew(true).SetNumbers(numbers).FindNotTotal()
+ if err != nil {
+ return err
+ }
+
+ existsProcessModelsMap := make(map[string]struct{}, len(existsProcessModels))
+ for _, processModel := range existsProcessModels {
+ existsProcessModelsMap[processModel.Number] = struct{}{}
+ }
+
+ for _, processModel := range processModels {
+ if _, exists := existsProcessModelsMap[processModel.Number]; exists {
+ continue
+ }
+ err = model.WithTransaction(func(db *gorm.DB) error {
+ err = model.NewProcessModelSearch().SetOrm(db).SetProcedure(processModel.Procedure).SetProduct(processModel.Product).SetIsNew(true).UpdateByMap(map[string]interface{}{"is_new": 0})
+ if err != nil {
+ return err
+ }
+ processModel.IsNew = true
+ return model.NewProcessModelSearch().SetOrm(db).Create(processModel)
+ })
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
--
Gitblit v1.8.0