zhangqian
2023-11-02 7c1dd2e19119edd1ab147a699dfdcd11447fca14
serf集群master定时查询nsq连接状态并保存在数据库,增加网络检查器
1个文件已添加
8个文件已修改
266 ■■■■■ 已修改文件
constvar/const.go 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
crontask/cron_task.go 27 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.mod 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.sum 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
model/index.go 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
model/system_status.go 168 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsq/nsq.go 22 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service/problem/check.go 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
constvar/const.go
@@ -113,3 +113,16 @@
    ProblemCodePlcProcessModelAddressList ProblemCode = "plc_process_model_address_list" //plc地址表缺失
    ProblemCodePlcConnect                 ProblemCode = "plc_connect"                    //plc连接失败
)
type SystemStatusKey string
const (
    SystemStatusKeyNsq SystemStatusKey = "nsq"
)
type SystemStatusValue string
const (
    SystemStatusValueNormal   SystemStatusValue = "1"
    SystemStatusValueUnNormal SystemStatusValue = "2"
)
crontask/cron_task.go
@@ -3,6 +3,7 @@
import (
    "apsClient/conf"
    "apsClient/constvar"
    "apsClient/model"
    "apsClient/model/common"
    "apsClient/nsq"
    "apsClient/pkg/ecode"
@@ -80,9 +81,11 @@
    if isMaster {
        s.Every(60).Seconds().Do(SyncProductionProgress) //同步生产数据
        s.Every(30).Seconds().Do(SyncTaskStatus)         //同步任务状态
        s.Every(10).Seconds().Do(CheckNsqConn)           //查询nsq连接
    }
    s.Every(10).Seconds().Do(QueryClusterStatus) //查询集群节点数量
    s.Every(20).Seconds().Do(QueryClusterStatus) //查询集群节点数量
    s.Every(30).Seconds().Do(ProblemCheck)       //问题诊断
    s.StartAsync()
@@ -168,3 +171,25 @@
func ProblemCheck() {
    problem.Check()
}
func CheckNsqConn() {
    var err error
    var status constvar.SystemStatusValue
    if nsq.Ping() {
        status = constvar.SystemStatusValueNormal
    } else {
        status = constvar.SystemStatusValueUnNormal
    }
    old, err := model.NewSystemStatusSearch().SetKey(constvar.SystemStatusKeyNsq).First()
    if err != nil {
        logx.Errorf("get nsq status err:%v", err)
        return
    }
    if old.Value == status {
        return
    }
    err = model.NewSystemStatusSearch().SetKey(constvar.SystemStatusKeyNsq).Updates(map[string]interface{}{"value": status})
    if err != nil {
        logx.Errorf("update nsq status err:%v", err)
    }
}
go.mod
@@ -21,6 +21,7 @@
    github.com/mojocn/base64Captcha v1.3.1
    github.com/nsqio/go-nsq v1.1.0
    github.com/robfig/cron/v3 v3.0.1
    github.com/shirou/gopsutil v3.21.11+incompatible
    github.com/shopspring/decimal v1.3.1
    github.com/songzhibin97/gkit v1.2.10
    github.com/spf13/cast v1.5.1
@@ -49,6 +50,7 @@
    github.com/fsnotify/fsnotify v1.6.0 // indirect
    github.com/gabriel-vasile/mimetype v1.4.2 // indirect
    github.com/gin-contrib/sse v0.1.0 // indirect
    github.com/go-ole/go-ole v1.2.6 // indirect
    github.com/go-openapi/jsonpointer v0.19.6 // indirect
    github.com/go-openapi/jsonreference v0.20.1 // indirect
    github.com/go-openapi/spec v0.20.4 // indirect
@@ -92,6 +94,7 @@
    github.com/subosito/gotenv v1.4.2 // indirect
    github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
    github.com/ugorji/go/codec v1.2.11 // indirect
    github.com/yusufpapurcu/wmi v1.2.2 // indirect
    go.uber.org/atomic v1.9.0 // indirect
    go.uber.org/multierr v1.8.0 // indirect
    golang.org/x/arch v0.3.0 // indirect
go.sum
@@ -119,6 +119,8 @@
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
github.com/go-openapi/jsonpointer v0.19.6 h1:eCs3fxoIi3Wh6vtgmLTOjdhSpiqphQ+DaPn38N2ZdrE=
@@ -343,6 +345,8 @@
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.30.0 h1:SymVODrcRsaRaSInD9yQtKbtWqwsfoPcRff/oRXLj4c=
github.com/rs/zerolog v1.30.0/go.mod h1:/tk+P47gFdPXq4QYjvCmT5/Gsug2nagsFWBWhAiSi1w=
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8=
github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/songzhibin97/gkit v1.2.10 h1:J/LDN2hv7RL1q+o+krZLWGNsKGDtjSmC1BlTP8aiWMw=
@@ -395,6 +399,8 @@
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg=
github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
@@ -539,6 +545,7 @@
golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
main.go
@@ -43,6 +43,7 @@
        "task_status_sync",
        "device",
        "device_plc",
        "system_status",
    }
    agent := serf.InitAgent("apsClient", syncTables, sqlitex.GetDB())
