constvar/const.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
crontask/cron_task.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
main.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
model/common/common.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
model/index.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
model/reports_to_cloud.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
service/device.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
constvar/const.go
@@ -8,6 +8,7 @@ NsqTopicDeviceUpdate = "aps.%v.device.update" //设备信息更改 NsqTopicPullDataRequest = "aps.%v.pull.data.request" //拉取数据请求 NsqTopicPullDataResponse = "aps.%v.pull.data.response" //拉取数据响应 NsqTopicApsClientReportData = "aps.%v.apsClient.report.data" //apsClient上报数据 ) type PlcStartAddressType int @@ -127,3 +128,9 @@ SystemStatusValueNormal SystemStatusValue = "1" SystemStatusValueUnNormal SystemStatusValue = "2" ) type ReportType int //上报类型 const ( ReportTypeSystemDeviceID ReportType = 1 ) crontask/cron_task.go
@@ -84,7 +84,7 @@ s.Every(60).Seconds().Do(SyncProductionProgress) //同步生产数据 s.Every(30).Seconds().Do(SyncTaskStatus) //同步任务状态 s.Every(10).Seconds().Do(CheckNsqConn) //查询nsq连接 s.Every(30).Seconds().Do(ReportData) //上报数据 } s.Every(20).Seconds().Do(QueryClusterStatus) //查询集群节点数量 @@ -127,6 +127,7 @@ err = caller.Send(msg) if err != nil { logx.Errorf("sync task status send msg error:%v", err.Error()) continue } syncOkIds = append(syncOkIds, record.ID) } @@ -138,6 +139,33 @@ } } func ReportData() { records, err := model.NewReportsToCloudSearch(nil).SetOrder("id desc").SetPage(1, 100).FindNotTotal() if err != nil { logx.Errorf("ReportData get records err:%v", err) } okIds := make([]uint, 0, len(records)) for _, record := range records { msg := &common.MsgReportData{ ReportType: record.ReportType, Content: record.Content, } caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicApsClientReportData, conf.Conf.NsqConf.NodeId), "") err = caller.Send(msg) if err != nil { logx.Errorf("sync task status send msg error:%v", err.Error()) continue } okIds = append(okIds, record.ID) } if len(okIds) > 0 { err = model.NewReportsToCloudSearch(nil).SetIDs(okIds).Delete() if err != nil { logx.Errorf("ReportData delete report ok records error:%v", err) } } } func RestartTask(isMaster bool) error { if s != nil { s.Stop() main.go
@@ -45,6 +45,7 @@ "device_plc", "system_status", "process_model_plc_address", "reports_to_cloud", } agent := serf.InitAgent("apsClient", syncTables, sqlitex.GetDB()) @@ -57,12 +58,16 @@ } //从文件里读取当前生产设备id获取设备列表第一个 err := service.InitCurrentDeviceID() err := service.InitCurrentDeviceID(serf.Vasystem.ServerID) if err != nil { logx.Errorf("InitCurrentDeviceID error: %v, exit", err) return } go func() { service.ReportsSystemDeviceToCloud(serf.Vasystem.ServerID) }() // 判断当前集群状态 logx.Infof("current agent.ClusterStatus:%v", agent.ClusterStatus) log.Println("current agent.ClusterStatus:", agent.ClusterStatus) model/common/common.go
@@ -168,3 +168,9 @@ StopBit int `gorm:"type:int(11)" json:"stopBit"` //停止位,method = modbusRTU 用 Parity constvar.Parity `gorm:"type:int(11)" json:"parity"` //校验方式,method = modbusRTU 用 } // MsgReportData 数据上报 type MsgReportData struct { ReportType constvar.ReportType `json:"reportType"` //上报类型 Content string `json:"Content"` } model/index.go
@@ -31,6 +31,7 @@ TaskStatusSync{}, SystemStatus{}, ProcessModelPlcAddress{}, ReportsToCloud{}, ) } model/reports_to_cloud.go
New file @@ -0,0 +1,221 @@ package model import ( "apsClient/constvar" "apsClient/pkg/sqlitex" "fmt" "github.com/jinzhu/gorm" ) type ( // ReportsToCloud 数据上报 ReportsToCloud struct { gorm.Model ReportType constvar.ReportType `json:"reportType"` //上报类型 Content string `json:"Content"` } ReportsToCloudSearch struct { ReportsToCloud Order string PageNum int PageSize int Orm *gorm.DB IDs []uint } ) func (slf *ReportsToCloud) TableName() string { return "reports_to_cloud" } func NewReportsToCloudSearch(db *gorm.DB) *ReportsToCloudSearch { if db == nil { db = sqlitex.GetDB() } return &ReportsToCloudSearch{Orm: db} } func (slf *ReportsToCloudSearch) SetOrm(tx *gorm.DB) *ReportsToCloudSearch { slf.Orm = tx return slf } func (slf *ReportsToCloudSearch) SetPage(page, size int) *ReportsToCloudSearch { slf.PageNum, slf.PageSize = page, size return slf } func (slf *ReportsToCloudSearch) SetOrder(order string) *ReportsToCloudSearch { slf.Order = order return slf } func (slf *ReportsToCloudSearch) SetID(id uint) *ReportsToCloudSearch { slf.ID = id return slf } func (slf *ReportsToCloudSearch) SetIDs(ids []uint) *ReportsToCloudSearch { slf.IDs = ids return slf } func (slf *ReportsToCloudSearch) build() *gorm.DB { var db = slf.Orm.Table(slf.TableName()) if slf.ID != 0 { db = db.Where("id = ?", slf.ID) } if len(slf.IDs) != 0 { db = db.Where("id in (?)", slf.IDs) } if slf.Order != "" { db = db.Order(slf.Order) } return db } // Create 单条插入 func (slf *ReportsToCloudSearch) Create(record *ReportsToCloud) error { var db = slf.build() if err := db.Create(record).Error; err != nil { return fmt.Errorf("create err: %v, record: %+v", err, record) } return nil } func (slf *ReportsToCloudSearch) Save(record *ReportsToCloud) error { var db = slf.build() if err := db.Updates(record).Error; err != nil { return fmt.Errorf("create err: %v, record: %+v", err, record) } return nil } func (slf *ReportsToCloudSearch) UpdateByMap(upMap map[string]interface{}) error { var ( db = slf.build() ) if err := db.Updates(upMap).Error; err != nil { return fmt.Errorf("update by map err: %v, upMap: %+v", err, upMap) } return nil } func (slf *ReportsToCloudSearch) UpdateByQuery(query string, args []interface{}, upMap map[string]interface{}) error { var ( db = slf.Orm.Table(slf.TableName()).Where(query, args...) ) if err := db.Updates(upMap).Error; err != nil { return fmt.Errorf("update by query err: %v, query: %s, args: %+v, upMap: %+v", err, query, args, upMap) } return nil } func (slf *ReportsToCloudSearch) Delete() error { var db = slf.build() if err := db.Unscoped().Delete(&ReportsToCloud{}).Error; err != nil { return err } return nil } func (slf *ReportsToCloudSearch) First() (*ReportsToCloud, error) { var ( record = new(ReportsToCloud) db = slf.build() ) if err := db.First(record).Error; err != nil { return record, err } return record, nil } func (slf *ReportsToCloudSearch) Find() ([]*ReportsToCloud, int64, error) { var ( records = make([]*ReportsToCloud, 0) total int64 db = slf.build() ) if err := db.Count(&total).Error; err != nil { return records, total, fmt.Errorf("find count err: %v", err) } if slf.PageNum*slf.PageSize > 0 { db = db.Offset((slf.PageNum - 1) * slf.PageSize).Limit(slf.PageSize) } if err := db.Find(&records).Error; err != nil { return records, total, fmt.Errorf("find records err: %v", err) } return records, total, nil } func (slf *ReportsToCloudSearch) FindNotTotal() ([]*ReportsToCloud, error) { var ( records = make([]*ReportsToCloud, 0) db = slf.build() ) if slf.PageNum*slf.PageSize > 0 { db = db.Offset((slf.PageNum - 1) * slf.PageSize).Limit(slf.PageSize) } if err := db.Find(&records).Error; err != nil { return records, fmt.Errorf("find records err: %v", err) } return records, nil } // FindByQuery 指定条件查询. func (slf *ReportsToCloudSearch) FindByQuery(query string, args []interface{}) ([]*ReportsToCloud, int64, error) { var ( records = make([]*ReportsToCloud, 0) total int64 db = slf.Orm.Table(slf.TableName()).Where(query, args...) ) if err := db.Count(&total).Error; err != nil { return records, total, fmt.Errorf("find by query count err: %v", err) } if slf.PageNum*slf.PageSize > 0 { db = db.Offset((slf.PageNum - 1) * slf.PageSize).Limit(slf.PageSize) } if err := db.Find(&records).Error; err != nil { return records, total, fmt.Errorf("find by query records err: %v, query: %s, args: %+v", err, query, args) } return records, total, nil } // FindByQueryNotTotal 指定条件查询&不查询总条数. func (slf *ReportsToCloudSearch) FindByQueryNotTotal(query string, args []interface{}) ([]*ReportsToCloud, error) { var ( records = make([]*ReportsToCloud, 0) db = slf.Orm.Table(slf.TableName()).Where(query, args...) ) if slf.PageNum*slf.PageSize > 0 { db = db.Offset((slf.PageNum - 1) * slf.PageSize).Limit(slf.PageSize) } if err := db.Find(&records).Error; err != nil { return records, fmt.Errorf("find by query records err: %v, query: %s, args: %+v", err, query, args) } return records, nil } service/device.go
@@ -2,6 +2,7 @@ import ( "apsClient/conf" "apsClient/constvar" "apsClient/model" "apsClient/model/response" "apsClient/pkg/logx" @@ -45,7 +46,7 @@ return deviceList, nil } func InitCurrentDeviceID() (err error) { func InitCurrentDeviceID(ServerID string) (err error) { currentDeviceID := ReadDeviceIDFromFile() if currentDeviceID != "" { conf.Conf.CurrentDeviceID = currentDeviceID @@ -55,10 +56,12 @@ if err != nil { return err } if len(deviceList) == 0 { conf.Conf.CurrentDeviceID = conf.Conf.System.DeviceId } else { if len(deviceList) >= 0 { conf.Conf.CurrentDeviceID = deviceList[0] } else if ServerID != "" { conf.Conf.CurrentDeviceID = ServerID } else { conf.Conf.CurrentDeviceID = conf.Conf.System.DeviceId } SetDeviceIDToFile(conf.Conf.CurrentDeviceID) return nil @@ -106,3 +109,14 @@ } return device, nil } // ReportsSystemDeviceToCloud 创建同步设备id记录 func ReportsSystemDeviceToCloud(systemDeviceID string) { err := model.NewReportsToCloudSearch(nil).Create(&model.ReportsToCloud{ ReportType: constvar.ReportTypeSystemDeviceID, Content: systemDeviceID, }) if err != nil { logx.Errorf("ReportsSystemDeviceToCloud create record error:%v", err) } }