From 6267ca2039e6538e4f687071e71e2ad2dda11d09 Mon Sep 17 00:00:00 2001 From: zhangqian <zhangqian@123.com> Date: 星期一, 13 十一月 2023 17:04:57 +0800 Subject: [PATCH] 系统启动时,保存systemDeviceId记录并上报 --- constvar/const.go | 7 + model/common/common.go | 6 + service/device.go | 22 +++ model/index.go | 1 main.go | 7 + model/reports_to_cloud.go | 221 ++++++++++++++++++++++++++++++++++++++++++++ crontask/cron_task.go | 30 +++++ 7 files changed, 288 insertions(+), 6 deletions(-) diff --git a/constvar/const.go b/constvar/const.go index e6f9b85..d426658 100644 --- a/constvar/const.go +++ b/constvar/const.go @@ -8,6 +8,7 @@ NsqTopicDeviceUpdate = "aps.%v.device.update" //璁惧淇℃伅鏇存敼 NsqTopicPullDataRequest = "aps.%v.pull.data.request" //鎷夊彇鏁版嵁璇锋眰 NsqTopicPullDataResponse = "aps.%v.pull.data.response" //鎷夊彇鏁版嵁鍝嶅簲 + NsqTopicApsClientReportData = "aps.%v.apsClient.report.data" //apsClient涓婃姤鏁版嵁 ) type PlcStartAddressType int @@ -127,3 +128,9 @@ SystemStatusValueNormal SystemStatusValue = "1" SystemStatusValueUnNormal SystemStatusValue = "2" ) + +type ReportType int //涓婃姤绫诲瀷 + +const ( + ReportTypeSystemDeviceID ReportType = 1 +) diff --git a/crontask/cron_task.go b/crontask/cron_task.go index c38ef9d..fa684d2 100644 --- a/crontask/cron_task.go +++ b/crontask/cron_task.go @@ -84,7 +84,7 @@ s.Every(60).Seconds().Do(SyncProductionProgress) //鍚屾鐢熶骇鏁版嵁 s.Every(30).Seconds().Do(SyncTaskStatus) //鍚屾浠诲姟鐘舵�� s.Every(10).Seconds().Do(CheckNsqConn) //鏌ヨnsq杩炴帴 - + s.Every(30).Seconds().Do(ReportData) //涓婃姤鏁版嵁 } s.Every(20).Seconds().Do(QueryClusterStatus) //鏌ヨ闆嗙兢鑺傜偣鏁伴噺 @@ -127,6 +127,7 @@ err = caller.Send(msg) if err != nil { logx.Errorf("sync task status send msg error:%v", err.Error()) + continue } syncOkIds = append(syncOkIds, record.ID) } @@ -138,6 +139,33 @@ } } +func ReportData() { + records, err := model.NewReportsToCloudSearch(nil).SetOrder("id desc").SetPage(1, 100).FindNotTotal() + if err != nil { + logx.Errorf("ReportData get records err:%v", err) + } + okIds := make([]uint, 0, len(records)) + for _, record := range records { + msg := &common.MsgReportData{ + ReportType: record.ReportType, + Content: record.Content, + } + caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicApsClientReportData, conf.Conf.NsqConf.NodeId), "") + err = caller.Send(msg) + if err != nil { + logx.Errorf("sync task status send msg error:%v", err.Error()) + continue + } + okIds = append(okIds, record.ID) + } + if len(okIds) > 0 { + err = model.NewReportsToCloudSearch(nil).SetIDs(okIds).Delete() + if err != nil { + logx.Errorf("ReportData delete report ok records error:%v", err) + } + } +} + func RestartTask(isMaster bool) error { if s != nil { s.Stop() diff --git a/main.go b/main.go index 4158c90..2803974 100644 --- a/main.go +++ b/main.go @@ -45,6 +45,7 @@ "device_plc", "system_status", "process_model_plc_address", + "reports_to_cloud", } agent := serf.InitAgent("apsClient", syncTables, sqlitex.GetDB()) @@ -57,12 +58,16 @@ } //浠庢枃浠堕噷璇诲彇褰撳墠鐢熶骇璁惧id鑾峰彇璁惧鍒楄〃绗竴涓� - err := service.InitCurrentDeviceID() + err := service.InitCurrentDeviceID(serf.Vasystem.ServerID) if err != nil { logx.Errorf("InitCurrentDeviceID error: %v, exit", err) return } + go func() { + service.ReportsSystemDeviceToCloud(serf.Vasystem.ServerID) + }() + // 鍒ゆ柇褰撳墠闆嗙兢鐘舵�� logx.Infof("current agent.ClusterStatus:%v", agent.ClusterStatus) log.Println("current agent.ClusterStatus:", agent.ClusterStatus) diff --git a/model/common/common.go b/model/common/common.go index 628054e..e47b1fd 100644 --- a/model/common/common.go +++ b/model/common/common.go @@ -168,3 +168,9 @@ StopBit int `gorm:"type:int(11)" json:"stopBit"` //鍋滄浣嶏紝method = modbusRTU 鐢� Parity constvar.Parity `gorm:"type:int(11)" json:"parity"` //鏍¢獙鏂瑰紡锛宮ethod = modbusRTU 鐢� } + +// MsgReportData 鏁版嵁涓婃姤 +type MsgReportData struct { + ReportType constvar.ReportType `json:"reportType"` //涓婃姤绫诲瀷 + Content string `json:"Content"` +} diff --git a/model/index.go b/model/index.go index f109db2..e372ba6 100644 --- a/model/index.go +++ b/model/index.go @@ -31,6 +31,7 @@ TaskStatusSync{}, SystemStatus{}, ProcessModelPlcAddress{}, + ReportsToCloud{}, ) } diff --git a/model/reports_to_cloud.go b/model/reports_to_cloud.go new file mode 100644 index 0000000..f4acf5b --- /dev/null +++ b/model/reports_to_cloud.go @@ -0,0 +1,221 @@ +package model + +import ( + "apsClient/constvar" + "apsClient/pkg/sqlitex" + "fmt" + "github.com/jinzhu/gorm" +) + +type ( + // ReportsToCloud 鏁版嵁涓婃姤 + ReportsToCloud struct { + gorm.Model + ReportType constvar.ReportType `json:"reportType"` //涓婃姤绫诲瀷 + Content string `json:"Content"` + } + + ReportsToCloudSearch struct { + ReportsToCloud + Order string + PageNum int + PageSize int + Orm *gorm.DB + IDs []uint + } +) + +func (slf *ReportsToCloud) TableName() string { + return "reports_to_cloud" +} + +func NewReportsToCloudSearch(db *gorm.DB) *ReportsToCloudSearch { + if db == nil { + db = sqlitex.GetDB() + } + return &ReportsToCloudSearch{Orm: db} +} + +func (slf *ReportsToCloudSearch) SetOrm(tx *gorm.DB) *ReportsToCloudSearch { + slf.Orm = tx + return slf +} + +func (slf *ReportsToCloudSearch) SetPage(page, size int) *ReportsToCloudSearch { + slf.PageNum, slf.PageSize = page, size + return slf +} + +func (slf *ReportsToCloudSearch) SetOrder(order string) *ReportsToCloudSearch { + slf.Order = order + return slf +} + +func (slf *ReportsToCloudSearch) SetID(id uint) *ReportsToCloudSearch { + slf.ID = id + return slf +} + +func (slf *ReportsToCloudSearch) SetIDs(ids []uint) *ReportsToCloudSearch { + slf.IDs = ids + return slf +} + +func (slf *ReportsToCloudSearch) build() *gorm.DB { + var db = slf.Orm.Table(slf.TableName()) + + if slf.ID != 0 { + db = db.Where("id = ?", slf.ID) + } + + if len(slf.IDs) != 0 { + db = db.Where("id in (?)", slf.IDs) + } + + if slf.Order != "" { + db = db.Order(slf.Order) + } + + return db +} + +// Create 鍗曟潯鎻掑叆 +func (slf *ReportsToCloudSearch) Create(record *ReportsToCloud) error { + var db = slf.build() + + if err := db.Create(record).Error; err != nil { + return fmt.Errorf("create err: %v, record: %+v", err, record) + } + + return nil +} + +func (slf *ReportsToCloudSearch) Save(record *ReportsToCloud) error { + var db = slf.build() + + if err := db.Updates(record).Error; err != nil { + return fmt.Errorf("create err: %v, record: %+v", err, record) + } + return nil +} + +func (slf *ReportsToCloudSearch) UpdateByMap(upMap map[string]interface{}) error { + var ( + db = slf.build() + ) + + if err := db.Updates(upMap).Error; err != nil { + return fmt.Errorf("update by map err: %v, upMap: %+v", err, upMap) + } + + return nil +} + +func (slf *ReportsToCloudSearch) UpdateByQuery(query string, args []interface{}, upMap map[string]interface{}) error { + var ( + db = slf.Orm.Table(slf.TableName()).Where(query, args...) + ) + + if err := db.Updates(upMap).Error; err != nil { + return fmt.Errorf("update by query err: %v, query: %s, args: %+v, upMap: %+v", err, query, args, upMap) + } + + return nil +} + +func (slf *ReportsToCloudSearch) Delete() error { + var db = slf.build() + + if err := db.Unscoped().Delete(&ReportsToCloud{}).Error; err != nil { + return err + } + + return nil +} + +func (slf *ReportsToCloudSearch) First() (*ReportsToCloud, error) { + var ( + record = new(ReportsToCloud) + db = slf.build() + ) + + if err := db.First(record).Error; err != nil { + return record, err + } + + return record, nil +} + +func (slf *ReportsToCloudSearch) Find() ([]*ReportsToCloud, int64, error) { + var ( + records = make([]*ReportsToCloud, 0) + total int64 + db = slf.build() + ) + + if err := db.Count(&total).Error; err != nil { + return records, total, fmt.Errorf("find count err: %v", err) + } + if slf.PageNum*slf.PageSize > 0 { + db = db.Offset((slf.PageNum - 1) * slf.PageSize).Limit(slf.PageSize) + } + if err := db.Find(&records).Error; err != nil { + return records, total, fmt.Errorf("find records err: %v", err) + } + + return records, total, nil +} + +func (slf *ReportsToCloudSearch) FindNotTotal() ([]*ReportsToCloud, error) { + var ( + records = make([]*ReportsToCloud, 0) + db = slf.build() + ) + + if slf.PageNum*slf.PageSize > 0 { + db = db.Offset((slf.PageNum - 1) * slf.PageSize).Limit(slf.PageSize) + } + if err := db.Find(&records).Error; err != nil { + return records, fmt.Errorf("find records err: %v", err) + } + + return records, nil +} + +// FindByQuery 鎸囧畾鏉′欢鏌ヨ. +func (slf *ReportsToCloudSearch) FindByQuery(query string, args []interface{}) ([]*ReportsToCloud, int64, error) { + var ( + records = make([]*ReportsToCloud, 0) + total int64 + db = slf.Orm.Table(slf.TableName()).Where(query, args...) + ) + + if err := db.Count(&total).Error; err != nil { + return records, total, fmt.Errorf("find by query count err: %v", err) + } + if slf.PageNum*slf.PageSize > 0 { + db = db.Offset((slf.PageNum - 1) * slf.PageSize).Limit(slf.PageSize) + } + if err := db.Find(&records).Error; err != nil { + return records, total, fmt.Errorf("find by query records err: %v, query: %s, args: %+v", err, query, args) + } + + return records, total, nil +} + +// FindByQueryNotTotal 鎸囧畾鏉′欢鏌ヨ&涓嶆煡璇㈡�绘潯鏁�. +func (slf *ReportsToCloudSearch) FindByQueryNotTotal(query string, args []interface{}) ([]*ReportsToCloud, error) { + var ( + records = make([]*ReportsToCloud, 0) + db = slf.Orm.Table(slf.TableName()).Where(query, args...) + ) + + if slf.PageNum*slf.PageSize > 0 { + db = db.Offset((slf.PageNum - 1) * slf.PageSize).Limit(slf.PageSize) + } + if err := db.Find(&records).Error; err != nil { + return records, fmt.Errorf("find by query records err: %v, query: %s, args: %+v", err, query, args) + } + + return records, nil +} diff --git a/service/device.go b/service/device.go index e0ff8ab..e9731cf 100644 --- a/service/device.go +++ b/service/device.go @@ -2,6 +2,7 @@ import ( "apsClient/conf" + "apsClient/constvar" "apsClient/model" "apsClient/model/response" "apsClient/pkg/logx" @@ -45,7 +46,7 @@ return deviceList, nil } -func InitCurrentDeviceID() (err error) { +func InitCurrentDeviceID(ServerID string) (err error) { currentDeviceID := ReadDeviceIDFromFile() if currentDeviceID != "" { conf.Conf.CurrentDeviceID = currentDeviceID @@ -55,10 +56,12 @@ if err != nil { return err } - if len(deviceList) == 0 { - conf.Conf.CurrentDeviceID = conf.Conf.System.DeviceId - } else { + if len(deviceList) >= 0 { conf.Conf.CurrentDeviceID = deviceList[0] + } else if ServerID != "" { + conf.Conf.CurrentDeviceID = ServerID + } else { + conf.Conf.CurrentDeviceID = conf.Conf.System.DeviceId } SetDeviceIDToFile(conf.Conf.CurrentDeviceID) return nil @@ -106,3 +109,14 @@ } return device, nil } + +// ReportsSystemDeviceToCloud 鍒涘缓鍚屾璁惧id璁板綍 +func ReportsSystemDeviceToCloud(systemDeviceID string) { + err := model.NewReportsToCloudSearch(nil).Create(&model.ReportsToCloud{ + ReportType: constvar.ReportTypeSystemDeviceID, + Content: systemDeviceID, + }) + if err != nil { + logx.Errorf("ReportsSystemDeviceToCloud create record error:%v", err) + } +} -- Gitblit v1.8.0