From bf2b61519fd0d79ddb19f0469749fbbe1d6c4ad8 Mon Sep 17 00:00:00 2001
From: zhangzengfei <zhangzengfei@smartai.com>
Date: 星期四, 19 十月 2023 17:23:50 +0800
Subject: [PATCH] 为保证正确获取集群状态, 添加程序启动时读取system-service运行状态

---
 serf/sync.go |   73 ++++++++++++++++++++++++++++++++++--
 main.go      |    7 +++
 2 files changed, 74 insertions(+), 6 deletions(-)

diff --git a/main.go b/main.go
index 74d4c67..8f852e5 100644
--- a/main.go
+++ b/main.go
@@ -30,6 +30,7 @@
 
 	// 鍚姩鏁版嵁鍚屾
 	var serfStartChan = make(chan bool)
+
 	// 闇�瑕佸悓姝ョ殑琛�
 	var syncTables = []string{
 		"procedures",
@@ -41,8 +42,12 @@
 
 	agent := serf.InitAgent("apsClient", syncTables, sqlitex.GetDB())
 	agent.RegisterClusterEvent(serfClusterEvent)
+
 	go agent.Serve(serfStartChan)
-	<-serfStartChan
+	if !<-serfStartChan {
+		logx.Errorf("serf Init err, exit")
+		return
+	}
 
 	// 鍒ゆ柇褰撳墠闆嗙兢鐘舵��
 	if agent.ClusterStatus != "slave" {
diff --git a/serf/sync.go b/serf/sync.go
index 7675e84..ea21710 100644
--- a/serf/sync.go
+++ b/serf/sync.go
@@ -1,6 +1,7 @@
 package serf
 
 import (
+	"apsClient/pkg/logx"
 	"context"
 	"encoding/json"
 	"fmt"
@@ -20,6 +21,9 @@
 
 var (
 	agent = SyncServer{}
+	dependProcs = []string{
+		bhomeclient.Proc_System_Service,
+	}
 )
 
 const (
@@ -101,6 +105,43 @@
 	bhomedbapi.InitDoReq(client.RequestOnly)
 	//bhomedbapi.InitLog(logger.Debug)
 
+	// 闇�瑕佺瓑寰卻ystem-service杩涚▼鎴愬姛鍚姩鍚庯紝鎵嶈兘鑾峰彇闆嗙兢鐘舵��(鎴栬�呬繚璇佺▼搴忓惎鍔ㄦ椂鑾峰彇鍒版纭殑鐘舵��)
+	tryTimes := 0
+loop:
+	for {
+		select {
+		case <-q:
+			initChan <- false
+			return
+		default:
+			if tryTimes < 15 {
+				clients, err := client.GetRegisteredClient()
+				if err == nil && len(clients) > 0 {
+					var existingProcs []string
+					for _, c := range clients {
+						if c.Online {
+							existingProcs = append(existingProcs, string(c.Proc.ProcId))
+						}
+					}
+					if diff := arrayContains(existingProcs, dependProcs); diff == "" {
+						initChan <- true
+						break loop
+					} else {
+						logx.Errorf("Proc: %s is not running!", diff)
+						time.Sleep(time.Second * 1)
+					}
+				} else {
+					tryTimes++
+					time.Sleep(time.Second * 5)
+				}
+			} else {
+				logx.Errorf("tried 15 times, client.GetRegisteredClient failed")
+				initChan <- false
+				return
+			}
+		}
+	}
+
 	go client.StartServer(nil)
 
 	ss.bhClient = client
@@ -157,7 +198,7 @@
 		return err
 	}
 
-	fmt.Println("鍔犲叆闆嗙兢, 璇锋眰鍚屾鍏ㄩ噺鏁版嵁,id:", ss.ServerId)
+	logx.Debugf("鍔犲叆闆嗙兢, 璇锋眰鍚屾鍏ㄩ噺鏁版嵁,id:%s", ss.ServerId)
 	return ss.bhClient.Publish(serfSyncTopic, b)
 }
 
@@ -177,7 +218,7 @@
 			// 澶勭悊鍚屾鍏ㄩ噺鏁版嵁鐨勮姹�
 			if string(busMsg.Topic) == ss.queryTableTopic {
 				if ss.ClusterStatus == "master" {
-					fmt.Println("鎺ユ敹鍒板悓姝ュ叏閲忔暟鎹姹�")
+					logx.Debugf("鎺ユ敹鍒板悓姝ュ叏閲忔暟鎹姹�.")
 					ss.handleSyncTableMessage(busMsg.Data)
 				}
 			}
@@ -248,7 +289,7 @@
 
 		err = tx.Exec(delSql).Error
 		if err != nil {
-			fmt.Println("鍒犻櫎鏈湴鐨勫悓姝ュ簱鏁版嵁澶辫触,", err.Error())
+			logx.Errorf("鍒犻櫎鏈湴鐨勫悓姝ュ簱鏁版嵁澶辫触, %s", err.Error())
 		}
 	}
 
@@ -285,7 +326,7 @@
 
 	ss.ClusterStatus = reply.Msg
 
-	fmt.Println("褰撳墠闆嗙兢鐘舵��:", ss.ClusterStatus)
+	logx.Debugf("褰撳墠闆嗙兢鐘舵��: %s", ss.ClusterStatus)
 
 	return reply.Msg
 }
@@ -364,7 +405,7 @@
 
 func (ss *SyncServer) handleSyncTableMessage(msg []byte) error {
 	targetId := string(msg)
-	fmt.Println("鍚屾鍏ㄩ噺鏁版嵁缁欒妭鐐�:", targetId)
+	//fmt.Println("鍚屾鍏ㄩ噺鏁版嵁缁欒妭鐐�:", targetId)
 	sqls, err := DumpTables(ss.sqlDB, ss.syncTables)
 	if err != nil {
 		fmt.Println("DumpTables error, ", err.Error())
@@ -376,3 +417,25 @@
 
 	return err
 }
+
+func arrayContains(list []string, arr []string) string {
+	if arr == nil || list == nil {
+		return ""
+	}
+
+	for _, s := range arr {
+		isExist := false
+		for _, t := range list {
+			if s == t {
+				isExist = true
+				break
+			}
+		}
+
+		if !isExist {
+			return s
+		}
+	}
+
+	return ""
+}

--
Gitblit v1.8.0