From 7c1dd2e19119edd1ab147a699dfdcd11447fca14 Mon Sep 17 00:00:00 2001
From: zhangqian <zhangqian@123.com>
Date: 星期四, 02 十一月 2023 21:20:53 +0800
Subject: [PATCH] serf集群master定时查询nsq连接状态并保存在数据库,增加网络检查器

---
 constvar/const.go        |   13 ++
 go.sum                   |    7 +
 service/problem/check.go |   23 ++++
 model/system_status.go   |  168 +++++++++++++++++++++++++++++++++
 model/index.go           |    2 
 go.mod                   |    3 
 main.go                  |    1 
 crontask/cron_task.go    |   27 +++++
 nsq/nsq.go               |   22 ++++
 9 files changed, 263 insertions(+), 3 deletions(-)

diff --git a/constvar/const.go b/constvar/const.go
index 3f6fc2b..a7e2065 100644
--- a/constvar/const.go
+++ b/constvar/const.go
@@ -113,3 +113,16 @@
 	ProblemCodePlcProcessModelAddressList ProblemCode = "plc_process_model_address_list" //plc鍦板潃琛ㄧ己澶�
 	ProblemCodePlcConnect                 ProblemCode = "plc_connect"                    //plc杩炴帴澶辫触
 )
+
+type SystemStatusKey string
+
+const (
+	SystemStatusKeyNsq SystemStatusKey = "nsq"
+)
+
+type SystemStatusValue string
+
+const (
+	SystemStatusValueNormal   SystemStatusValue = "1"
+	SystemStatusValueUnNormal SystemStatusValue = "2"
+)
diff --git a/crontask/cron_task.go b/crontask/cron_task.go
index 42dc94b..de593c3 100644
--- a/crontask/cron_task.go
+++ b/crontask/cron_task.go
@@ -3,6 +3,7 @@
 import (
 	"apsClient/conf"
 	"apsClient/constvar"
+	"apsClient/model"
 	"apsClient/model/common"
 	"apsClient/nsq"
 	"apsClient/pkg/ecode"
@@ -80,9 +81,11 @@
 	if isMaster {
 		s.Every(60).Seconds().Do(SyncProductionProgress) //鍚屾鐢熶骇鏁版嵁
 		s.Every(30).Seconds().Do(SyncTaskStatus)         //鍚屾浠诲姟鐘舵��
+		s.Every(10).Seconds().Do(CheckNsqConn)           //鏌ヨnsq杩炴帴
+
 	}
 
-	s.Every(10).Seconds().Do(QueryClusterStatus) //鏌ヨ闆嗙兢鑺傜偣鏁伴噺
+	s.Every(20).Seconds().Do(QueryClusterStatus) //鏌ヨ闆嗙兢鑺傜偣鏁伴噺
 	s.Every(30).Seconds().Do(ProblemCheck)       //闂璇婃柇
 
 	s.StartAsync()
@@ -168,3 +171,25 @@
 func ProblemCheck() {
 	problem.Check()
 }
+
+func CheckNsqConn() {
+	var err error
+	var status constvar.SystemStatusValue
+	if nsq.Ping() {
+		status = constvar.SystemStatusValueNormal
+	} else {
+		status = constvar.SystemStatusValueUnNormal
+	}
+	old, err := model.NewSystemStatusSearch().SetKey(constvar.SystemStatusKeyNsq).First()
+	if err != nil {
+		logx.Errorf("get nsq status err:%v", err)
+		return
+	}
+	if old.Value == status {
+		return
+	}
+	err = model.NewSystemStatusSearch().SetKey(constvar.SystemStatusKeyNsq).Updates(map[string]interface{}{"value": status})
+	if err != nil {
+		logx.Errorf("update nsq status err:%v", err)
+	}
+}
diff --git a/go.mod b/go.mod
index 24c5e68..ed9e5b3 100644
--- a/go.mod
+++ b/go.mod
@@ -21,6 +21,7 @@
 	github.com/mojocn/base64Captcha v1.3.1
 	github.com/nsqio/go-nsq v1.1.0
 	github.com/robfig/cron/v3 v3.0.1
+	github.com/shirou/gopsutil v3.21.11+incompatible
 	github.com/shopspring/decimal v1.3.1
 	github.com/songzhibin97/gkit v1.2.10
 	github.com/spf13/cast v1.5.1
@@ -49,6 +50,7 @@
 	github.com/fsnotify/fsnotify v1.6.0 // indirect
 	github.com/gabriel-vasile/mimetype v1.4.2 // indirect
 	github.com/gin-contrib/sse v0.1.0 // indirect
+	github.com/go-ole/go-ole v1.2.6 // indirect
 	github.com/go-openapi/jsonpointer v0.19.6 // indirect
 	github.com/go-openapi/jsonreference v0.20.1 // indirect
 	github.com/go-openapi/spec v0.20.4 // indirect
@@ -92,6 +94,7 @@
 	github.com/subosito/gotenv v1.4.2 // indirect
 	github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
 	github.com/ugorji/go/codec v1.2.11 // indirect
+	github.com/yusufpapurcu/wmi v1.2.2 // indirect
 	go.uber.org/atomic v1.9.0 // indirect
 	go.uber.org/multierr v1.8.0 // indirect
 	golang.org/x/arch v0.3.0 // indirect
diff --git a/go.sum b/go.sum
index 3832c1e..809ba0e 100644
--- a/go.sum
+++ b/go.sum
@@ -119,6 +119,8 @@
 github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
 github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
 github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
+github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
+github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
 github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
 github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
 github.com/go-openapi/jsonpointer v0.19.6 h1:eCs3fxoIi3Wh6vtgmLTOjdhSpiqphQ+DaPn38N2ZdrE=
@@ -343,6 +345,8 @@
 github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
 github.com/rs/zerolog v1.30.0 h1:SymVODrcRsaRaSInD9yQtKbtWqwsfoPcRff/oRXLj4c=
 github.com/rs/zerolog v1.30.0/go.mod h1:/tk+P47gFdPXq4QYjvCmT5/Gsug2nagsFWBWhAiSi1w=
+github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
+github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
 github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8=
 github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
 github.com/songzhibin97/gkit v1.2.10 h1:J/LDN2hv7RL1q+o+krZLWGNsKGDtjSmC1BlTP8aiWMw=
@@ -395,6 +399,8 @@
 github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
+github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg=
+github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
 go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
 go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
 go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
@@ -539,6 +545,7 @@
 golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
diff --git a/main.go b/main.go
index 3bc397c..a12f0f4 100644
--- a/main.go
+++ b/main.go
@@ -43,6 +43,7 @@
 		"task_status_sync",
 		"device",
 		"device_plc",
+		"system_status",
 	}
 
 	agent := serf.InitAgent("apsClient", syncTables, sqlitex.GetDB())
