zhangqian
2023-11-13 6267ca2039e6538e4f687071e71e2ad2dda11d09
系统启动时,保存systemDeviceId记录并上报
1个文件已添加
6个文件已修改
294 ■■■■■ 已修改文件
constvar/const.go 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
crontask/cron_task.go 30 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
model/common/common.go 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
model/index.go 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
model/reports_to_cloud.go 221 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service/device.go 22 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
constvar/const.go
@@ -8,6 +8,7 @@
    NsqTopicDeviceUpdate              = "aps.%v.device.update"           //设备信息更改
    NsqTopicPullDataRequest           = "aps.%v.pull.data.request"       //拉取数据请求
    NsqTopicPullDataResponse          = "aps.%v.pull.data.response"      //拉取数据响应
    NsqTopicApsClientReportData       = "aps.%v.apsClient.report.data"   //apsClient上报数据
)
type PlcStartAddressType int
@@ -127,3 +128,9 @@
    SystemStatusValueNormal   SystemStatusValue = "1"
    SystemStatusValueUnNormal SystemStatusValue = "2"
)
type ReportType int //上报类型
const (
    ReportTypeSystemDeviceID ReportType = 1
)
crontask/cron_task.go
@@ -84,7 +84,7 @@
        s.Every(60).Seconds().Do(SyncProductionProgress) //同步生产数据
        s.Every(30).Seconds().Do(SyncTaskStatus)         //同步任务状态
        s.Every(10).Seconds().Do(CheckNsqConn)           //查询nsq连接
        s.Every(30).Seconds().Do(ReportData)             //上报数据
    }
    s.Every(20).Seconds().Do(QueryClusterStatus) //查询集群节点数量
@@ -127,6 +127,7 @@
        err = caller.Send(msg)
        if err != nil {
            logx.Errorf("sync task status send msg error:%v", err.Error())
            continue
        }
        syncOkIds = append(syncOkIds, record.ID)
    }
@@ -138,6 +139,33 @@
    }
}
func ReportData() {
    records, err := model.NewReportsToCloudSearch(nil).SetOrder("id desc").SetPage(1, 100).FindNotTotal()
    if err != nil {
        logx.Errorf("ReportData get records err:%v", err)
    }
    okIds := make([]uint, 0, len(records))
    for _, record := range records {
        msg := &common.MsgReportData{
            ReportType: record.ReportType,
            Content:    record.Content,
        }
        caller := nsq.NewCaller(fmt.Sprintf(constvar.NsqTopicApsClientReportData, conf.Conf.NsqConf.NodeId), "")
        err = caller.Send(msg)
        if err != nil {
            logx.Errorf("sync task status send msg error:%v", err.Error())
            continue
        }
        okIds = append(okIds, record.ID)
    }
    if len(okIds) > 0 {
        err = model.NewReportsToCloudSearch(nil).SetIDs(okIds).Delete()
        if err != nil {
            logx.Errorf("ReportData delete report ok records error:%v", err)
        }
    }
}
func RestartTask(isMaster bool) error {
    if s != nil {
        s.Stop()
main.go
@@ -45,6 +45,7 @@
        "device_plc",
        "system_status",
        "process_model_plc_address",
        "reports_to_cloud",
    }
    agent := serf.InitAgent("apsClient", syncTables, sqlitex.GetDB())
@@ -57,12 +58,16 @@
    }
    //从文件里读取当前生产设备id获取设备列表第一个
    err := service.InitCurrentDeviceID()
    err := service.InitCurrentDeviceID(serf.Vasystem.ServerID)
    if err != nil {
        logx.Errorf("InitCurrentDeviceID error: %v, exit", err)
        return
    }
    go func() {
        service.ReportsSystemDeviceToCloud(serf.Vasystem.ServerID)
    }()
    // 判断当前集群状态
    logx.Infof("current agent.ClusterStatus:%v", agent.ClusterStatus)
    log.Println("current agent.ClusterStatus:", agent.ClusterStatus)