model/index.go
@@ -29,6 +29,7 @@
        ProductionProgress{},
        Device{},
        TaskStatusSync{},
        SystemStatus{},
    )
}
@@ -43,6 +44,7 @@
    models := []interface{}{
        NewNetConfigSearch(nil),
        NewPlcBrandSearch(),
        NewSystemStatusSearch(),
    }
    for _, model := range models {
model/system_status.go
New file
@@ -0,0 +1,168 @@
package model
import (
    "apsClient/constvar"
    "apsClient/pkg/sqlitex"
    "errors"
    "fmt"
    "github.com/jinzhu/gorm"
    "sync"
)
type (
    // SystemStatus 系统状态
    SystemStatus struct {
        gorm.Model
        Key   constvar.SystemStatusKey   `json:"key" gorm:"column:key;type:varchar(255);not null"`
        Value constvar.SystemStatusValue `json:"value" gorm:"column:value;type:varchar(255);not null"`
    }
    // SystemStatusSearch 系统状态搜索条件
    SystemStatusSearch struct {
        SystemStatus
        Orm      *gorm.DB
        Keyword  string
        PageNum  int
        PageSize int
    }
)
func (SystemStatus) TableName() string {
    return "system_status"
}
func NewSystemStatusSearch() *SystemStatusSearch {
    return &SystemStatusSearch{
        Orm: sqlitex.GetDB(),
    }
}
func (slf *SystemStatusSearch) build() *gorm.DB {
    var db = slf.Orm.Model(&SystemStatus{})
    if slf.ID != 0 {
        db = db.Where("id = ?", slf.ID)
    }
    if slf.Key != "" {
        db = db.Where("key = ?", slf.Key)
    }
    return db
}
func (slf *SystemStatusSearch) Create(record *SystemStatus) error {
    var db = slf.build()
    return db.Create(record).Error
}
func (slf *SystemStatusSearch) CreateBatch(records []*SystemStatus) error {
    var db = slf.build()
    for _, record := range records {
        db.Create(record)
    }
    return nil
}
func (slf *SystemStatusSearch) Delete() error {
    var db = slf.build()
    return db.Delete(&SystemStatus{}).Error
}
func (slf *SystemStatusSearch) Update(record *SystemStatus) error {
    var db = slf.build()
    return db.Updates(record).Error
}
func (slf *SystemStatusSearch) FindAll() ([]*SystemStatus, error) {
    var db = slf.build()
    var record = make([]*SystemStatus, 0)
    err := db.Find(&record).Error
    return record, err
}
func (slf *SystemStatusSearch) SetId(id uint) *SystemStatusSearch {
    slf.ID = id
    return slf
}
func (slf *SystemStatusSearch) SetKey(key constvar.SystemStatusKey) *SystemStatusSearch {
    slf.Key = key
    return slf
}
func (slf *SystemStatusSearch) SetPage(page, size int) *SystemStatusSearch {
    slf.PageNum, slf.PageSize = page, size
    return slf
}
func (slf *SystemStatusSearch) SetOrm(tx *gorm.DB) *SystemStatusSearch {
    slf.Orm = tx
    return slf
}
func (slf *SystemStatusSearch) First() (*SystemStatus, error) {
    var db = slf.build()
    var record = new(SystemStatus)
    err := db.First(record).Error
    return record, err
}
func (slf *SystemStatusSearch) Updates(values interface{}) error {
    var db = slf.build()
    return db.Updates(values).Error
}
func (slf *SystemStatusSearch) Save(record *SystemStatus) error {
    if record.ID == 0 {
        return errors.New("id为空")
    }
    var db = slf.build()
    if err := db.Save(record).Error; err != nil {
        return fmt.Errorf("save err: %v, record: %+v", err, record)
    }
    return nil
}
func (slf *SystemStatusSearch) Find() ([]*SystemStatus, int64, error) {
    var db = slf.build()
    var records = make([]*SystemStatus, 0)
    var total int64
    if err := db.Count(&total).Error; err != nil {
        return records, total, err
    }
    if slf.PageNum > 0 && slf.PageSize > 0 {
        db = db.Limit(slf.PageSize).Offset((slf.PageNum - 1) * slf.PageSize)
    }
    err := db.Find(&records).Error
    return records, total, err
}
// InitDefaultData 初始化数据
func (slf *SystemStatusSearch) InitDefaultData(errCh chan<- error, wg *sync.WaitGroup) {
    var (
        db          = slf.Orm.Table(slf.TableName())
        total int64 = 0
    )
    defer wg.Done()
    if err := db.Count(&total).Error; err != nil {
        errCh <- err
        return
    }
    if total != 0 {
        return
    }
    records := []*SystemStatus{
        {
            Key:   constvar.SystemStatusKeyNsq,
            Value: constvar.SystemStatusValueNormal,
        },
    }
    err := slf.CreateBatch(records)
    if err != nil {
        errCh <- err
        return
    }
}
nsq/nsq.go
@@ -9,6 +9,7 @@
    "context"
    "errors"
    "fmt"
    "github.com/shirou/gopsutil/net"
    "sync"
    "sync/atomic"
)
@@ -85,10 +86,31 @@
    })
}
func (c *consumerManager) ping() bool {
    connections, err := net.Connections("inet")
    if err != nil {
        fmt.Println("Error:", err)
        return false
    }
    for _, conn := range connections {
        fmt.Println("net.Connections:", conn)
        ipPort := fmt.Sprintf("%s:%s", conn.Laddr.IP, conn.Laddr.Port)
        fmt.Println("net.Connections ipPort", ipPort)
        if ipPort == conf.Conf.NsqConf.NsqdAddr && conn.Status == "ESTABLISHED" {
            return true
        }
    }
    return false
}
func Init() error {
    return defaultConsumerManager.init()
}
func Ping() bool {
    return defaultConsumerManager.ping()
}
func Stop() {
    defaultConsumerManager.stop()
    StopProducer()
service/problem/check.go
@@ -3,9 +3,12 @@
import (
    "apsClient/conf"
    "apsClient/constvar"
    "apsClient/model"
    "apsClient/pkg/sqlitex"
    "apsClient/service"
    "apsClient/service/plc_address"
    "fmt"
    "net"
    "sync"
)
@@ -103,7 +106,19 @@
type Network struct{}
func (slf *Network) Check() bool {
    return false
    ifaces, err := net.Interfaces()
    if err != nil {
        fmt.Println("Error:", err)
        return false
    }
    var netAllFailed bool
    for _, iFace := range ifaces {
        if iFace.Flags == net.FlagUp {
            netAllFailed = false
            fmt.Printf("Interface %s is DOWN, indicating a network issue.\n", iFace.Name)
        }
    }
    return !netAllFailed
}
type DB struct{}
@@ -125,7 +140,11 @@
type Nsq struct{}
func (slf *Nsq) Check() bool {
    return false
    old, err := model.NewSystemStatusSearch().SetKey(constvar.SystemStatusKeyNsq).First()
    if err != nil {
        return false
    }
    return old.Value == constvar.SystemStatusValueNormal
}
type Device struct{}