From f838162ed0ee7f2832924c2399eddd461760135a Mon Sep 17 00:00:00 2001
From: zhangqian <zhangqian@123.com>
Date: 星期五, 13 十月 2023 21:47:54 +0800
Subject: [PATCH] plc地址和工艺参数不再单独拉取,只serf主启动时拉取

---
 constvar/const.go      |    2 
 service/task.go        |   26 ------
 nsq/msg_handler.go     |   63 +++++++++++++++
 model/common/common.go |   16 ++++
 main.go                |   16 ++-
 model/process_model.go |   10 ++
 crontask/cron_task.go  |   33 +++++++
 nsq/consumer.go        |    2 
 api/v1/task.go         |   41 ----------
 nsq/nsq.go             |   11 --
 10 files changed, 135 insertions(+), 85 deletions(-)

diff --git a/api/v1/task.go b/api/v1/task.go
index ce5b49d..96a0b1a 100644
--- a/api/v1/task.go
+++ b/api/v1/task.go
@@ -4,18 +4,14 @@
 	"apsClient/conf"
 	"apsClient/constvar"
 	"apsClient/model"
-	"apsClient/model/common"
 	"apsClient/model/request"
 	"apsClient/model/response"
-	"apsClient/nsq"
 	"apsClient/pkg/contextx"
 	"apsClient/pkg/ecode"
 	"apsClient/pkg/logx"
-	"apsClient/pkg/safe"
 	"apsClient/service"
 	"apsClient/service/plc_address"
 	"errors"
-	"fmt"
 	"github.com/gin-gonic/gin"
 	"github.com/jinzhu/gorm"
 	"github.com/spf13/cast"
@@ -179,15 +175,6 @@
 		})
 	}
 
-	safe.Go(func() {
-		caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicGetPlcAddress, conf.Conf.NsqConf.NodeId), fmt.Sprintf(constvar.NsqTopicSendPlcAddress, conf.Conf.NsqConf.NodeId))
-		var addressResult common.ResponsePlcAddress
-		err := caller.Call(common.RequestPlcAddress{DeviceId: conf.Conf.System.DeviceId}, &addressResult, time.Second*3)
-		if err != nil {
-			logx.Infof("get plc address err: %v", err.Error())
-		}
-	})
-
 	resp := response.ProcessParamsResponse{
 		Number: processModel.Number,
 		Params: processParamsArr,
@@ -231,20 +218,6 @@
 		logx.Errorf("UpdateProcedureStatus err: %v", err.Error())
 		ctx.Fail(ecode.UnknownErr)
 		return
-	}
-
-	msg := &common.MsgTaskStatusUpdate{
-		WorkOrderId:  procedure.WorkOrderID,
-		ProcedureID:  procedure.ProceduresInfo.ProcedureID,
-		DeviceId:     procedure.ProceduresInfo.DeviceID,
-		IsProcessing: false,
-		IsFinish:     true,
-	}
-
-	caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicTaskProcedureStatusUpdate, conf.Conf.NsqConf.NodeId), "")
-	err = caller.Send(msg)
-	if err != nil {
-		logx.Errorf("send task status update msg error:%v", err.Error())
 	}
 
 	service.TaskFlagUnset(procedure.Channel)
@@ -331,20 +304,6 @@
 	if err != nil {
 		ctx.FailWithMsg(ecode.NeedConfirmedErr, "PLC璇锋眰澶辫触锛岃妫�鏌LC閰嶇疆")
 		return
-	}
-
-	msg := &common.MsgTaskStatusUpdate{
-		WorkOrderId:  procedure.WorkOrderID,
-		ProcedureID:  procedure.ProceduresInfo.ProcedureID,
-		DeviceId:     procedure.ProceduresInfo.DeviceID,
-		IsProcessing: true,
-		IsFinish:     false,
-	}
-
-	caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicTaskProcedureStatusUpdate, conf.Conf.NsqConf.NodeId), "")
-	err = caller.Send(msg)
-	if err != nil {
-		logx.Errorf("send task status update msg error:%v", err.Error())
 	}
 	service.TaskFlagSet(procedure.Channel)
 	ctx.Ok()
diff --git a/constvar/const.go b/constvar/const.go
index 6bea5e5..5a294e8 100644
--- a/constvar/const.go
+++ b/constvar/const.go
@@ -10,6 +10,8 @@
 	NsqTopicTaskProcedureStatusUpdate = "aps.%v.task.procedure.status"   //宸ュ簭鐘舵�佹洿鏂�
 	NsqTopicSyncTaskProgress          = "aps.%v.task.procedure.progress" //宸ュ簭鐢熶骇杩涘害
 	NsqTopicDeviceUpdate              = "aps.%v.device.update"           //璁惧淇℃伅鏇存敼
