zhangzengfei
2023-10-19 f73f610cdf4b0666dc139a51e72353b88f8f25ab
serf/sync.go
@@ -1,6 +1,7 @@
package serf
import (
   "apsClient/pkg/logx"
   "context"
   "encoding/json"
   "fmt"
@@ -20,6 +21,9 @@
var (
   agent = SyncServer{}
   dependProcs = []string{
      bhomeclient.Proc_System_Service,
   }
)
const (
@@ -101,6 +105,43 @@
   bhomedbapi.InitDoReq(client.RequestOnly)
   //bhomedbapi.InitLog(logger.Debug)
   // 需要等待system-service进程成功启动后,才能获取集群状态(或者保证程序启动时获取到正确的状态)
   tryTimes := 0
loop:
   for {
      select {
      case <-q:
         initChan <- false
         return
      default:
         if tryTimes < 15 {
            clients, err := client.GetRegisteredClient()
            if err == nil && len(clients) > 0 {
               var existingProcs []string
               for _, c := range clients {
                  if c.Online {
                     existingProcs = append(existingProcs, string(c.Proc.ProcId))
                  }
               }
               if diff := arrayContains(existingProcs, dependProcs); diff == "" {
                  initChan <- true
                  break loop
               } else {
                  logx.Errorf("Proc: %s is not running!", diff)
                  time.Sleep(time.Second * 1)
               }
            } else {
               tryTimes++
               time.Sleep(time.Second * 5)
            }
         } else {
            logx.Errorf("tried 15 times, client.GetRegisteredClient failed")
            initChan <- false
            return
         }
      }
   }
   go client.StartServer(nil)
   ss.bhClient = client
@@ -157,7 +198,7 @@
      return err
   }
   fmt.Println("加入集群, 请求同步全量数据,id:", ss.ServerId)
   logx.Debugf("加入集群, 请求同步全量数据,id:%s", ss.ServerId)
   return ss.bhClient.Publish(serfSyncTopic, b)
}
@@ -177,7 +218,7 @@
         // 处理同步全量数据的请求
         if string(busMsg.Topic) == ss.queryTableTopic {
            if ss.ClusterStatus == "master" {
               fmt.Println("接收到同步全量数据请求")
               logx.Debugf("接收到同步全量数据请求.")
               ss.handleSyncTableMessage(busMsg.Data)
            }
         }
@@ -248,7 +289,7 @@
      err = tx.Exec(delSql).Error
      if err != nil {
         fmt.Println("删除本地的同步库数据失败,", err.Error())
         logx.Errorf("删除本地的同步库数据失败, %s", err.Error())
      }
   }
@@ -285,7 +326,7 @@
   ss.ClusterStatus = reply.Msg
   fmt.Println("当前集群状态:", ss.ClusterStatus)
   logx.Debugf("当前集群状态: %s", ss.ClusterStatus)
   return reply.Msg
}
@@ -364,7 +405,7 @@
func (ss *SyncServer) handleSyncTableMessage(msg []byte) error {
   targetId := string(msg)
   fmt.Println("同步全量数据给节点:", targetId)
   //fmt.Println("同步全量数据给节点:", targetId)
   sqls, err := DumpTables(ss.sqlDB, ss.syncTables)
   if err != nil {
      fmt.Println("DumpTables error, ", err.Error())
@@ -376,3 +417,25 @@
   return err
}
func arrayContains(list []string, arr []string) string {
   if arr == nil || list == nil {
      return ""
   }
   for _, s := range arr {
      isExist := false
      for _, t := range list {
         if s == t {
            isExist = true
            break
         }
      }
      if !isExist {
         return s
      }
   }
   return ""
}