diff --git a/model/index.go b/model/index.go
index 1b12699..aa38c06 100644
--- a/model/index.go
+++ b/model/index.go
@@ -29,6 +29,7 @@
 		ProductionProgress{},
 		Device{},
 		TaskStatusSync{},
+		SystemStatus{},
 	)
 }
 
@@ -43,6 +44,7 @@
 	models := []interface{}{
 		NewNetConfigSearch(nil),
 		NewPlcBrandSearch(),
+		NewSystemStatusSearch(),
 	}
 
 	for _, model := range models {
diff --git a/model/system_status.go b/model/system_status.go
new file mode 100644
index 0000000..90a05ee
--- /dev/null
+++ b/model/system_status.go
@@ -0,0 +1,168 @@
+package model
+
+import (
+	"apsClient/constvar"
+	"apsClient/pkg/sqlitex"
+	"errors"
+	"fmt"
+	"github.com/jinzhu/gorm"
+	"sync"
+)
+
+type (
+	// SystemStatus 绯荤粺鐘舵��
+	SystemStatus struct {
+		gorm.Model
+		Key   constvar.SystemStatusKey   `json:"key" gorm:"column:key;type:varchar(255);not null"`
+		Value constvar.SystemStatusValue `json:"value" gorm:"column:value;type:varchar(255);not null"`
+	}
+
+	// SystemStatusSearch 绯荤粺鐘舵�佹悳绱㈡潯浠�
+	SystemStatusSearch struct {
+		SystemStatus
+		Orm      *gorm.DB
+		Keyword  string
+		PageNum  int
+		PageSize int
+	}
+)
+
+func (SystemStatus) TableName() string {
+	return "system_status"
+}
+
+func NewSystemStatusSearch() *SystemStatusSearch {
+	return &SystemStatusSearch{
+		Orm: sqlitex.GetDB(),
+	}
+}
+
+func (slf *SystemStatusSearch) build() *gorm.DB {
+	var db = slf.Orm.Model(&SystemStatus{})
+	if slf.ID != 0 {
+		db = db.Where("id = ?", slf.ID)
+	}
+	if slf.Key != "" {
+		db = db.Where("key = ?", slf.Key)
+	}
+
+	return db
+}
+
+func (slf *SystemStatusSearch) Create(record *SystemStatus) error {
+	var db = slf.build()
+	return db.Create(record).Error
+}
+
+func (slf *SystemStatusSearch) CreateBatch(records []*SystemStatus) error {
+	var db = slf.build()
+	for _, record := range records {
+		db.Create(record)
+	}
+	return nil
+}
+
+func (slf *SystemStatusSearch) Delete() error {
+	var db = slf.build()
+	return db.Delete(&SystemStatus{}).Error
+}
+
+func (slf *SystemStatusSearch) Update(record *SystemStatus) error {
+	var db = slf.build()
+	return db.Updates(record).Error
+}
+
+func (slf *SystemStatusSearch) FindAll() ([]*SystemStatus, error) {
+	var db = slf.build()
+	var record = make([]*SystemStatus, 0)
+	err := db.Find(&record).Error
+	return record, err
+}
+
+func (slf *SystemStatusSearch) SetId(id uint) *SystemStatusSearch {
+	slf.ID = id
+	return slf
+}
+
+func (slf *SystemStatusSearch) SetKey(key constvar.SystemStatusKey) *SystemStatusSearch {
+	slf.Key = key
+	return slf
+}
+
+func (slf *SystemStatusSearch) SetPage(page, size int) *SystemStatusSearch {
+	slf.PageNum, slf.PageSize = page, size
+	return slf
+}
+
+func (slf *SystemStatusSearch) SetOrm(tx *gorm.DB) *SystemStatusSearch {
+	slf.Orm = tx
+	return slf
+}
+
+func (slf *SystemStatusSearch) First() (*SystemStatus, error) {
+	var db = slf.build()
+	var record = new(SystemStatus)
+	err := db.First(record).Error
+	return record, err
+}
+
+func (slf *SystemStatusSearch) Updates(values interface{}) error {
+	var db = slf.build()
+	return db.Updates(values).Error
+}
+
+func (slf *SystemStatusSearch) Save(record *SystemStatus) error {
+	if record.ID == 0 {
+		return errors.New("id涓虹┖")
+	}
+	var db = slf.build()
+
+	if err := db.Save(record).Error; err != nil {
+		return fmt.Errorf("save err: %v, record: %+v", err, record)
+	}
+
+	return nil
+}
+
+func (slf *SystemStatusSearch) Find() ([]*SystemStatus, int64, error) {
+	var db = slf.build()
+	var records = make([]*SystemStatus, 0)
+	var total int64
+	if err := db.Count(&total).Error; err != nil {
+		return records, total, err
+	}
+	if slf.PageNum > 0 && slf.PageSize > 0 {
+		db = db.Limit(slf.PageSize).Offset((slf.PageNum - 1) * slf.PageSize)
+	}
+
+	err := db.Find(&records).Error
+	return records, total, err
+}
+
+// InitDefaultData 鍒濆鍖栨暟鎹�
+func (slf *SystemStatusSearch) InitDefaultData(errCh chan<- error, wg *sync.WaitGroup) {
+	var (
+		db          = slf.Orm.Table(slf.TableName())
+		total int64 = 0
+	)
+	defer wg.Done()
+
+	if err := db.Count(&total).Error; err != nil {
+		errCh <- err
+		return
+	}
+	if total != 0 {
+		return
+	}
+	records := []*SystemStatus{
+		{
+			Key:   constvar.SystemStatusKeyNsq,
+			Value: constvar.SystemStatusValueNormal,
+		},
+	}
+	err := slf.CreateBatch(records)
+	if err != nil {
+		errCh <- err
+		return
+	}
+}
diff --git a/nsq/nsq.go b/nsq/nsq.go
index 68274b2..8034d05 100644
--- a/nsq/nsq.go
+++ b/nsq/nsq.go
@@ -9,6 +9,7 @@
 	"context"
 	"errors"
 	"fmt"
+	"github.com/shirou/gopsutil/net"
 	"sync"
 	"sync/atomic"
 )
