From f73f610cdf4b0666dc139a51e72353b88f8f25ab Mon Sep 17 00:00:00 2001
From: zhangzengfei <zhangzengfei@smartai.com>
Date: 星期四, 19 十月 2023 17:27:17 +0800
Subject: [PATCH] 为保证正确获取集群状态, 添加程序启动时读取system-service运行状态
---
serf/sync.go | 73 ++++++++++++++++++++++++++++++++++--
main.go | 7 +++
2 files changed, 74 insertions(+), 6 deletions(-)
diff --git a/main.go b/main.go
index 74d4c67..8f852e5 100644
--- a/main.go
+++ b/main.go
@@ -30,6 +30,7 @@
// 鍚姩鏁版嵁鍚屾
var serfStartChan = make(chan bool)
+
// 闇�瑕佸悓姝ョ殑琛�
var syncTables = []string{
"procedures",
@@ -41,8 +42,12 @@
agent := serf.InitAgent("apsClient", syncTables, sqlitex.GetDB())
agent.RegisterClusterEvent(serfClusterEvent)
+
go agent.Serve(serfStartChan)
- <-serfStartChan
+ if !<-serfStartChan {
+ logx.Errorf("serf Init err, exit")
+ return
+ }
// 鍒ゆ柇褰撳墠闆嗙兢鐘舵��
if agent.ClusterStatus != "slave" {
diff --git a/serf/sync.go b/serf/sync.go
index 7675e84..ea21710 100644
--- a/serf/sync.go
+++ b/serf/sync.go
@@ -1,6 +1,7 @@
package serf
import (
+ "apsClient/pkg/logx"
"context"
"encoding/json"
"fmt"
@@ -20,6 +21,9 @@
var (
agent = SyncServer{}
+ dependProcs = []string{
+ bhomeclient.Proc_System_Service,
+ }
)
const (
@@ -101,6 +105,43 @@
bhomedbapi.InitDoReq(client.RequestOnly)
//bhomedbapi.InitLog(logger.Debug)
+ // 闇�瑕佺瓑寰卻ystem-service杩涚▼鎴愬姛鍚姩鍚庯紝鎵嶈兘鑾峰彇闆嗙兢鐘舵��(鎴栬�呬繚璇佺▼搴忓惎鍔ㄦ椂鑾峰彇鍒版纭殑鐘舵��)
+ tryTimes := 0
+loop:
+ for {
+ select {
+ case <-q:
+ initChan <- false
+ return
+ default:
+ if tryTimes < 15 {
+ clients, err := client.GetRegisteredClient()
+ if err == nil && len(clients) > 0 {
+ var existingProcs []string
+ for _, c := range clients {
+ if c.Online {
+ existingProcs = append(existingProcs, string(c.Proc.ProcId))
+ }
+ }
+ if diff := arrayContains(existingProcs, dependProcs); diff == "" {
+ initChan <- true
+ break loop
+ } else {
+ logx.Errorf("Proc: %s is not running!", diff)
+ time.Sleep(time.Second * 1)
+ }
+ } else {
+ tryTimes++
+ time.Sleep(time.Second * 5)
+ }
+ } else {
+ logx.Errorf("tried 15 times, client.GetRegisteredClient failed")
+ initChan <- false
+ return
+ }
+ }
+ }
+
go client.StartServer(nil)
ss.bhClient = client
@@ -157,7 +198,7 @@
return err
}
- fmt.Println("鍔犲叆闆嗙兢, 璇锋眰鍚屾鍏ㄩ噺鏁版嵁,id:", ss.ServerId)
+ logx.Debugf("鍔犲叆闆嗙兢, 璇锋眰鍚屾鍏ㄩ噺鏁版嵁,id:%s", ss.ServerId)
return ss.bhClient.Publish(serfSyncTopic, b)
}
@@ -177,7 +218,7 @@
// 澶勭悊鍚屾鍏ㄩ噺鏁版嵁鐨勮姹�
if string(busMsg.Topic) == ss.queryTableTopic {
if ss.ClusterStatus == "master" {
- fmt.Println("鎺ユ敹鍒板悓姝ュ叏閲忔暟鎹姹�")
+ logx.Debugf("鎺ユ敹鍒板悓姝ュ叏閲忔暟鎹姹�.")
ss.handleSyncTableMessage(busMsg.Data)
}
}
@@ -248,7 +289,7 @@
err = tx.Exec(delSql).Error
if err != nil {
- fmt.Println("鍒犻櫎鏈湴鐨勫悓姝ュ簱鏁版嵁澶辫触,", err.Error())
+ logx.Errorf("鍒犻櫎鏈湴鐨勫悓姝ュ簱鏁版嵁澶辫触, %s", err.Error())
}
}
@@ -285,7 +326,7 @@
ss.ClusterStatus = reply.Msg
- fmt.Println("褰撳墠闆嗙兢鐘舵��:", ss.ClusterStatus)
+ logx.Debugf("褰撳墠闆嗙兢鐘舵��: %s", ss.ClusterStatus)
return reply.Msg
}
@@ -364,7 +405,7 @@
func (ss *SyncServer) handleSyncTableMessage(msg []byte) error {
targetId := string(msg)
- fmt.Println("鍚屾鍏ㄩ噺鏁版嵁缁欒妭鐐�:", targetId)
+ //fmt.Println("鍚屾鍏ㄩ噺鏁版嵁缁欒妭鐐�:", targetId)
sqls, err := DumpTables(ss.sqlDB, ss.syncTables)
if err != nil {
fmt.Println("DumpTables error, ", err.Error())
@@ -376,3 +417,25 @@
return err
}
+
+func arrayContains(list []string, arr []string) string {
+ if arr == nil || list == nil {
+ return ""
+ }
+
+ for _, s := range arr {
+ isExist := false
+ for _, t := range list {
+ if s == t {
+ isExist = true
+ break
+ }
+ }
+
+ if !isExist {
+ return s
+ }
+ }
+
+ return ""
+}
--
Gitblit v1.8.0