From 10c65da3d2af7056f48d9301e83f53f102f76e18 Mon Sep 17 00:00:00 2001 From: zhangqian <zhangqian@123.com> Date: 星期一, 30 十月 2023 14:52:11 +0800 Subject: [PATCH] fix --- serf/sync.go | 116 +++++++++++++++++++++++++++++++++++++++++++++++++++------ 1 files changed, 103 insertions(+), 13 deletions(-) diff --git a/serf/sync.go b/serf/sync.go index b49eebf..ce3c2fb 100644 --- a/serf/sync.go +++ b/serf/sync.go @@ -1,6 +1,7 @@ package serf import ( + "apsClient/pkg/logx" "context" "encoding/json" "fmt" @@ -19,7 +20,10 @@ ) var ( - agent = SyncServer{} + agent = SyncServer{} + dependProcs = []string{ + bhomeclient.Proc_System_Service, + } ) const ( @@ -101,6 +105,42 @@ bhomedbapi.InitDoReq(client.RequestOnly) //bhomedbapi.InitLog(logger.Debug) + // 闇�瑕佺瓑寰卻ystem-service杩涚▼鎴愬姛鍚姩鍚庯紝鎵嶈兘鑾峰彇闆嗙兢鐘舵��(鎴栬�呬繚璇佺▼搴忓惎鍔ㄦ椂鑾峰彇鍒版纭殑鐘舵��) + tryTimes := 0 +loop: + for { + select { + case <-q: + initChan <- false + return + default: + if tryTimes < 15 { + clients, err := client.GetRegisteredClient() + if err == nil && len(clients) > 0 { + var existingProcs []string + for _, c := range clients { + if c.Online { + existingProcs = append(existingProcs, string(c.Proc.ProcId)) + } + } + if diff := arrayContains(existingProcs, dependProcs); diff == "" { + break loop + } else { + logx.Errorf("Proc: %s is not running!", diff) + time.Sleep(time.Second * 1) + } + } else { + tryTimes++ + time.Sleep(time.Second * 5) + } + } else { + logx.Errorf("tried 15 times, client.GetRegisteredClient failed") + initChan <- false + return + } + } + } + go client.StartServer(nil) ss.bhClient = client @@ -112,9 +152,9 @@ // 鍚姩鍚庢煡璇竴娆¢泦缇ょ姸鎬� ss.QueryClusterStat() - if ss.ClusterStatus != "" { - ss.sqlDB.LogMode(true) - } + //if ss.ClusterStatus != "" { + ss.sqlDB.LogMode(true) + //} initChan <- true <-q @@ -157,7 +197,7 @@ return err } - fmt.Println("鍔犲叆闆嗙兢, 璇锋眰鍚屾鍏ㄩ噺鏁版嵁,id:", ss.ServerId) + logx.Debugf("鍔犲叆闆嗙兢, 璇锋眰鍚屾鍏ㄩ噺鏁版嵁,id:%s", ss.ServerId) return ss.bhClient.Publish(serfSyncTopic, b) } @@ -177,7 +217,7 @@ // 澶勭悊鍚屾鍏ㄩ噺鏁版嵁鐨勮姹� if string(busMsg.Topic) == ss.queryTableTopic { if ss.ClusterStatus == "master" { - fmt.Println("鎺ユ敹鍒板悓姝ュ叏閲忔暟鎹姹�") + logx.Debugf("鎺ユ敹鍒板悓姝ュ叏閲忔暟鎹姹�.") ss.handleSyncTableMessage(busMsg.Data) } } @@ -212,7 +252,15 @@ // 閫�鍑洪泦缇�, 寮�鍚棩蹇楄窡韪�, 璁剧疆瑙掕壊slave ss.clusterEventFn(EventLeaveCluster) ss.ClusterStatus = "" - ss.sqlDB.LogMode(false) + ss.sqlDB.LogMode(true) + case "slave2master": + ss.clusterEventFn(EventSlave2Master) + ss.ClusterStatus = "master" + ss.sqlDB.LogMode(true) + case "master2slave": + ss.clusterEventFn(EventMaster2Slave) + ss.ClusterStatus = "slave" + ss.sqlDB.LogMode(true) } } } @@ -240,7 +288,7 @@ err = tx.Exec(delSql).Error if err != nil { - fmt.Println("鍒犻櫎鏈湴鐨勫悓姝ュ簱鏁版嵁澶辫触,", err.Error()) + logx.Errorf("鍒犻櫎鏈湴鐨勫悓姝ュ簱鏁版嵁澶辫触, %s", err.Error()) } } @@ -277,7 +325,7 @@ ss.ClusterStatus = reply.Msg - fmt.Println("褰撳墠闆嗙兢鐘舵��:", ss.ClusterStatus) + logx.Debugf("褰撳墠闆嗙兢鐘舵��: %s", ss.ClusterStatus) return reply.Msg } @@ -323,7 +371,7 @@ } func (ss *SyncServer) handleClusterMessage(msg []byte) { - //fmt.Println("clusterMessage:", string(msg)) + logx.Infof("clusterMessage:", string(msg)) sql := string(msg) if len(sql) <= 0 { @@ -354,17 +402,59 @@ } } +// serf 鍚屾鏁版嵁鐨勯檺鍒朵负92160 byte func (ss *SyncServer) handleSyncTableMessage(msg []byte) error { + sizeLimit := 61440 targetId := string(msg) - fmt.Println("鍚屾鍏ㄩ噺鏁版嵁缁欒妭鐐�:", targetId) + + //fmt.Println("鍚屾鍏ㄩ噺鏁版嵁缁欒妭鐐�:", targetId) sqls, err := DumpTables(ss.sqlDB, ss.syncTables) if err != nil { - fmt.Println("DumpTables error, ", err.Error()) + logx.Errorf("DumpTables error: %s", err.Error()) return err } + logx.Infof("DumpTables sql:%v", sqls) syncSql := strings.Join(sqls, ";") - err = ss.pubSyncSqlMessage([]byte(syncSql), targetId) + if len(syncSql) < sizeLimit { + err = ss.pubSyncSqlMessage([]byte(syncSql), targetId) + } else { + shard := "" + for _, sql := range sqls { + if len(shard)+len(sql) > sizeLimit { + err = ss.pubSyncSqlMessage([]byte(shard), targetId) + shard = "" + } + + shard = fmt.Sprintf("%s%s;", shard, sql) + } + + if len(shard) > 0 { + err = ss.pubSyncSqlMessage([]byte(shard), targetId) + } + } return err } + +func arrayContains(list []string, arr []string) string { + if arr == nil || list == nil { + return "" + } + + for _, s := range arr { + isExist := false + for _, t := range list { + if s == t { + isExist = true + break + } + } + + if !isExist { + return s + } + } + + return "" +} -- Gitblit v1.8.0