@@ -85,10 +86,31 @@
 	})
 }
 
+func (c *consumerManager) ping() bool {
+	connections, err := net.Connections("inet")
+	if err != nil {
+		fmt.Println("Error:", err)
+		return false
+	}
+	for _, conn := range connections {
+		fmt.Println("net.Connections:", conn)
+		ipPort := fmt.Sprintf("%s:%s", conn.Laddr.IP, conn.Laddr.Port)
+		fmt.Println("net.Connections ipPort", ipPort)
+		if ipPort == conf.Conf.NsqConf.NsqdAddr && conn.Status == "ESTABLISHED" {
+			return true
+		}
+	}
+	return false
+}
+
 func Init() error {
 	return defaultConsumerManager.init()
 }
 
+func Ping() bool {
+	return defaultConsumerManager.ping()
+}
+
 func Stop() {
 	defaultConsumerManager.stop()
 	StopProducer()
diff --git a/service/problem/check.go b/service/problem/check.go
index d472858..b379b7b 100644
--- a/service/problem/check.go
+++ b/service/problem/check.go
@@ -3,9 +3,12 @@
 import (
 	"apsClient/conf"
 	"apsClient/constvar"
+	"apsClient/model"
 	"apsClient/pkg/sqlitex"
 	"apsClient/service"
 	"apsClient/service/plc_address"
+	"fmt"
+	"net"
 	"sync"
 )
 
@@ -103,7 +106,19 @@
 type Network struct{}
 
 func (slf *Network) Check() bool {
-	return false
+	ifaces, err := net.Interfaces()
+	if err != nil {
+		fmt.Println("Error:", err)
+		return false
+	}
+	var netAllFailed bool
+	for _, iFace := range ifaces {
+		if iFace.Flags == net.FlagUp {
+			netAllFailed = false
+			fmt.Printf("Interface %s is DOWN, indicating a network issue.\n", iFace.Name)
+		}
+	}
+	return !netAllFailed
 }
 
 type DB struct{}
@@ -125,7 +140,11 @@
 type Nsq struct{}
 
 func (slf *Nsq) Check() bool {
-	return false
+	old, err := model.NewSystemStatusSearch().SetKey(constvar.SystemStatusKeyNsq).First()
+	if err != nil {
+		return false
+	}
+	return old.Value == constvar.SystemStatusValueNormal
 }
 
 type Device struct{}

--
Gitblit v1.8.0