From 7c1dd2e19119edd1ab147a699dfdcd11447fca14 Mon Sep 17 00:00:00 2001 From: zhangqian <zhangqian@123.com> Date: 星期四, 02 十一月 2023 21:20:53 +0800 Subject: [PATCH] serf集群master定时查询nsq连接状态并保存在数据库,增加网络检查器 --- constvar/const.go | 13 ++ go.sum | 7 + service/problem/check.go | 23 ++++ model/system_status.go | 168 +++++++++++++++++++++++++++++++++ model/index.go | 2 go.mod | 3 main.go | 1 crontask/cron_task.go | 27 +++++ nsq/nsq.go | 22 ++++ 9 files changed, 263 insertions(+), 3 deletions(-) diff --git a/constvar/const.go b/constvar/const.go index 3f6fc2b..a7e2065 100644 --- a/constvar/const.go +++ b/constvar/const.go @@ -113,3 +113,16 @@ ProblemCodePlcProcessModelAddressList ProblemCode = "plc_process_model_address_list" //plc鍦板潃琛ㄧ己澶� ProblemCodePlcConnect ProblemCode = "plc_connect" //plc杩炴帴澶辫触 ) + +type SystemStatusKey string + +const ( + SystemStatusKeyNsq SystemStatusKey = "nsq" +) + +type SystemStatusValue string + +const ( + SystemStatusValueNormal SystemStatusValue = "1" + SystemStatusValueUnNormal SystemStatusValue = "2" +) diff --git a/crontask/cron_task.go b/crontask/cron_task.go index 42dc94b..de593c3 100644 --- a/crontask/cron_task.go +++ b/crontask/cron_task.go @@ -3,6 +3,7 @@ import ( "apsClient/conf" "apsClient/constvar" + "apsClient/model" "apsClient/model/common" "apsClient/nsq" "apsClient/pkg/ecode" @@ -80,9 +81,11 @@ if isMaster { s.Every(60).Seconds().Do(SyncProductionProgress) //鍚屾鐢熶骇鏁版嵁 s.Every(30).Seconds().Do(SyncTaskStatus) //鍚屾浠诲姟鐘舵�� + s.Every(10).Seconds().Do(CheckNsqConn) //鏌ヨnsq杩炴帴 + } - s.Every(10).Seconds().Do(QueryClusterStatus) //鏌ヨ闆嗙兢鑺傜偣鏁伴噺 + s.Every(20).Seconds().Do(QueryClusterStatus) //鏌ヨ闆嗙兢鑺傜偣鏁伴噺 s.Every(30).Seconds().Do(ProblemCheck) //闂璇婃柇 s.StartAsync() @@ -168,3 +171,25 @@ func ProblemCheck() { problem.Check() } + +func CheckNsqConn() { + var err error + var status constvar.SystemStatusValue + if nsq.Ping() { + status = constvar.SystemStatusValueNormal + } else { + status = constvar.SystemStatusValueUnNormal + } + old, err := model.NewSystemStatusSearch().SetKey(constvar.SystemStatusKeyNsq).First() + if err != nil { + logx.Errorf("get nsq status err:%v", err) + return + } + if old.Value == status { + return + } + err = model.NewSystemStatusSearch().SetKey(constvar.SystemStatusKeyNsq).Updates(map[string]interface{}{"value": status}) + if err != nil { + logx.Errorf("update nsq status err:%v", err) + } +} diff --git a/go.mod b/go.mod index 24c5e68..ed9e5b3 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,7 @@ github.com/mojocn/base64Captcha v1.3.1 github.com/nsqio/go-nsq v1.1.0 github.com/robfig/cron/v3 v3.0.1 + github.com/shirou/gopsutil v3.21.11+incompatible github.com/shopspring/decimal v1.3.1 github.com/songzhibin97/gkit v1.2.10 github.com/spf13/cast v1.5.1 @@ -49,6 +50,7 @@ github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/gabriel-vasile/mimetype v1.4.2 // indirect github.com/gin-contrib/sse v0.1.0 // indirect + github.com/go-ole/go-ole v1.2.6 // indirect github.com/go-openapi/jsonpointer v0.19.6 // indirect github.com/go-openapi/jsonreference v0.20.1 // indirect github.com/go-openapi/spec v0.20.4 // indirect @@ -92,6 +94,7 @@ github.com/subosito/gotenv v1.4.2 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.2.11 // indirect + github.com/yusufpapurcu/wmi v1.2.2 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.8.0 // indirect golang.org/x/arch v0.3.0 // indirect diff --git a/go.sum b/go.sum index 3832c1e..809ba0e 100644 --- a/go.sum +++ b/go.sum @@ -119,6 +119,8 @@ github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= +github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= +github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= github.com/go-openapi/jsonpointer v0.19.6 h1:eCs3fxoIi3Wh6vtgmLTOjdhSpiqphQ+DaPn38N2ZdrE= @@ -343,6 +345,8 @@ github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/zerolog v1.30.0 h1:SymVODrcRsaRaSInD9yQtKbtWqwsfoPcRff/oRXLj4c= github.com/rs/zerolog v1.30.0/go.mod h1:/tk+P47gFdPXq4QYjvCmT5/Gsug2nagsFWBWhAiSi1w= +github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI= +github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8= github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= github.com/songzhibin97/gkit v1.2.10 h1:J/LDN2hv7RL1q+o+krZLWGNsKGDtjSmC1BlTP8aiWMw= @@ -395,6 +399,8 @@ github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg= +github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= @@ -539,6 +545,7 @@ golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/main.go b/main.go index 3bc397c..a12f0f4 100644 --- a/main.go +++ b/main.go @@ -43,6 +43,7 @@ "task_status_sync", "device", "device_plc", + "system_status", } agent := serf.InitAgent("apsClient", syncTables, sqlitex.GetDB()) diff --git a/model/index.go b/model/index.go index 1b12699..aa38c06 100644 --- a/model/index.go +++ b/model/index.go @@ -29,6 +29,7 @@ ProductionProgress{}, Device{}, TaskStatusSync{}, + SystemStatus{}, ) } @@ -43,6 +44,7 @@ models := []interface{}{ NewNetConfigSearch(nil), NewPlcBrandSearch(), + NewSystemStatusSearch(), } for _, model := range models { diff --git a/model/system_status.go b/model/system_status.go new file mode 100644 index 0000000..90a05ee --- /dev/null +++ b/model/system_status.go @@ -0,0 +1,168 @@ +package model + +import ( + "apsClient/constvar" + "apsClient/pkg/sqlitex" + "errors" + "fmt" + "github.com/jinzhu/gorm" + "sync" +) + +type ( + // SystemStatus 绯荤粺鐘舵�� + SystemStatus struct { + gorm.Model + Key constvar.SystemStatusKey `json:"key" gorm:"column:key;type:varchar(255);not null"` + Value constvar.SystemStatusValue `json:"value" gorm:"column:value;type:varchar(255);not null"` + } + + // SystemStatusSearch 绯荤粺鐘舵�佹悳绱㈡潯浠� + SystemStatusSearch struct { + SystemStatus + Orm *gorm.DB + Keyword string + PageNum int + PageSize int + } +) + +func (SystemStatus) TableName() string { + return "system_status" +} + +func NewSystemStatusSearch() *SystemStatusSearch { + return &SystemStatusSearch{ + Orm: sqlitex.GetDB(), + } +} + +func (slf *SystemStatusSearch) build() *gorm.DB { + var db = slf.Orm.Model(&SystemStatus{}) + if slf.ID != 0 { + db = db.Where("id = ?", slf.ID) + } + if slf.Key != "" { + db = db.Where("key = ?", slf.Key) + } + + return db +} + +func (slf *SystemStatusSearch) Create(record *SystemStatus) error { + var db = slf.build() + return db.Create(record).Error +} + +func (slf *SystemStatusSearch) CreateBatch(records []*SystemStatus) error { + var db = slf.build() + for _, record := range records { + db.Create(record) + } + return nil +} + +func (slf *SystemStatusSearch) Delete() error { + var db = slf.build() + return db.Delete(&SystemStatus{}).Error +} + +func (slf *SystemStatusSearch) Update(record *SystemStatus) error { + var db = slf.build() + return db.Updates(record).Error +} + +func (slf *SystemStatusSearch) FindAll() ([]*SystemStatus, error) { + var db = slf.build() + var record = make([]*SystemStatus, 0) + err := db.Find(&record).Error + return record, err +} + +func (slf *SystemStatusSearch) SetId(id uint) *SystemStatusSearch { + slf.ID = id + return slf +} + +func (slf *SystemStatusSearch) SetKey(key constvar.SystemStatusKey) *SystemStatusSearch { + slf.Key = key + return slf +} + +func (slf *SystemStatusSearch) SetPage(page, size int) *SystemStatusSearch { + slf.PageNum, slf.PageSize = page, size + return slf +} + +func (slf *SystemStatusSearch) SetOrm(tx *gorm.DB) *SystemStatusSearch { + slf.Orm = tx + return slf +} + +func (slf *SystemStatusSearch) First() (*SystemStatus, error) { + var db = slf.build() + var record = new(SystemStatus) + err := db.First(record).Error + return record, err +} + +func (slf *SystemStatusSearch) Updates(values interface{}) error { + var db = slf.build() + return db.Updates(values).Error +} + +func (slf *SystemStatusSearch) Save(record *SystemStatus) error { + if record.ID == 0 { + return errors.New("id涓虹┖") + } + var db = slf.build() + + if err := db.Save(record).Error; err != nil { + return fmt.Errorf("save err: %v, record: %+v", err, record) + } + + return nil +} + +func (slf *SystemStatusSearch) Find() ([]*SystemStatus, int64, error) { + var db = slf.build() + var records = make([]*SystemStatus, 0) + var total int64 + if err := db.Count(&total).Error; err != nil { + return records, total, err + } + if slf.PageNum > 0 && slf.PageSize > 0 { + db = db.Limit(slf.PageSize).Offset((slf.PageNum - 1) * slf.PageSize) + } + + err := db.Find(&records).Error + return records, total, err +} + +// InitDefaultData 鍒濆鍖栨暟鎹� +func (slf *SystemStatusSearch) InitDefaultData(errCh chan<- error, wg *sync.WaitGroup) { + var ( + db = slf.Orm.Table(slf.TableName()) + total int64 = 0 + ) + defer wg.Done() + + if err := db.Count(&total).Error; err != nil { + errCh <- err + return + } + if total != 0 { + return + } + records := []*SystemStatus{ + { + Key: constvar.SystemStatusKeyNsq, + Value: constvar.SystemStatusValueNormal, + }, + } + err := slf.CreateBatch(records) + if err != nil { + errCh <- err + return + } +} diff --git a/nsq/nsq.go b/nsq/nsq.go index 68274b2..8034d05 100644 --- a/nsq/nsq.go +++ b/nsq/nsq.go @@ -9,6 +9,7 @@ "context" "errors" "fmt" + "github.com/shirou/gopsutil/net" "sync" "sync/atomic" ) @@ -85,10 +86,31 @@ }) } +func (c *consumerManager) ping() bool { + connections, err := net.Connections("inet") + if err != nil { + fmt.Println("Error:", err) + return false + } + for _, conn := range connections { + fmt.Println("net.Connections:", conn) + ipPort := fmt.Sprintf("%s:%s", conn.Laddr.IP, conn.Laddr.Port) + fmt.Println("net.Connections ipPort", ipPort) + if ipPort == conf.Conf.NsqConf.NsqdAddr && conn.Status == "ESTABLISHED" { + return true + } + } + return false +} + func Init() error { return defaultConsumerManager.init() } +func Ping() bool { + return defaultConsumerManager.ping() +} + func Stop() { defaultConsumerManager.stop() StopProducer() diff --git a/service/problem/check.go b/service/problem/check.go index d472858..b379b7b 100644 --- a/service/problem/check.go +++ b/service/problem/check.go @@ -3,9 +3,12 @@ import ( "apsClient/conf" "apsClient/constvar" + "apsClient/model" "apsClient/pkg/sqlitex" "apsClient/service" "apsClient/service/plc_address" + "fmt" + "net" "sync" ) @@ -103,7 +106,19 @@ type Network struct{} func (slf *Network) Check() bool { - return false + ifaces, err := net.Interfaces() + if err != nil { + fmt.Println("Error:", err) + return false + } + var netAllFailed bool + for _, iFace := range ifaces { + if iFace.Flags == net.FlagUp { + netAllFailed = false + fmt.Printf("Interface %s is DOWN, indicating a network issue.\n", iFace.Name) + } + } + return !netAllFailed } type DB struct{} @@ -125,7 +140,11 @@ type Nsq struct{} func (slf *Nsq) Check() bool { - return false + old, err := model.NewSystemStatusSearch().SetKey(constvar.SystemStatusKeyNsq).First() + if err != nil { + return false + } + return old.Value == constvar.SystemStatusValueNormal } type Device struct{} -- Gitblit v1.8.0