model/common/common.go
@@ -168,3 +168,9 @@
    StopBit    int             `gorm:"type:int(11)"  json:"stopBit"` //停止位,method = modbusRTU 用
    Parity     constvar.Parity `gorm:"type:int(11)"  json:"parity"`  //校验方式,method = modbusRTU 用
}
// MsgReportData 数据上报
type MsgReportData struct {
    ReportType constvar.ReportType `json:"reportType"` //上报类型
    Content    string              `json:"Content"`
}
model/index.go
@@ -31,6 +31,7 @@
        TaskStatusSync{},
        SystemStatus{},
        ProcessModelPlcAddress{},
        ReportsToCloud{},
    )
}
model/reports_to_cloud.go
New file
@@ -0,0 +1,221 @@
package model
import (
    "apsClient/constvar"
    "apsClient/pkg/sqlitex"
    "fmt"
    "github.com/jinzhu/gorm"
)
type (
    // ReportsToCloud 数据上报
    ReportsToCloud struct {
        gorm.Model
        ReportType constvar.ReportType `json:"reportType"` //上报类型
        Content    string              `json:"Content"`
    }
    ReportsToCloudSearch struct {
        ReportsToCloud
        Order    string
        PageNum  int
        PageSize int
        Orm      *gorm.DB
        IDs      []uint
    }
)
func (slf *ReportsToCloud) TableName() string {
    return "reports_to_cloud"
}
func NewReportsToCloudSearch(db *gorm.DB) *ReportsToCloudSearch {
    if db == nil {
        db = sqlitex.GetDB()
    }
    return &ReportsToCloudSearch{Orm: db}
}
func (slf *ReportsToCloudSearch) SetOrm(tx *gorm.DB) *ReportsToCloudSearch {
    slf.Orm = tx
    return slf
}
func (slf *ReportsToCloudSearch) SetPage(page, size int) *ReportsToCloudSearch {
    slf.PageNum, slf.PageSize = page, size
    return slf
}
func (slf *ReportsToCloudSearch) SetOrder(order string) *ReportsToCloudSearch {
    slf.Order = order
    return slf
}
func (slf *ReportsToCloudSearch) SetID(id uint) *ReportsToCloudSearch {
    slf.ID = id
    return slf
}
func (slf *ReportsToCloudSearch) SetIDs(ids []uint) *ReportsToCloudSearch {
    slf.IDs = ids
    return slf
}
func (slf *ReportsToCloudSearch) build() *gorm.DB {
    var db = slf.Orm.Table(slf.TableName())
    if slf.ID != 0 {
        db = db.Where("id = ?", slf.ID)
    }
    if len(slf.IDs) != 0 {
        db = db.Where("id in (?)", slf.IDs)
    }
    if slf.Order != "" {
        db = db.Order(slf.Order)
    }
    return db
}
// Create 单条插入
func (slf *ReportsToCloudSearch) Create(record *ReportsToCloud) error {
    var db = slf.build()
    if err := db.Create(record).Error; err != nil {
        return fmt.Errorf("create err: %v, record: %+v", err, record)
    }
    return nil
}
func (slf *ReportsToCloudSearch) Save(record *ReportsToCloud) error {
    var db = slf.build()
    if err := db.Updates(record).Error; err != nil {
        return fmt.Errorf("create err: %v, record: %+v", err, record)
    }
    return nil
}
func (slf *ReportsToCloudSearch) UpdateByMap(upMap map[string]interface{}) error {
    var (
        db = slf.build()
    )
    if err := db.Updates(upMap).Error; err != nil {
        return fmt.Errorf("update by map err: %v, upMap: %+v", err, upMap)
    }
    return nil
}
func (slf *ReportsToCloudSearch) UpdateByQuery(query string, args []interface{}, upMap map[string]interface{}) error {
    var (
        db = slf.Orm.Table(slf.TableName()).Where(query, args...)
    )
    if err := db.Updates(upMap).Error; err != nil {
        return fmt.Errorf("update by query err: %v, query: %s, args: %+v, upMap: %+v", err, query, args, upMap)
    }
    return nil
}
func (slf *ReportsToCloudSearch) Delete() error {
    var db = slf.build()
    if err := db.Unscoped().Delete(&ReportsToCloud{}).Error; err != nil {
        return err
    }
    return nil
}
func (slf *ReportsToCloudSearch) First() (*ReportsToCloud, error) {
    var (
        record = new(ReportsToCloud)
        db     = slf.build()
    )
    if err := db.First(record).Error; err != nil {
        return record, err
    }
    return record, nil
}
func (slf *ReportsToCloudSearch) Find() ([]*ReportsToCloud, int64, error) {
    var (
        records = make([]*ReportsToCloud, 0)
        total   int64
        db      = slf.build()
    )
    if err := db.Count(&total).Error; err != nil {
        return records, total, fmt.Errorf("find count err: %v", err)
    }
    if slf.PageNum*slf.PageSize > 0 {
        db = db.Offset((slf.PageNum - 1) * slf.PageSize).Limit(slf.PageSize)
    }
    if err := db.Find(&records).Error; err != nil {
        return records, total, fmt.Errorf("find records err: %v", err)
    }
    return records, total, nil
}
func (slf *ReportsToCloudSearch) FindNotTotal() ([]*ReportsToCloud, error) {
    var (
        records = make([]*ReportsToCloud, 0)
        db      = slf.build()
    )
    if slf.PageNum*slf.PageSize > 0 {
        db = db.Offset((slf.PageNum - 1) * slf.PageSize).Limit(slf.PageSize)
    }
    if err := db.Find(&records).Error; err != nil {
        return records, fmt.Errorf("find records err: %v", err)
    }
    return records, nil
}
// FindByQuery 指定条件查询.
func (slf *ReportsToCloudSearch) FindByQuery(query string, args []interface{}) ([]*ReportsToCloud, int64, error) {
    var (
        records = make([]*ReportsToCloud, 0)
        total   int64
        db      = slf.Orm.Table(slf.TableName()).Where(query, args...)
    )
    if err := db.Count(&total).Error; err != nil {
        return records, total, fmt.Errorf("find by query count err: %v", err)
    }
    if slf.PageNum*slf.PageSize > 0 {
        db = db.Offset((slf.PageNum - 1) * slf.PageSize).Limit(slf.PageSize)
    }
    if err := db.Find(&records).Error; err != nil {
        return records, total, fmt.Errorf("find by query records err: %v, query: %s, args: %+v", err, query, args)
    }
    return records, total, nil
}
// FindByQueryNotTotal 指定条件查询&不查询总条数.
func (slf *ReportsToCloudSearch) FindByQueryNotTotal(query string, args []interface{}) ([]*ReportsToCloud, error) {
    var (
        records = make([]*ReportsToCloud, 0)
        db      = slf.Orm.Table(slf.TableName()).Where(query, args...)
    )
    if slf.PageNum*slf.PageSize > 0 {
        db = db.Offset((slf.PageNum - 1) * slf.PageSize).Limit(slf.PageSize)
    }
    if err := db.Find(&records).Error; err != nil {
        return records, fmt.Errorf("find by query records err: %v, query: %s, args: %+v", err, query, args)
    }
    return records, nil
}
service/device.go
@@ -2,6 +2,7 @@
import (
    "apsClient/conf"
    "apsClient/constvar"
    "apsClient/model"
    "apsClient/model/response"
    "apsClient/pkg/logx"
@@ -45,7 +46,7 @@
    return deviceList, nil
}
func InitCurrentDeviceID() (err error) {
func InitCurrentDeviceID(ServerID string) (err error) {
    currentDeviceID := ReadDeviceIDFromFile()
    if currentDeviceID != "" {
        conf.Conf.CurrentDeviceID = currentDeviceID
@@ -55,10 +56,12 @@
    if err != nil {
        return err
    }
    if len(deviceList) == 0 {
        conf.Conf.CurrentDeviceID = conf.Conf.System.DeviceId
    } else {
    if len(deviceList) >= 0 {
        conf.Conf.CurrentDeviceID = deviceList[0]
    } else if ServerID != "" {
        conf.Conf.CurrentDeviceID = ServerID
    } else {
        conf.Conf.CurrentDeviceID = conf.Conf.System.DeviceId
    }
    SetDeviceIDToFile(conf.Conf.CurrentDeviceID)
    return nil
@@ -106,3 +109,14 @@
    }
    return device, nil
}
// ReportsSystemDeviceToCloud 创建同步设备id记录
func ReportsSystemDeviceToCloud(systemDeviceID string) {
    err := model.NewReportsToCloudSearch(nil).Create(&model.ReportsToCloud{
        ReportType: constvar.ReportTypeSystemDeviceID,
        Content:    systemDeviceID,
    })
    if err != nil {
        logx.Errorf("ReportsSystemDeviceToCloud create record error:%v", err)
    }
}