From bc7ee359dfe4e66a05c2cca9deb7a945534009f3 Mon Sep 17 00:00:00 2001
From: zhangqian <zhangqian@123.com>
Date: 星期五, 20 十月 2023 18:30:26 +0800
Subject: [PATCH] 删除不再用的topic,增加初始化的时候,设备拉取

---
 constvar/const.go      |    7 +--
 nsq/msg_handler.go     |   36 +++++++++++++++++
 model/common/common.go |    1 
 model/device.go        |   18 +++++++--
 crontask/cron_task.go  |    9 ++--
 nsq/consumer.go        |    2 -
 nsq/nsq.go             |    3 +
 7 files changed, 59 insertions(+), 17 deletions(-)

diff --git a/constvar/const.go b/constvar/const.go
index 5a294e8..ce5eac9 100644
--- a/constvar/const.go
+++ b/constvar/const.go
@@ -1,11 +1,8 @@
 package constvar
 
 const (
-	NsqTopicScheduleTask              = "aps.%v.scheduleTask" //鎺掔▼浠诲姟涓嬪彂
-	NsqTopicGetPlcAddress             = "aps.%v.getPlcAddress"
-	NsqTopicSendPlcAddress            = "aps.%v.sendPlcAddress"
-	NsqTopicProcessParamsRequest      = "aps.%v.processParams.request"
-	NsqTopicProcessParamsResponse     = "aps.%v.processParams.response"
+	NsqTopicScheduleTask              = "aps.%v.scheduleTask"            //鎺掔▼浠诲姟涓嬪彂
+	NsqTopicSendPlcAddress            = "aps.%v.sendPlcAddress"          //plc address鏇存柊
 	NsqTopicApsProcessParams          = "aps.%v.aps.processParams"       //鏈変簡鏂扮殑宸ヨ壓妯″瀷
 	NsqTopicTaskProcedureStatusUpdate = "aps.%v.task.procedure.status"   //宸ュ簭鐘舵�佹洿鏂�
 	NsqTopicSyncTaskProgress          = "aps.%v.task.procedure.progress" //宸ュ簭鐢熶骇杩涘害
diff --git a/crontask/cron_task.go b/crontask/cron_task.go
index cd4e9e6..230d38e 100644
--- a/crontask/cron_task.go
+++ b/crontask/cron_task.go
@@ -143,11 +143,12 @@
 	caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicPullDataRequest, conf.Conf.NsqConf.NodeId), constvar.NsqTopicPullDataResponse)
 	err := caller.Send(msg)
 	if err != nil {
-		logx.Errorf("send pull data msg error:%v", err.Error())
+		logx.Errorf("send pull data msg error:%v, msg:%+v", err.Error(), msg)
 	}
-	caller = nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicGetPlcAddress, conf.Conf.NsqConf.NodeId), "")
-	err = caller.Send(common.RequestPlcAddress{DeviceId: conf.Conf.System.DeviceId})
+	msg = &common.MsgPullDataRequest{DataType: common.PullDataTypeDevice}
+	caller = nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicPullDataRequest, conf.Conf.NsqConf.NodeId), constvar.NsqTopicPullDataResponse)
+	err = caller.Send(msg)
 	if err != nil {
-		logx.Infof("get plc address err: %v", err.Error())
+		logx.Errorf("send pull data msg error:%v, msg:%+v", err.Error(), msg)
 	}
 }
diff --git a/model/common/common.go b/model/common/common.go
index 2bf3fcb..2f8c2f4 100644
--- a/model/common/common.go
+++ b/model/common/common.go
@@ -139,6 +139,7 @@
 
 const (
 	PullDataTypeProcessModel = "process_model"
+	PullDataTypeDevice       = "device"
 )
 
 // MsgPullDataRequest 鎷夊彇浜戠鏁版嵁
diff --git a/model/device.go b/model/device.go
index 46e2665..59c7859 100644
--- a/model/device.go
+++ b/model/device.go
@@ -19,10 +19,11 @@
 
 	DeviceSearch struct {
 		Device
-		Order    string
-		PageNum  int
-		PageSize int
-		Orm      *gorm.DB
+		Order     string
+		PageNum   int
+		PageSize  int
+		Orm       *gorm.DB
+		DeviceIDs []string
 	}
 )
 
@@ -64,6 +65,11 @@
 	return slf
 }
 
