| | |
| | | } |
| | | |
| | | func (ss *SyncServer) handleClusterMessage(msg []byte) { |
| | | //fmt.Println("clusterMessage:", string(msg)) |
| | | logx.Infof("clusterMessage:", string(msg)) |
| | | sql := string(msg) |
| | | |
| | | if len(sql) <= 0 { |
| | |
| | | } |
| | | } |
| | | |
| | | // serf 同步数据的限制为92160 byte |
| | | func (ss *SyncServer) handleSyncTableMessage(msg []byte) error { |
| | | sizeLimit := 61440 |
| | | targetId := string(msg) |
| | | |
| | | //fmt.Println("同步全量数据给节点:", targetId) |
| | | sqls, err := DumpTables(ss.sqlDB, ss.syncTables) |
| | | if err != nil { |
| | | fmt.Println("DumpTables error, ", err.Error()) |
| | | logx.Errorf("DumpTables error: %s", err.Error()) |
| | | return err |
| | | } |
| | | |
| | | logx.Infof("DumpTables sql:%v", sqls) |
| | | syncSql := strings.Join(sqls, ";") |
| | | err = ss.pubSyncSqlMessage([]byte(syncSql), targetId) |
| | | if len(syncSql) < sizeLimit { |
| | | err = ss.pubSyncSqlMessage([]byte(syncSql), targetId) |
| | | } else { |
| | | shard := "" |
| | | for _, sql := range sqls { |
| | | if len(shard)+len(sql) > sizeLimit { |
| | | err = ss.pubSyncSqlMessage([]byte(shard), targetId) |
| | | shard = "" |
| | | } |
| | | |
| | | shard = fmt.Sprintf("%s%s;", shard, sql) |
| | | } |
| | | |
| | | if len(shard) >0 { |
| | | err = ss.pubSyncSqlMessage([]byte(shard), targetId) |
| | | } |
| | | } |
| | | |
| | | return err |
| | | } |