+	NsqTopicPullDataRequest           = "aps.%v.pull.data.request"       //鎷夊彇鏁版嵁璇锋眰
+	NsqTopicPullDataResponse          = "aps.%v.pull.data.response"      //鎷夊彇鏁版嵁鍝嶅簲
 )
 
 type PlcStartAddressType int
diff --git a/crontask/cron_task.go b/crontask/cron_task.go
index 9454dbe..e462c68 100644
--- a/crontask/cron_task.go
+++ b/crontask/cron_task.go
@@ -3,6 +3,7 @@
 import (
 	"apsClient/conf"
 	"apsClient/constvar"
+	"apsClient/model/common"
 	"apsClient/nsq"
 	"apsClient/pkg/ecode"
 	"apsClient/pkg/logx"
@@ -13,8 +14,9 @@
 	"time"
 )
 
-func InitTask() error {
+var s *gocron.Scheduler
 
+func StartTask() error {
 	finishNumberTimeInterval := conf.Conf.PLC.FinishNumberTimeInterval
 	totalNumberTimeInterval := conf.Conf.PLC.TotalNumberTimeInterval
 	if finishNumberTimeInterval == 0 {
@@ -23,7 +25,7 @@
 	if totalNumberTimeInterval == 0 {
 		totalNumberTimeInterval = 60
 	}
-	s := gocron.NewScheduler(time.UTC)
+	s = gocron.NewScheduler(time.UTC)
 	_, err := s.Every(finishNumberTimeInterval).Seconds().Do(func() {
 		plcConfig, code := service.NewDevicePlcService().GetDevicePlc()
 		if code != ecode.OK {
@@ -73,7 +75,8 @@
 
 	})
 
-	s.Every(180).Seconds().Do(SyncProductionProgress) //鍚屾鐢熶骇鏁版嵁
+	s.Every(60).Seconds().Do(SyncProductionProgress) //鍚屾鐢熶骇鏁版嵁
+	s.Every(60).Seconds().Do(SyncTaskStatus)         //鍚屾浠诲姟鐘舵��
 	s.StartAsync()
 	return nil
 }
@@ -104,5 +107,29 @@
 			logx.Errorf("SyncProductionProgress error:%v", err.Error())
 		}
 	}
+}
 
+func SyncTaskStatus() {
+	//todo
+}
+
+func StopTask() {
+	if s != nil {
+		s.Stop()
+	}
+}
+
+// Once 涓�娆℃�т换鍔�
+func Once() {
+	msg := &common.MsgPullDataRequest{DataType: common.PullDataTypeProcessModel}
+	caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicPullDataRequest, conf.Conf.NsqConf.NodeId), constvar.NsqTopicPullDataResponse)
+	err := caller.Send(msg)
+	if err != nil {
+		logx.Errorf("send pull data msg error:%v", err.Error())
+	}
+	caller = nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicGetPlcAddress, conf.Conf.NsqConf.NodeId), "")
+	err = caller.Send(common.RequestPlcAddress{DeviceId: conf.Conf.System.DeviceId})
+	if err != nil {
+		logx.Infof("get plc address err: %v", err.Error())
+	}
 }
diff --git a/main.go b/main.go
index 569c9e7..bae163b 100644
--- a/main.go
+++ b/main.go
@@ -25,11 +25,6 @@
 		return
 	}
 
-	if err := crontask.InitTask(); err != nil {
-		logx.Errorf("crontab task Init err:%v", err)
-		return
-	}
-
 	//鍔犺浇plc鍐欏叆鍦板潃
 	plc_address.LoadAddressFromFile()
 
@@ -54,6 +49,11 @@
 			logx.Errorf("nsq Init err:%v", err)
 			return
 		}
+		crontask.Once()
+		if err := crontask.StartTask(); err != nil {
+			logx.Errorf("crontab task Init err:%v", err)
+			return
+		}
 	}
 
 	logx.Infof("apsClient start serve...")
@@ -73,8 +73,14 @@
 			logx.Errorf("nsq Init err:%v", err)
 			return
 		}
+		crontask.Once()
+		if err := crontask.StartTask(); err != nil {
+			logx.Errorf("crontab task Init err:%v", err)
+			return
+		}
 	case serf.EventJoinCluster, serf.EventMaster2Slave:
 		nsq.Stop()
