zhangqian
2023-10-27 d91a802e7aa2ad4075ed803b8ddc7536a91a0ef0
serf/sync.go
@@ -20,7 +20,7 @@
)
var (
   agent = SyncServer{}
   agent       = SyncServer{}
   dependProcs = []string{
      bhomeclient.Proc_System_Service,
   }
@@ -124,7 +124,6 @@
                  }
               }
               if diff := arrayContains(existingProcs, dependProcs); diff == "" {
                  initChan <- true
                  break loop
               } else {
                  logx.Errorf("Proc: %s is not running!", diff)
@@ -372,7 +371,7 @@
}
func (ss *SyncServer) handleClusterMessage(msg []byte) {
   //fmt.Println("clusterMessage:", string(msg))
   logx.Infof("clusterMessage:", string(msg))
   sql := string(msg)
   if len(sql) <= 0 {
@@ -403,17 +402,37 @@
   }
}
// serf 同步数据的限制为92160 byte
func (ss *SyncServer) handleSyncTableMessage(msg []byte) error {
   sizeLimit := 61440
   targetId := string(msg)
   //fmt.Println("同步全量数据给节点:", targetId)
   sqls, err := DumpTables(ss.sqlDB, ss.syncTables)
   if err != nil {
      fmt.Println("DumpTables error, ", err.Error())
      logx.Errorf("DumpTables error: %s", err.Error())
      return err
   }
   logx.Infof("DumpTables sql:%v", sqls)
   syncSql := strings.Join(sqls, ";")
   err = ss.pubSyncSqlMessage([]byte(syncSql), targetId)
   if len(syncSql) < sizeLimit {
      err = ss.pubSyncSqlMessage([]byte(syncSql), targetId)
   } else {
      shard := ""
      for _, sql := range sqls {
         if len(shard)+len(sql) > sizeLimit {
            err = ss.pubSyncSqlMessage([]byte(shard), targetId)
            shard = ""
         }
         shard = fmt.Sprintf("%s%s;", shard, sql)
      }
      if len(shard) >0 {
         err = ss.pubSyncSqlMessage([]byte(shard), targetId)
      }
   }
   return err
}