From 578b74f9de4b96e88e2fddb726c7c6f78162b033 Mon Sep 17 00:00:00 2001
From: zhangqian <zhangqian@123.com>
Date: 星期六, 21 十月 2023 14:34:23 +0800
Subject: [PATCH] 启停生产者
---
serf/sync.go | 20 +++++++++++++++++---
1 files changed, 17 insertions(+), 3 deletions(-)
diff --git a/serf/sync.go b/serf/sync.go
index ea21710..829d412 100644
--- a/serf/sync.go
+++ b/serf/sync.go
@@ -20,7 +20,7 @@
)
var (
- agent = SyncServer{}
+ agent = SyncServer{}
dependProcs = []string{
bhomeclient.Proc_System_Service,
}
@@ -124,7 +124,6 @@
}
}
if diff := arrayContains(existingProcs, dependProcs); diff == "" {
- initChan <- true
break loop
} else {
logx.Errorf("Proc: %s is not running!", diff)
@@ -403,8 +402,11 @@
}
}
+// serf 鍚屾鏁版嵁鐨勯檺鍒朵负92160 byte
func (ss *SyncServer) handleSyncTableMessage(msg []byte) error {
+ sizeLimit := 61440
targetId := string(msg)
+
//fmt.Println("鍚屾鍏ㄩ噺鏁版嵁缁欒妭鐐�:", targetId)
sqls, err := DumpTables(ss.sqlDB, ss.syncTables)
if err != nil {
@@ -413,7 +415,19 @@
}
syncSql := strings.Join(sqls, ";")
- err = ss.pubSyncSqlMessage([]byte(syncSql), targetId)
+ if len(syncSql) < sizeLimit {
+ err = ss.pubSyncSqlMessage([]byte(syncSql), targetId)
+ } else {
+ shard := ""
+ for _, sql := range sqls{
+ if len(shard) + len(sql) > sizeLimit {
+ err = ss.pubSyncSqlMessage([]byte(shard), targetId)
+ shard = ""
+ }
+
+ shard = fmt.Sprintf("%s%s;",shard,sql)
+ }
+ }
return err
}
--
Gitblit v1.8.0