From 71b8885babe6dfd25c91b007018347c0c1bfac74 Mon Sep 17 00:00:00 2001 From: zhangzengfei <zhangzengfei@smartai.com> Date: 星期五, 20 十月 2023 17:35:52 +0800 Subject: [PATCH] 添加主节点变更和主动切换功能 --- system-service/serf/serf.go | 46 +++++++++++++++++++++++++++++++++------------- 1 files changed, 33 insertions(+), 13 deletions(-) diff --git a/system-service/serf/serf.go b/system-service/serf/serf.go index 58b9af2..77e50c3 100644 --- a/system-service/serf/serf.go +++ b/system-service/serf/serf.go @@ -21,13 +21,18 @@ UserEventSyncSql = "SyncSql" UserEventSyncDbTablePersonCache = "SyncCache" UserEventSyncVirtualIp = "SyncVirtualIp" //婕傜Щip淇敼 + UserEventChangeMaster = "ChangeMaster " //淇敼鑺傜偣鐘舵�� UserEventSyncRegisterInfo = "SyncRegisterInfo" //鍚屾娉ㄥ唽淇℃伅 DataSystemSerfSubscribe = "data-system-serf-subscribe" //鍚刟pp浠巗erf璁㈤槄娑堟伅 + UserEventSyncMessage = "SyncMessageForProc" // 涓哄叾浠栬繘绋嬪悓姝ユ秷鎭� TcpTransportPort = 30194 //tcp浼犺緭澶ф暟鎹噺鎺ュ彛 + + SUserEventSyncMessage ) var SyncDbTablePersonCacheChan = make(chan []byte, 512) var SyncVirtualIpChan = make(chan []byte, 512) +var SyncProcMessageChan = make(chan []byte, 512) func HandleSerfEvent(event serf.Event) { switch ev := event.(type) { @@ -38,10 +43,15 @@ HandleUserEventSyncDbTablePersonCache(ev) } else if ev.Name == UserEventSyncVirtualIp { HandleUserEventSyncVirtualIp(ev) + } else if ev.Name == UserEventChangeMaster { + HandleUserEventChangeMaster(ev) } else if ev.Name == UserEventSyncRegisterInfo { HandleSyncRegisterInfo(ev) } else if ev.Name == DataSystemSerfSubscribe { HandleDataSystemSerfSub(ev) + } else if ev.Name == UserEventSyncMessage { + logger.Debug("鎺ユ敹鍒癝yncMessageForProc") + HandleUserEventSyncMessage(ev) } case *serf.Query: if ev.Name == QueryEventUpdateDBData { @@ -54,10 +64,12 @@ case serf.MemberEvent: if event.EventType() == serf.EventMemberLeave { HandleEventMemberLeave(ev) - } else if event.EventType() == serf.EventMemberJoin { + } else if event.EventType() == serf.EventMemberJoin || event.EventType() == serf.EventMemberUpdate { HandleEventMemberJoin(ev) + } else if event.EventType() == serf.EventMemberFailed { + HandleEventMemberFail(ev) } - + logger.Error("serf MemberEvent ", event.EventType()) default: logger.Warn("Unknown event type: %s\n", ev.EventType().String()) } @@ -83,11 +95,11 @@ logger.Error("ExecuteSqlByGorm err:", err, ",sql:", sql) return false, err } - if result.RowsAffected == 0 { - logger.Debug("ExecuteSqlByGorm RowsAffected == 0", ",sql:", sql) - err = errors.New("ExecuteSqlByGorm RowsAffected == 0") - return false, err - } + //if result.RowsAffected == 0 { + // logger.Debug("ExecuteSqlByGorm RowsAffected == 0", ",sql:", sql) + // err = errors.New("ExecuteSqlByGorm RowsAffected == 0") + // return false, err + //} } tx.Commit() return true, nil @@ -100,6 +112,14 @@ type SqlUserEvent struct { Owner string `json:"owner"` Sql []string `json:"sql"` +} + +type ProcMessageEvent struct { + Owner string `json:"owner"` // 鍙戦�佽�� + Target string `json:"target"` // 鎸囧畾鎺ユ敹鑰� + Proc string `json:"procName"` // 杩涚▼鍚� + Topic string `json:"topic"` // 涓婚 + Payload []byte `json:"payload"` // 娑堟伅浣�,鑷瑙f瀽 } type TableDesc struct { @@ -193,7 +213,7 @@ mbs := a.GroupMembers(clusterId) var specmembername string for _, m := range mbs { - logger.Info("m", m) + logger.Info("member", m) if m.Name != config.Server.AnalyServerId { //鍓嶇紑锛欴SVAD:鍒嗘瀽鏈嶅姟鍣� DSPAD:杩涘嚭鍏ad if strings.HasPrefix(config.Server.AnalyServerId, "DSVAD") { if strings.HasPrefix(m.Name, "DSVAD") { @@ -206,7 +226,7 @@ } } } - logger.Info("mbs:", mbs, "specmembername:", specmembername) + logger.Info("members:", mbs, "specmembername:", specmembername) if specmembername == "" { //濡傛灉鏈壘鍒扮洰鏍囪妭鐐癸紝璇存槑褰撳墠闆嗙兢鍐呴櫎浜嗘湰鑺傜偣锛屾病鏈夊叾浠栧彲鐢ㄨ妭鐐� return nil, errors.New("specmembername not found") } @@ -245,8 +265,8 @@ logger.Info("Query response's len:", len(msg)) err := json.Unmarshal(msg, &dumpSqls) if err == nil { - logger.Error("dumpSql:", dumpSqls) - logger.Error("data dump success") + //logger.Error("dumpSql:", dumpSqls) + logger.Debug("data dump success") } return } @@ -267,7 +287,7 @@ return } err = Agent.UserEvent(UserEventSyncSql, ueB, false) - if err == nil || !strings.Contains(err.Error(), "cannot contain") { - logger.Error("err: ", err) + if err != nil { + logger.Error("sending sync sql event err: ", err) } } -- Gitblit v1.8.0