From bc7ee359dfe4e66a05c2cca9deb7a945534009f3 Mon Sep 17 00:00:00 2001
From: zhangqian <zhangqian@123.com>
Date: 星期五, 20 十月 2023 18:30:26 +0800
Subject: [PATCH] 删除不再用的topic,增加初始化的时候,设备拉取
---
constvar/const.go | 7 +--
nsq/msg_handler.go | 36 +++++++++++++++++
model/common/common.go | 1
model/device.go | 18 +++++++--
crontask/cron_task.go | 9 ++--
nsq/consumer.go | 2 -
nsq/nsq.go | 3 +
7 files changed, 59 insertions(+), 17 deletions(-)
diff --git a/constvar/const.go b/constvar/const.go
index 5a294e8..ce5eac9 100644
--- a/constvar/const.go
+++ b/constvar/const.go
@@ -1,11 +1,8 @@
package constvar
const (
- NsqTopicScheduleTask = "aps.%v.scheduleTask" //鎺掔▼浠诲姟涓嬪彂
- NsqTopicGetPlcAddress = "aps.%v.getPlcAddress"
- NsqTopicSendPlcAddress = "aps.%v.sendPlcAddress"
- NsqTopicProcessParamsRequest = "aps.%v.processParams.request"
- NsqTopicProcessParamsResponse = "aps.%v.processParams.response"
+ NsqTopicScheduleTask = "aps.%v.scheduleTask" //鎺掔▼浠诲姟涓嬪彂
+ NsqTopicSendPlcAddress = "aps.%v.sendPlcAddress" //plc address鏇存柊
NsqTopicApsProcessParams = "aps.%v.aps.processParams" //鏈変簡鏂扮殑宸ヨ壓妯″瀷
NsqTopicTaskProcedureStatusUpdate = "aps.%v.task.procedure.status" //宸ュ簭鐘舵�佹洿鏂�
NsqTopicSyncTaskProgress = "aps.%v.task.procedure.progress" //宸ュ簭鐢熶骇杩涘害
diff --git a/crontask/cron_task.go b/crontask/cron_task.go
index cd4e9e6..230d38e 100644
--- a/crontask/cron_task.go
+++ b/crontask/cron_task.go
@@ -143,11 +143,12 @@
caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicPullDataRequest, conf.Conf.NsqConf.NodeId), constvar.NsqTopicPullDataResponse)
err := caller.Send(msg)
if err != nil {
- logx.Errorf("send pull data msg error:%v", err.Error())
+ logx.Errorf("send pull data msg error:%v, msg:%+v", err.Error(), msg)
}
- caller = nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicGetPlcAddress, conf.Conf.NsqConf.NodeId), "")
- err = caller.Send(common.RequestPlcAddress{DeviceId: conf.Conf.System.DeviceId})
+ msg = &common.MsgPullDataRequest{DataType: common.PullDataTypeDevice}
+ caller = nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicPullDataRequest, conf.Conf.NsqConf.NodeId), constvar.NsqTopicPullDataResponse)
+ err = caller.Send(msg)
if err != nil {
- logx.Infof("get plc address err: %v", err.Error())
+ logx.Errorf("send pull data msg error:%v, msg:%+v", err.Error(), msg)
}
}
diff --git a/model/common/common.go b/model/common/common.go
index 2bf3fcb..2f8c2f4 100644
--- a/model/common/common.go
+++ b/model/common/common.go
@@ -139,6 +139,7 @@
const (
PullDataTypeProcessModel = "process_model"
+ PullDataTypeDevice = "device"
)
// MsgPullDataRequest 鎷夊彇浜戠鏁版嵁
diff --git a/model/device.go b/model/device.go
index 46e2665..59c7859 100644
--- a/model/device.go
+++ b/model/device.go
@@ -19,10 +19,11 @@
DeviceSearch struct {
Device
- Order string
- PageNum int
- PageSize int
- Orm *gorm.DB
+ Order string
+ PageNum int
+ PageSize int
+ Orm *gorm.DB
+ DeviceIDs []string
}
)
@@ -64,6 +65,11 @@
return slf
}
+func (slf *DeviceSearch) SetDeviceIds(deviceIds []string) *DeviceSearch {
+ slf.DeviceIDs = deviceIds
+ return slf
+}
+
func (slf *DeviceSearch) build() *gorm.DB {
var db = slf.Orm.Table(slf.TableName())
@@ -75,6 +81,10 @@
db = db.Where("device_id = ?", slf.DeviceID)
}
+ if len(slf.DeviceIDs) != 0 {
+ db = db.Where("device_id in (?)", slf.DeviceIDs)
+ }
+
if slf.Order != "" {
db = db.Order(slf.Order)
}
diff --git a/nsq/consumer.go b/nsq/consumer.go
index d75a999..e8d5655 100644
--- a/nsq/consumer.go
+++ b/nsq/consumer.go
@@ -21,8 +21,6 @@
handler = new(ScheduleTask)
case fmt.Sprintf(constvar.NsqTopicSendPlcAddress, conf.Conf.NsqConf.NodeId):
handler = &PlcAddress{Topic: topic}
- case fmt.Sprintf(constvar.NsqTopicProcessParamsResponse, conf.Conf.NsqConf.NodeId):
- handler = &ProcessParams{Topic: topic}
case fmt.Sprintf(constvar.NsqTopicApsProcessParams, conf.Conf.NsqConf.NodeId):
handler = &ProcessParamsSync{Topic: topic}
case fmt.Sprintf(constvar.NsqTopicDeviceUpdate, conf.Conf.NsqConf.NodeId):
diff --git a/nsq/msg_handler.go b/nsq/msg_handler.go
index 15cbf29..3e1f160 100644
--- a/nsq/msg_handler.go
+++ b/nsq/msg_handler.go
@@ -299,7 +299,8 @@
switch pullDataResponse.DataType {
case common.PullDataTypeProcessModel:
err = slf.DealProcessModelData(pullDataResponse.Data)
-
+ case common.PullDataTypeDevice:
+ err = slf.DealDeviceData(pullDataResponse.Data)
}
if err != nil {
logx.Infof("process pull data err :%s", err)
@@ -346,3 +347,36 @@
}
return nil
}
+
+func (slf *PullDataResponse) DealDeviceData(data interface{}) error {
+ var devices []*model.Device
+ err := mapstructure.Decode(data, &devices)
+ if err != nil {
+ return err
+ }
+ numbers := make([]string, 0, len(devices))
+ for _, item := range devices {
+ numbers = append(numbers, item.DeviceID)
+ }
+ existsDevices, err := model.NewDeviceSearch().SetDeviceIds(numbers).FindNotTotal()
+ if err != nil {
+ return err
+ }
+
+ existsDeviceMap := make(map[string]*model.Device, len(existsDevices))
+ for _, device := range existsDevices {
+ existsDeviceMap[device.DeviceID] = device
+ }
+
+ for _, device := range devices {
+ if oldDevice, exists := existsDeviceMap[device.DeviceID]; exists {
+ if oldDevice.ExtChannelAmount != device.ExtChannelAmount || //todo to be continued
+ oldDevice.Procedures != device.Procedures {
+ err = model.NewDeviceSearch().SetDeviceId(device.DeviceID).Save(device)
+ }
+ } else {
+ err = model.NewDeviceSearch().Create(device)
+ }
+ }
+ return err
+}
diff --git a/nsq/nsq.go b/nsq/nsq.go
index a64f1b0..c2eb366 100644
--- a/nsq/nsq.go
+++ b/nsq/nsq.go
@@ -34,7 +34,6 @@
var topics = []string{
constvar.NsqTopicScheduleTask,
constvar.NsqTopicSendPlcAddress,
- constvar.NsqTopicProcessParamsResponse,
constvar.NsqTopicApsProcessParams,
constvar.NsqTopicDeviceUpdate,
constvar.NsqTopicPullDataResponse,
@@ -73,7 +72,9 @@
if consumer, ok := value.(*nsqclient.NsqConsumer); ok {
nsqclient.DestroyNsqConsumer(consumer)
logx.Infof("try stop consumer, topic : %v", key)
+ consumer = nil
}
+
return true
})
}
--
Gitblit v1.8.0