From 7276ab65576ec73b439a40d7f1a3035a534b968c Mon Sep 17 00:00:00 2001
From: zhangqian <zhangqian@123.com>
Date: 星期五, 20 十月 2023 19:12:40 +0800
Subject: [PATCH] 停止消费时关闭tcp连接

---
 serf/sync.go |   20 +++++++++++++++++---
 1 files changed, 17 insertions(+), 3 deletions(-)

diff --git a/serf/sync.go b/serf/sync.go
index ea21710..829d412 100644
--- a/serf/sync.go
+++ b/serf/sync.go
@@ -20,7 +20,7 @@
 )
 
 var (
-	agent = SyncServer{}
+	agent       = SyncServer{}
 	dependProcs = []string{
 		bhomeclient.Proc_System_Service,
 	}
@@ -124,7 +124,6 @@
 						}
 					}
 					if diff := arrayContains(existingProcs, dependProcs); diff == "" {
-						initChan <- true
 						break loop
 					} else {
 						logx.Errorf("Proc: %s is not running!", diff)
@@ -403,8 +402,11 @@
 	}
 }
 
+// serf 鍚屾鏁版嵁鐨勯檺鍒朵负92160 byte
 func (ss *SyncServer) handleSyncTableMessage(msg []byte) error {
+	sizeLimit := 61440
 	targetId := string(msg)
+
 	//fmt.Println("鍚屾鍏ㄩ噺鏁版嵁缁欒妭鐐�:", targetId)
 	sqls, err := DumpTables(ss.sqlDB, ss.syncTables)
 	if err != nil {
@@ -413,7 +415,19 @@
 	}
 
 	syncSql := strings.Join(sqls, ";")
-	err = ss.pubSyncSqlMessage([]byte(syncSql), targetId)
+	if len(syncSql) < sizeLimit {
+		err = ss.pubSyncSqlMessage([]byte(syncSql), targetId)
+	} else {
+		shard := ""
+		for _, sql := range sqls{
+			if len(shard) + len(sql) > sizeLimit {
+				err = ss.pubSyncSqlMessage([]byte(shard), targetId)
+				shard = ""
+			}
+
+			shard = fmt.Sprintf("%s%s;",shard,sql)
+		}
+	}
 
 	return err
 }

--
Gitblit v1.8.0