+func (slf *DeviceSearch) SetDeviceIds(deviceIds []string) *DeviceSearch {
+	slf.DeviceIDs = deviceIds
+	return slf
+}
+
 func (slf *DeviceSearch) build() *gorm.DB {
 	var db = slf.Orm.Table(slf.TableName())
 
@@ -75,6 +81,10 @@
 		db = db.Where("device_id = ?", slf.DeviceID)
 	}
 
+	if len(slf.DeviceIDs) != 0 {
+		db = db.Where("device_id in (?)", slf.DeviceIDs)
+	}
+
 	if slf.Order != "" {
 		db = db.Order(slf.Order)
 	}
diff --git a/nsq/consumer.go b/nsq/consumer.go
index d75a999..e8d5655 100644
--- a/nsq/consumer.go
+++ b/nsq/consumer.go
@@ -21,8 +21,6 @@
 		handler = new(ScheduleTask)
 	case fmt.Sprintf(constvar.NsqTopicSendPlcAddress, conf.Conf.NsqConf.NodeId):
 		handler = &PlcAddress{Topic: topic}
-	case fmt.Sprintf(constvar.NsqTopicProcessParamsResponse, conf.Conf.NsqConf.NodeId):
-		handler = &ProcessParams{Topic: topic}
 	case fmt.Sprintf(constvar.NsqTopicApsProcessParams, conf.Conf.NsqConf.NodeId):
 		handler = &ProcessParamsSync{Topic: topic}
 	case fmt.Sprintf(constvar.NsqTopicDeviceUpdate, conf.Conf.NsqConf.NodeId):
diff --git a/nsq/msg_handler.go b/nsq/msg_handler.go
index 15cbf29..3e1f160 100644
--- a/nsq/msg_handler.go
+++ b/nsq/msg_handler.go
@@ -299,7 +299,8 @@
 	switch pullDataResponse.DataType {
 	case common.PullDataTypeProcessModel:
 		err = slf.DealProcessModelData(pullDataResponse.Data)
-
+	case common.PullDataTypeDevice:
+		err = slf.DealDeviceData(pullDataResponse.Data)
 	}
 	if err != nil {
 		logx.Infof("process pull data  err :%s", err)
@@ -346,3 +347,36 @@
 	}
 	return nil
 }
+
+func (slf *PullDataResponse) DealDeviceData(data interface{}) error {
+	var devices []*model.Device
+	err := mapstructure.Decode(data, &devices)
+	if err != nil {
+		return err
+	}
+	numbers := make([]string, 0, len(devices))
+	for _, item := range devices {
+		numbers = append(numbers, item.DeviceID)
+	}
+	existsDevices, err := model.NewDeviceSearch().SetDeviceIds(numbers).FindNotTotal()
+	if err != nil {
+		return err
+	}
+
+	existsDeviceMap := make(map[string]*model.Device, len(existsDevices))
+	for _, device := range existsDevices {
+		existsDeviceMap[device.DeviceID] = device
+	}
+
+	for _, device := range devices {
+		if oldDevice, exists := existsDeviceMap[device.DeviceID]; exists {
+			if oldDevice.ExtChannelAmount != device.ExtChannelAmount || //todo to be continued
+				oldDevice.Procedures != device.Procedures {
+				err = model.NewDeviceSearch().SetDeviceId(device.DeviceID).Save(device)
+			}
+		} else {
+			err = model.NewDeviceSearch().Create(device)
+		}
+	}
+	return err
+}
diff --git a/nsq/nsq.go b/nsq/nsq.go
index a64f1b0..c2eb366 100644
--- a/nsq/nsq.go
+++ b/nsq/nsq.go
@@ -34,7 +34,6 @@
 	var topics = []string{
 		constvar.NsqTopicScheduleTask,
 		constvar.NsqTopicSendPlcAddress,
-		constvar.NsqTopicProcessParamsResponse,
 		constvar.NsqTopicApsProcessParams,
 		constvar.NsqTopicDeviceUpdate,
 		constvar.NsqTopicPullDataResponse,
@@ -73,7 +72,9 @@
 		if consumer, ok := value.(*nsqclient.NsqConsumer); ok {
 			nsqclient.DestroyNsqConsumer(consumer)
 			logx.Infof("try stop consumer, topic : %v", key)
+			consumer = nil
 		}
+
 		return true
 	})
 }

--
Gitblit v1.8.0