From 7276ab65576ec73b439a40d7f1a3035a534b968c Mon Sep 17 00:00:00 2001 From: zhangqian <zhangqian@123.com> Date: 星期五, 20 十月 2023 19:12:40 +0800 Subject: [PATCH] 停止消费时关闭tcp连接 --- serf/sync.go | 20 +++++++++++++++++--- 1 files changed, 17 insertions(+), 3 deletions(-) diff --git a/serf/sync.go b/serf/sync.go index ea21710..829d412 100644 --- a/serf/sync.go +++ b/serf/sync.go @@ -20,7 +20,7 @@ ) var ( - agent = SyncServer{} + agent = SyncServer{} dependProcs = []string{ bhomeclient.Proc_System_Service, } @@ -124,7 +124,6 @@ } } if diff := arrayContains(existingProcs, dependProcs); diff == "" { - initChan <- true break loop } else { logx.Errorf("Proc: %s is not running!", diff) @@ -403,8 +402,11 @@ } } +// serf 鍚屾鏁版嵁鐨勯檺鍒朵负92160 byte func (ss *SyncServer) handleSyncTableMessage(msg []byte) error { + sizeLimit := 61440 targetId := string(msg) + //fmt.Println("鍚屾鍏ㄩ噺鏁版嵁缁欒妭鐐�:", targetId) sqls, err := DumpTables(ss.sqlDB, ss.syncTables) if err != nil { @@ -413,7 +415,19 @@ } syncSql := strings.Join(sqls, ";") - err = ss.pubSyncSqlMessage([]byte(syncSql), targetId) + if len(syncSql) < sizeLimit { + err = ss.pubSyncSqlMessage([]byte(syncSql), targetId) + } else { + shard := "" + for _, sql := range sqls{ + if len(shard) + len(sql) > sizeLimit { + err = ss.pubSyncSqlMessage([]byte(shard), targetId) + shard = "" + } + + shard = fmt.Sprintf("%s%s;",shard,sql) + } + } return err } -- Gitblit v1.8.0