+		crontask.StopTask()
 	}
 
 	logx.Infof("serf cluster event: %v", stat)
diff --git a/model/common/common.go b/model/common/common.go
index ead7e90..9df61af 100644
--- a/model/common/common.go
+++ b/model/common/common.go
@@ -137,3 +137,19 @@
 	ProcedureName string `gorm:"type:varchar(191);comment:宸ュ簭鍚嶇О" json:"procedureName"`
 	DeviceID      string `gorm:"index;type:varchar(191);not null;comment:璁惧ID" json:"deviceId"`
 }
+
+type PullDataType string
+
+const (
+	PullDataTypeProcessModel = "process_model"
+)
+
+// MsgPullDataRequest 鎷夊彇浜戠鏁版嵁
+type MsgPullDataRequest struct {
+	DataType PullDataType `json:"dataType"` //瑕佹媺鍙栫殑鏁版嵁绫诲瀷
+}
+
+type MsgPullDataResponse struct {
+	DataType PullDataType `json:"dataType"` //瑕佹媺鍙栫殑鏁版嵁绫诲瀷
+	Data     interface{}  //杩斿洖鐨勬暟鎹�
+}
diff --git a/model/process_model.go b/model/process_model.go
index bedda26..eeffa77 100644
--- a/model/process_model.go
+++ b/model/process_model.go
@@ -32,6 +32,7 @@
 		Orm           *gorm.DB
 		Procedures    []string
 		CurrentNumber string
+		Numbers       []string
 	}
 )
 
@@ -74,6 +75,11 @@
 
 func (slf *ProcessModelSearch) SetNumber(number string) *ProcessModelSearch {
 	slf.Number = number
+	return slf
+}
+
+func (slf *ProcessModelSearch) SetNumbers(numbers []string) *ProcessModelSearch {
+	slf.Numbers = numbers
 	return slf
 }
 
@@ -137,6 +143,10 @@
 		db = db.Where("number != ?", slf.CurrentNumber)
 	}
 
+	if len(slf.Numbers) != 0 {
+		db = db.Where("number in ?", slf.Numbers)
+	}
+
 	return db
 }
 
diff --git a/nsq/consumer.go b/nsq/consumer.go
index 879f641..b4c3ba1 100644
--- a/nsq/consumer.go
+++ b/nsq/consumer.go
@@ -27,6 +27,8 @@
 		handler = &ProcessParamsSync{Topic: topic}
 	case fmt.Sprintf(constvar.NsqTopicDeviceUpdate, conf.Conf.NsqConf.NodeId):
 		handler = &DeviceUpdate{Topic: topic}
+	case fmt.Sprintf(constvar.NsqTopicPullDataResponse, conf.Conf.NsqConf.NodeId):
+		handler = &PullDataResponse{Topic: topic}
 	}
 	c.AddHandler(handler.HandleMessage)
 
diff --git a/nsq/msg_handler.go b/nsq/msg_handler.go
index a01d9ea..1db651e 100644
--- a/nsq/msg_handler.go
+++ b/nsq/msg_handler.go
@@ -275,3 +275,66 @@
 
 	return nil
 }
