| | |
| | | package serf |
| | | |
| | | import ( |
| | | "apsClient/pkg/logx" |
| | | "context" |
| | | "encoding/json" |
| | | "fmt" |
| | |
| | | |
| | | var ( |
| | | agent = SyncServer{} |
| | | dependProcs = []string{ |
| | | bhomeclient.Proc_System_Service, |
| | | } |
| | | ) |
| | | |
| | | const ( |
| | |
| | | bhomedbapi.InitDoReq(client.RequestOnly) |
| | | //bhomedbapi.InitLog(logger.Debug) |
| | | |
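| | | // Wait until the system-service process has started successfully before fetching the cluster status (i.e. make sure the status read at program startup is correct). |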
| | | tryTimes := 0 |
| | | loop: |
| | | for { |
| | | select { |
| | | case <-q: |
| | | initChan <- false |
| | | return |
| | | default: |
| | | if tryTimes < 15 { |
| | | clients, err := client.GetRegisteredClient() |
| | | if err == nil && len(clients) > 0 { |
| | | var existingProcs []string |
| | | for _, c := range clients { |
| | | if c.Online { |
| | | existingProcs = append(existingProcs, string(c.Proc.ProcId)) |
| | | } |
| | | } |
| | | if diff := arrayContains(existingProcs, dependProcs); diff == "" { |
| | | initChan <- true |
| | | break loop |
| | | } else { |
| | | logx.Errorf("Proc: %s is not running!", diff) |
| | | time.Sleep(time.Second * 1) |
| | | } |
| | | } else { |
| | | tryTimes++ |
| | | time.Sleep(time.Second * 5) |
| | | } |
| | | } else { |
| | | logx.Errorf("tried 15 times, client.GetRegisteredClient failed") |
| | | initChan <- false |
| | | return |
| | | } |
| | | } |
| | | } |
| | | |
| | | go client.StartServer(nil) |
| | | |
| | | ss.bhClient = client |
| | |
| | | return err |
| | | } |
| | | |
| | | fmt.Println("加入集群, 请求同步全量数据,id:", ss.ServerId) |
| | | logx.Debugf("加入集群, 请求同步全量数据,id:%s", ss.ServerId) |
| | | return ss.bhClient.Publish(serfSyncTopic, b) |
| | | } |
| | | |
| | |
| | | // Handle a request to sync the full data set. |
| | | if string(busMsg.Topic) == ss.queryTableTopic { |
| | | if ss.ClusterStatus == "master" { |
| | | fmt.Println("接收到同步全量数据请求") |
| | | logx.Debugf("接收到同步全量数据请求.") |
| | | ss.handleSyncTableMessage(busMsg.Data) |
| | | } |
| | | } |
| | |
| | | |
| | | err = tx.Exec(delSql).Error |
| | | if err != nil { |
| | | fmt.Println("删除本地的同步库数据失败,", err.Error()) |
| | | logx.Errorf("删除本地的同步库数据失败, %s", err.Error()) |
| | | } |
| | | } |
| | | |
| | |
| | | |
| | | ss.ClusterStatus = reply.Msg |
| | | |
| | | fmt.Println("当前集群状态:", ss.ClusterStatus) |
| | | logx.Debugf("当前集群状态: %s", ss.ClusterStatus) |
| | | |
| | | return reply.Msg |
| | | } |
| | |
| | | |
| | | func (ss *SyncServer) handleSyncTableMessage(msg []byte) error { |
| | | targetId := string(msg) |
| | | fmt.Println("同步全量数据给节点:", targetId) |
| | | //fmt.Println("同步全量数据给节点:", targetId) |
| | | sqls, err := DumpTables(ss.sqlDB, ss.syncTables) |
| | | if err != nil { |
| | | fmt.Println("DumpTables error, ", err.Error()) |
| | |
| | | |
| | | return err |
| | | } |
| | | |
// arrayContains checks that every element of arr occurs in list.
//
// It walks arr in order and returns the first element that is not found in
// list. It returns "" when nothing is missing, which includes the case where
// arr is nil or empty.
//
// A nil list is treated exactly like an empty one: it contains nothing, so a
// non-empty arr yields its first element. This matters for callers that build
// list with append onto a nil slice; "no entries collected" must not be
// reported as "all entries present".
func arrayContains(list []string, arr []string) string {
nextWanted:
	for _, wanted := range arr {
		for _, have := range list {
			if have == wanted {
				continue nextWanted
			}
		}
		// Inner loop finished without a match: wanted is missing.
		return wanted
	}

	return ""
}