From 2cd1af13bc4e7aec4c85b9fe88db2d294af6468f Mon Sep 17 00:00:00 2001 From: zhangzengfei <zhangzengfei@smartai.com> Date: 星期日, 08 十月 2023 11:24:37 +0800 Subject: [PATCH] 修复集群同步功能 --- system-service/serf/serf.go | 23 +++++++++++++++++++---- 1 files changed, 19 insertions(+), 4 deletions(-) diff --git a/system-service/serf/serf.go b/system-service/serf/serf.go index 55ecb72..0a14a19 100644 --- a/system-service/serf/serf.go +++ b/system-service/serf/serf.go @@ -23,11 +23,15 @@ UserEventSyncVirtualIp = "SyncVirtualIp" //婕傜Щip淇敼 UserEventSyncRegisterInfo = "SyncRegisterInfo" //鍚屾娉ㄥ唽淇℃伅 DataSystemSerfSubscribe = "data-system-serf-subscribe" //鍚刟pp浠巗erf璁㈤槄娑堟伅 + UserEventSyncMessage = "SyncMessageForProc" // 涓哄叾浠栬繘绋嬪悓姝ユ秷鎭� TcpTransportPort = 30194 //tcp浼犺緭澶ф暟鎹噺鎺ュ彛 + + SUserEventSyncMessage ) var SyncDbTablePersonCacheChan = make(chan []byte, 512) var SyncVirtualIpChan = make(chan []byte, 512) +var SyncProcMessageChan = make(chan []byte, 512) func HandleSerfEvent(event serf.Event) { switch ev := event.(type) { @@ -42,6 +46,9 @@ HandleSyncRegisterInfo(ev) } else if ev.Name == DataSystemSerfSubscribe { HandleDataSystemSerfSub(ev) + } else if ev.Name == UserEventSyncMessage { + logger.Debug("鎺ユ敹鍒癝yncMessageForProc") + HandleUserEventSyncMessage(ev) } case *serf.Query: if ev.Name == QueryEventUpdateDBData { @@ -100,6 +107,14 @@ type SqlUserEvent struct { Owner string `json:"owner"` Sql []string `json:"sql"` +} + +type ProcMessageEvent struct { + Owner string `json:"owner"` // 鍙戦�佽�� + Target string `json:"target"` // 鎸囧畾鎺ユ敹鑰� + Proc string `json:"procName"` // 杩涚▼鍚� + Topic string `json:"topic"` // 涓婚 + Payload []byte `json:"payload"` // 娑堟伅浣�,鑷瑙f瀽 } type TableDesc struct { @@ -193,7 +208,7 @@ mbs := a.GroupMembers(clusterId) var specmembername string for _, m := range mbs { - logger.Info("m", m) + logger.Info("member", m) if m.Name != config.Server.AnalyServerId { //鍓嶇紑锛欴SVAD:鍒嗘瀽鏈嶅姟鍣� DSPAD:杩涘嚭鍏ad if strings.HasPrefix(config.Server.AnalyServerId, "DSVAD") { if strings.HasPrefix(m.Name, "DSVAD") { @@ -206,7 +221,7 @@ } } } - logger.Info("mbs:", mbs, "specmembername:", specmembername) + logger.Info("members:", mbs, "specmembername:", specmembername) if specmembername == "" { //濡傛灉鏈壘鍒扮洰鏍囪妭鐐癸紝璇存槑褰撳墠闆嗙兢鍐呴櫎浜嗘湰鑺傜偣锛屾病鏈夊叾浠栧彲鐢ㄨ妭鐐� return nil, errors.New("specmembername not found") } @@ -245,8 +260,8 @@ logger.Info("Query response's len:", len(msg)) err := json.Unmarshal(msg, &dumpSqls) if err == nil { - logger.Error("dumpSql:", dumpSqls) - logger.Error("data dump success") + //logger.Error("dumpSql:", dumpSqls) + logger.Debug("data dump success") } return } -- Gitblit v1.8.0