From 7c1dd2e19119edd1ab147a699dfdcd11447fca14 Mon Sep 17 00:00:00 2001
From: zhangqian <zhangqian@123.com>
Date: 星期四, 02 十一月 2023 21:20:53 +0800
Subject: [PATCH] serf集群master定时查询nsq连接状态并保存在数据库,增加网络检查器
---
constvar/const.go | 13 ++
go.sum | 7 +
service/problem/check.go | 23 ++++
model/system_status.go | 168 +++++++++++++++++++++++++++++++++
model/index.go | 2
go.mod | 3
main.go | 1
crontask/cron_task.go | 27 +++++
nsq/nsq.go | 22 ++++
9 files changed, 263 insertions(+), 3 deletions(-)
diff --git a/constvar/const.go b/constvar/const.go
index 3f6fc2b..a7e2065 100644
--- a/constvar/const.go
+++ b/constvar/const.go
@@ -113,3 +113,16 @@
ProblemCodePlcProcessModelAddressList ProblemCode = "plc_process_model_address_list" //plc鍦板潃琛ㄧ己澶�
ProblemCodePlcConnect ProblemCode = "plc_connect" //plc杩炴帴澶辫触
)
+
+type SystemStatusKey string
+
+const (
+ SystemStatusKeyNsq SystemStatusKey = "nsq"
+)
+
+type SystemStatusValue string
+
+const (
+ SystemStatusValueNormal SystemStatusValue = "1"
+ SystemStatusValueUnNormal SystemStatusValue = "2"
+)
diff --git a/crontask/cron_task.go b/crontask/cron_task.go
index 42dc94b..de593c3 100644
--- a/crontask/cron_task.go
+++ b/crontask/cron_task.go
@@ -3,6 +3,7 @@
import (
"apsClient/conf"
"apsClient/constvar"
+ "apsClient/model"
"apsClient/model/common"
"apsClient/nsq"
"apsClient/pkg/ecode"
@@ -80,9 +81,11 @@
if isMaster {
s.Every(60).Seconds().Do(SyncProductionProgress) //鍚屾鐢熶骇鏁版嵁
s.Every(30).Seconds().Do(SyncTaskStatus) //鍚屾浠诲姟鐘舵��
+ s.Every(10).Seconds().Do(CheckNsqConn) //鏌ヨnsq杩炴帴
+
}
- s.Every(10).Seconds().Do(QueryClusterStatus) //鏌ヨ闆嗙兢鑺傜偣鏁伴噺
+ s.Every(20).Seconds().Do(QueryClusterStatus) //鏌ヨ闆嗙兢鑺傜偣鏁伴噺
s.Every(30).Seconds().Do(ProblemCheck) //闂璇婃柇
s.StartAsync()
@@ -168,3 +171,25 @@
func ProblemCheck() {
problem.Check()
}
+
+func CheckNsqConn() {
+ var err error
+ var status constvar.SystemStatusValue
+ if nsq.Ping() {
+ status = constvar.SystemStatusValueNormal
+ } else {
+ status = constvar.SystemStatusValueUnNormal
+ }
+ old, err := model.NewSystemStatusSearch().SetKey(constvar.SystemStatusKeyNsq).First()
+ if err != nil {
+ logx.Errorf("get nsq status err:%v", err)
+ return
+ }
+ if old.Value == status {
+ return
+ }
+ err = model.NewSystemStatusSearch().SetKey(constvar.SystemStatusKeyNsq).Updates(map[string]interface{}{"value": status})
+ if err != nil {
+ logx.Errorf("update nsq status err:%v", err)
+ }
+}
diff --git a/go.mod b/go.mod
index 24c5e68..ed9e5b3 100644
--- a/go.mod
+++ b/go.mod
@@ -21,6 +21,7 @@
github.com/mojocn/base64Captcha v1.3.1
github.com/nsqio/go-nsq v1.1.0
github.com/robfig/cron/v3 v3.0.1
+ github.com/shirou/gopsutil v3.21.11+incompatible
github.com/shopspring/decimal v1.3.1
github.com/songzhibin97/gkit v1.2.10
github.com/spf13/cast v1.5.1
@@ -49,6 +50,7 @@
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
+ github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.1 // indirect
github.com/go-openapi/spec v0.20.4 // indirect
@@ -92,6 +94,7 @@
github.com/subosito/gotenv v1.4.2 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.11 // indirect
+ github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/arch v0.3.0 // indirect
diff --git a/go.sum b/go.sum
index 3832c1e..809ba0e 100644
--- a/go.sum
+++ b/go.sum
@@ -119,6 +119,8 @@
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
+github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
+github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
github.com/go-openapi/jsonpointer v0.19.6 h1:eCs3fxoIi3Wh6vtgmLTOjdhSpiqphQ+DaPn38N2ZdrE=
@@ -343,6 +345,8 @@
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.30.0 h1:SymVODrcRsaRaSInD9yQtKbtWqwsfoPcRff/oRXLj4c=
github.com/rs/zerolog v1.30.0/go.mod h1:/tk+P47gFdPXq4QYjvCmT5/Gsug2nagsFWBWhAiSi1w=
+github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
+github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8=
github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/songzhibin97/gkit v1.2.10 h1:J/LDN2hv7RL1q+o+krZLWGNsKGDtjSmC1BlTP8aiWMw=
@@ -395,6 +399,8 @@
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
+github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg=
+github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
@@ -539,6 +545,7 @@
golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
diff --git a/main.go b/main.go
index 3bc397c..a12f0f4 100644
--- a/main.go
+++ b/main.go
@@ -43,6 +43,7 @@
"task_status_sync",
"device",
"device_plc",
+ "system_status",
}
agent := serf.InitAgent("apsClient", syncTables, sqlitex.GetDB())
diff --git a/model/index.go b/model/index.go
index 1b12699..aa38c06 100644
--- a/model/index.go
+++ b/model/index.go
@@ -29,6 +29,7 @@
ProductionProgress{},
Device{},
TaskStatusSync{},
+ SystemStatus{},
)
}
@@ -43,6 +44,7 @@
models := []interface{}{
NewNetConfigSearch(nil),
NewPlcBrandSearch(),
+ NewSystemStatusSearch(),
}
for _, model := range models {
diff --git a/model/system_status.go b/model/system_status.go
new file mode 100644
index 0000000..90a05ee
--- /dev/null
+++ b/model/system_status.go
@@ -0,0 +1,168 @@
+package model
+
+import (
+ "apsClient/constvar"
+ "apsClient/pkg/sqlitex"
+ "errors"
+ "fmt"
+ "github.com/jinzhu/gorm"
+ "sync"
+)
+
+type (
+ // SystemStatus 绯荤粺鐘舵��
+ SystemStatus struct {
+ gorm.Model
+ Key constvar.SystemStatusKey `json:"key" gorm:"column:key;type:varchar(255);not null"`
+ Value constvar.SystemStatusValue `json:"value" gorm:"column:value;type:varchar(255);not null"`
+ }
+
+ // SystemStatusSearch 绯荤粺鐘舵�佹悳绱㈡潯浠�
+ SystemStatusSearch struct {
+ SystemStatus
+ Orm *gorm.DB
+ Keyword string
+ PageNum int
+ PageSize int
+ }
+)
+
+func (SystemStatus) TableName() string {
+ return "system_status"
+}
+
+func NewSystemStatusSearch() *SystemStatusSearch {
+ return &SystemStatusSearch{
+ Orm: sqlitex.GetDB(),
+ }
+}
+
+func (slf *SystemStatusSearch) build() *gorm.DB {
+ var db = slf.Orm.Model(&SystemStatus{})
+ if slf.ID != 0 {
+ db = db.Where("id = ?", slf.ID)
+ }
+ if slf.Key != "" {
+ db = db.Where("key = ?", slf.Key)
+ }
+
+ return db
+}
+
+func (slf *SystemStatusSearch) Create(record *SystemStatus) error {
+ var db = slf.build()
+ return db.Create(record).Error
+}
+
+func (slf *SystemStatusSearch) CreateBatch(records []*SystemStatus) error {
+ var db = slf.build()
+ for _, record := range records {
+ db.Create(record)
+ }
+ return nil
+}
+
+func (slf *SystemStatusSearch) Delete() error {
+ var db = slf.build()
+ return db.Delete(&SystemStatus{}).Error
+}
+
+func (slf *SystemStatusSearch) Update(record *SystemStatus) error {
+ var db = slf.build()
+ return db.Updates(record).Error
+}
+
+func (slf *SystemStatusSearch) FindAll() ([]*SystemStatus, error) {
+ var db = slf.build()
+ var record = make([]*SystemStatus, 0)
+ err := db.Find(&record).Error
+ return record, err
+}
+
+func (slf *SystemStatusSearch) SetId(id uint) *SystemStatusSearch {
+ slf.ID = id
+ return slf
+}
+
+func (slf *SystemStatusSearch) SetKey(key constvar.SystemStatusKey) *SystemStatusSearch {
+ slf.Key = key
+ return slf
+}
+
+func (slf *SystemStatusSearch) SetPage(page, size int) *SystemStatusSearch {
+ slf.PageNum, slf.PageSize = page, size
+ return slf
+}
+
+func (slf *SystemStatusSearch) SetOrm(tx *gorm.DB) *SystemStatusSearch {
+ slf.Orm = tx
+ return slf
+}
+
+func (slf *SystemStatusSearch) First() (*SystemStatus, error) {
+ var db = slf.build()
+ var record = new(SystemStatus)
+ err := db.First(record).Error
+ return record, err
+}
+
+func (slf *SystemStatusSearch) Updates(values interface{}) error {
+ var db = slf.build()
+ return db.Updates(values).Error
+}
+
+func (slf *SystemStatusSearch) Save(record *SystemStatus) error {
+ if record.ID == 0 {
+ return errors.New("id涓虹┖")
+ }
+ var db = slf.build()
+
+ if err := db.Save(record).Error; err != nil {
+ return fmt.Errorf("save err: %v, record: %+v", err, record)
+ }
+
+ return nil
+}
+
+func (slf *SystemStatusSearch) Find() ([]*SystemStatus, int64, error) {
+ var db = slf.build()
+ var records = make([]*SystemStatus, 0)
+ var total int64
+ if err := db.Count(&total).Error; err != nil {
+ return records, total, err
+ }
+ if slf.PageNum > 0 && slf.PageSize > 0 {
+ db = db.Limit(slf.PageSize).Offset((slf.PageNum - 1) * slf.PageSize)
+ }
+
+ err := db.Find(&records).Error
+ return records, total, err
+}
+
+// InitDefaultData 鍒濆鍖栨暟鎹�
+func (slf *SystemStatusSearch) InitDefaultData(errCh chan<- error, wg *sync.WaitGroup) {
+ var (
+ db = slf.Orm.Table(slf.TableName())
+ total int64 = 0
+ )
+ defer wg.Done()
+
+ if err := db.Count(&total).Error; err != nil {
+ errCh <- err
+ return
+ }
+ if total != 0 {
+ return
+ }
+ records := []*SystemStatus{
+ {
+ Key: constvar.SystemStatusKeyNsq,
+ Value: constvar.SystemStatusValueNormal,
+ },
+ }
+ err := slf.CreateBatch(records)
+ if err != nil {
+ errCh <- err
+ return
+ }
+}
diff --git a/nsq/nsq.go b/nsq/nsq.go
index 68274b2..8034d05 100644
--- a/nsq/nsq.go
+++ b/nsq/nsq.go
@@ -9,6 +9,7 @@
"context"
"errors"
"fmt"
+ "github.com/shirou/gopsutil/net"
"sync"
"sync/atomic"
)
@@ -85,10 +86,31 @@
})
}
+func (c *consumerManager) ping() bool {
+ connections, err := net.Connections("inet")
+ if err != nil {
+ fmt.Println("Error:", err)
+ return false
+ }
+ for _, conn := range connections {
+ fmt.Println("net.Connections:", conn)
+ ipPort := fmt.Sprintf("%s:%s", conn.Laddr.IP, conn.Laddr.Port)
+ fmt.Println("net.Connections ipPort", ipPort)
+ if ipPort == conf.Conf.NsqConf.NsqdAddr && conn.Status == "ESTABLISHED" {
+ return true
+ }
+ }
+ return false
+}
+
func Init() error {
return defaultConsumerManager.init()
}
+func Ping() bool {
+ return defaultConsumerManager.ping()
+}
+
func Stop() {
defaultConsumerManager.stop()
StopProducer()
diff --git a/service/problem/check.go b/service/problem/check.go
index d472858..b379b7b 100644
--- a/service/problem/check.go
+++ b/service/problem/check.go
@@ -3,9 +3,12 @@
import (
"apsClient/conf"
"apsClient/constvar"
+ "apsClient/model"
"apsClient/pkg/sqlitex"
"apsClient/service"
"apsClient/service/plc_address"
+ "fmt"
+ "net"
"sync"
)
@@ -103,7 +106,19 @@
type Network struct{}
func (slf *Network) Check() bool {
- return false
+ ifaces, err := net.Interfaces()
+ if err != nil {
+ fmt.Println("Error:", err)
+ return false
+ }
+ var netAllFailed bool
+ for _, iFace := range ifaces {
+ if iFace.Flags == net.FlagUp {
+ netAllFailed = false
+ fmt.Printf("Interface %s is DOWN, indicating a network issue.\n", iFace.Name)
+ }
+ }
+ return !netAllFailed
}
type DB struct{}
@@ -125,7 +140,11 @@
type Nsq struct{}
func (slf *Nsq) Check() bool {
- return false
+ old, err := model.NewSystemStatusSearch().SetKey(constvar.SystemStatusKeyNsq).First()
+ if err != nil {
+ return false
+ }
+ return old.Value == constvar.SystemStatusValueNormal
}
type Device struct{}
--
Gitblit v1.8.0