zhangzengfei
2023-10-19 bf2b61519fd0d79ddb19f0469749fbbe1d6c4ad8
为保证正确获取集群状态, 添加程序启动时读取system-service运行状态
2个文件已修改
80 ■■■■■ 已修改文件
main.go 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
serf/sync.go 73 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go
@@ -30,6 +30,7 @@
    // 启动数据同步
    var serfStartChan = make(chan bool)
    // 需要同步的表
    var syncTables = []string{
        "procedures",
@@ -41,8 +42,12 @@
    agent := serf.InitAgent("apsClient", syncTables, sqlitex.GetDB())
    agent.RegisterClusterEvent(serfClusterEvent)
    go agent.Serve(serfStartChan)
    <-serfStartChan
    if !<-serfStartChan {
        logx.Errorf("serf Init err, exit")
        return
    }
    // 判断当前集群状态
    if agent.ClusterStatus != "slave" {
serf/sync.go
@@ -1,6 +1,7 @@
package serf
import (
    "apsClient/pkg/logx"
    "context"
    "encoding/json"
    "fmt"
@@ -20,6 +21,9 @@
var (
    agent = SyncServer{}
    dependProcs = []string{
        bhomeclient.Proc_System_Service,
    }
)
const (
@@ -101,6 +105,43 @@
    bhomedbapi.InitDoReq(client.RequestOnly)
    //bhomedbapi.InitLog(logger.Debug)
    // 需要等待system-service进程成功启动后,才能获取集群状态(或者保证程序启动时获取到正确的状态)
    tryTimes := 0
loop:
    for {
        select {
        case <-q:
            initChan <- false
            return
        default:
            if tryTimes < 15 {
                clients, err := client.GetRegisteredClient()
                if err == nil && len(clients) > 0 {
                    var existingProcs []string
                    for _, c := range clients {
                        if c.Online {
                            existingProcs = append(existingProcs, string(c.Proc.ProcId))
                        }
                    }
                    if diff := arrayContains(existingProcs, dependProcs); diff == "" {
                        initChan <- true
                        break loop
                    } else {
                        logx.Errorf("Proc: %s is not running!", diff)
                        time.Sleep(time.Second * 1)
                    }
                } else {
                    tryTimes++
                    time.Sleep(time.Second * 5)
                }
            } else {
                logx.Errorf("tried 15 times, client.GetRegisteredClient failed")
                initChan <- false
                return
            }
        }
    }
    go client.StartServer(nil)
    ss.bhClient = client
@@ -157,7 +198,7 @@
        return err
    }
    fmt.Println("加入集群, 请求同步全量数据,id:", ss.ServerId)
    logx.Debugf("加入集群, 请求同步全量数据,id:%s", ss.ServerId)
    return ss.bhClient.Publish(serfSyncTopic, b)
}
@@ -177,7 +218,7 @@
            // 处理同步全量数据的请求
            if string(busMsg.Topic) == ss.queryTableTopic {
                if ss.ClusterStatus == "master" {
                    fmt.Println("接收到同步全量数据请求")
                    logx.Debugf("接收到同步全量数据请求.")
                    ss.handleSyncTableMessage(busMsg.Data)
                }
            }
@@ -248,7 +289,7 @@
        err = tx.Exec(delSql).Error
        if err != nil {
            fmt.Println("删除本地的同步库数据失败,", err.Error())
            logx.Errorf("删除本地的同步库数据失败, %s", err.Error())
        }
    }
@@ -285,7 +326,7 @@
    ss.ClusterStatus = reply.Msg
    fmt.Println("当前集群状态:", ss.ClusterStatus)
    logx.Debugf("当前集群状态: %s", ss.ClusterStatus)
    return reply.Msg
}
@@ -364,7 +405,7 @@
func (ss *SyncServer) handleSyncTableMessage(msg []byte) error {
    targetId := string(msg)
    fmt.Println("同步全量数据给节点:", targetId)
    //fmt.Println("同步全量数据给节点:", targetId)
    sqls, err := DumpTables(ss.sqlDB, ss.syncTables)
    if err != nil {
        fmt.Println("DumpTables error, ", err.Error())
@@ -376,3 +417,25 @@
    return err
}
func arrayContains(list []string, arr []string) string {
    if arr == nil || list == nil {
        return ""
    }
    for _, s := range arr {
        isExist := false
        for _, t := range list {
            if s == t {
                isExist = true
                break
            }
        }
        if !isExist {
            return s
        }
    }
    return ""
}