+
+type PullDataResponse struct {
+	Topic string
+}
+
+func (slf *PullDataResponse) HandleMessage(data []byte) (err error) {
+	logx.Infof("get a pull data response message :%s", data)
+	var pullDataResponse common.MsgPullDataResponse
+	err = json.Unmarshal(data, &pullDataResponse)
+	if err != nil {
+		logx.Infof("unmarshal msg err :%s", err)
+		return err
+	}
+	switch pullDataResponse.DataType {
+	case common.PullDataTypeProcessModel:
+		err = slf.DealProcessModelData(pullDataResponse.Data)
+
+	}
+	if err != nil {
+		logx.Infof("process pull data  err :%s", err)
+		return err
+	}
+	return nil
+}
+
+func (slf *PullDataResponse) DealProcessModelData(data interface{}) error {
+	var processModels []*model.ProcessModel
+	err := mapstructure.Decode(data, &processModels)
+	if err != nil {
+		return err
+	}
+	numbers := make([]string, 0, len(processModels))
+	for _, processModel := range processModels {
+		numbers = append(numbers, processModel.Number)
+	}
+	existsProcessModels, err := model.NewProcessModelSearch().SetIsNew(true).SetNumbers(numbers).FindNotTotal()
+	if err != nil {
+		return err
+	}
+
+	existsProcessModelsMap := make(map[string]struct{}, len(existsProcessModels))
+	for _, processModel := range existsProcessModels {
+		existsProcessModelsMap[processModel.Number] = struct{}{}
+	}
+
+	for _, processModel := range processModels {
+		if _, exists := existsProcessModelsMap[processModel.Number]; exists {
+			continue
+		}
+		err = model.WithTransaction(func(db *gorm.DB) error {
+			err = model.NewProcessModelSearch().SetOrm(db).SetProcedure(processModel.Procedure).SetProduct(processModel.Product).SetIsNew(true).UpdateByMap(map[string]interface{}{"is_new": 0})
+			if err != nil {
+				return err
+			}
+			processModel.IsNew = true
+			return model.NewProcessModelSearch().SetOrm(db).Create(processModel)
+		})
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
diff --git a/nsq/nsq.go b/nsq/nsq.go
index 064efa7..4fb71b6 100644
--- a/nsq/nsq.go
+++ b/nsq/nsq.go
@@ -3,7 +3,6 @@
 import (
 	"apsClient/conf"
 	"apsClient/constvar"
-	"apsClient/model/common"
 	"apsClient/pkg/logx"
 	"apsClient/pkg/safe"
 	"basic.com/aps/nsqclient.git"
@@ -11,7 +10,6 @@
 	"errors"
 	"fmt"
 	"sync"
-	"time"
 )
 
 type consumerManager struct {
@@ -32,14 +30,6 @@
 	if err := initProducer(); err != nil {
 		return err
 	}
-	safe.Go(func() {
-		caller := NewCaller(fmt.Sprintf(constvar.NsqTopicGetPlcAddress, conf.Conf.NsqConf.NodeId), fmt.Sprintf(constvar.NsqTopicSendPlcAddress, conf.Conf.NsqConf.NodeId))
-		var addressResult common.ResponsePlcAddress
-		err := caller.Call(common.RequestPlcAddress{DeviceId: conf.Conf.System.DeviceId}, &addressResult, time.Second*3)
-		if err != nil {
-			logx.Infof("get plc address err: %v", err.Error())
-		}
-	})
 
 	var topics = []string{
 		constvar.NsqTopicScheduleTask,
@@ -47,6 +37,7 @@
 		constvar.NsqTopicProcessParamsResponse,
 		constvar.NsqTopicApsProcessParams,
 		constvar.NsqTopicDeviceUpdate,
+		constvar.NsqTopicPullDataResponse,
 	}
 	for _, t := range topics {
 		topic := fmt.Sprintf(t, conf.Conf.NsqConf.NodeId)
diff --git a/service/task.go b/service/task.go
index ed4c737..468509e 100644
--- a/service/task.go
+++ b/service/task.go
@@ -6,10 +6,8 @@
 	"apsClient/model"
 	"apsClient/model/common"
 	"apsClient/model/response"
-	"apsClient/nsq"
 	"apsClient/pkg/ecode"
 	"apsClient/pkg/logx"
-	"apsClient/pkg/structx"
 	"fmt"
 	"github.com/jinzhu/gorm"
 	"time"
@@ -176,30 +174,6 @@
 	}
 	if err == nil {
 		return data, nil
-	}
-
-	if err == gorm.ErrRecordNotFound { //濡傛灉鏁版嵁搴撴病鏈変粠浜戠鑾峰彇
-		caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicProcessParamsRequest, conf.Conf.NsqConf.NodeId), fmt.Sprintf(constvar.NsqTopicProcessParamsResponse, conf.Conf.NsqConf.NodeId))
-		var result common.ResponseProcessParams
-		err = caller.Call(common.RequestProcessParams{
-			WorkOrder: procedure.WorkOrderID,
-			OrderId:   procedure.OrderID,
-			Product:   order.ProductName,
-			Procedure: procedure.ProceduresInfo.ProcedureName,
-			Device:    procedure.ProceduresInfo.DeviceName,
-			DeviceId:  conf.Conf.System.DeviceId,
-		}, &result, time.Second*3)
-		if err != nil {
-			logx.Errorf("TaskStart GetProcessModel error:%v", err.Error())
-			return
-		}
-		if result.ParamsMap == nil {
-			logx.Errorf("TaskStart GetProcessModel response miss process params:%v", result)
-			return
-		}
-		processModel = new(model.ProcessModel)
-		err = structx.AssignTo(result, &processModel)
-		return processModel, err
 	}
 	return
 }

--
Gitblit v1.8.0