From 71b8885babe6dfd25c91b007018347c0c1bfac74 Mon Sep 17 00:00:00 2001
From: zhangzengfei <zhangzengfei@smartai.com>
Date: 星期五, 20 十月 2023 17:35:52 +0800
Subject: [PATCH] 添加主节点变更和主动切换功能

---
 system-service/serf/serf.go |   46 +++++++++++++++++++++++++++++++++-------------
 1 files changed, 33 insertions(+), 13 deletions(-)

diff --git a/system-service/serf/serf.go b/system-service/serf/serf.go
index 58b9af2..77e50c3 100644
--- a/system-service/serf/serf.go
+++ b/system-service/serf/serf.go
@@ -21,13 +21,18 @@
 	UserEventSyncSql                = "SyncSql"
 	UserEventSyncDbTablePersonCache = "SyncCache"
 	UserEventSyncVirtualIp          = "SyncVirtualIp"              //婕傜Щip淇敼
+	UserEventChangeMaster           = "ChangeMaster "              //淇敼鑺傜偣鐘舵��
 	UserEventSyncRegisterInfo       = "SyncRegisterInfo"           //鍚屾娉ㄥ唽淇℃伅
 	DataSystemSerfSubscribe         = "data-system-serf-subscribe" //鍚刟pp浠巗erf璁㈤槄娑堟伅
+	UserEventSyncMessage            = "SyncMessageForProc"         // 涓哄叾浠栬繘绋嬪悓姝ユ秷鎭�
 	TcpTransportPort                = 30194                        //tcp浼犺緭澶ф暟鎹噺鎺ュ彛
+
+	SUserEventSyncMessage
 )
 
 var SyncDbTablePersonCacheChan = make(chan []byte, 512)
 var SyncVirtualIpChan = make(chan []byte, 512)
+var SyncProcMessageChan = make(chan []byte, 512)
 
 func HandleSerfEvent(event serf.Event) {
 	switch ev := event.(type) {
@@ -38,10 +43,15 @@
 			HandleUserEventSyncDbTablePersonCache(ev)
 		} else if ev.Name == UserEventSyncVirtualIp {
 			HandleUserEventSyncVirtualIp(ev)
+		} else if ev.Name == UserEventChangeMaster {
+			HandleUserEventChangeMaster(ev)
 		} else if ev.Name == UserEventSyncRegisterInfo {
 			HandleSyncRegisterInfo(ev)
 		} else if ev.Name == DataSystemSerfSubscribe {
 			HandleDataSystemSerfSub(ev)
+		} else if ev.Name == UserEventSyncMessage {
+			logger.Debug("鎺ユ敹鍒癝yncMessageForProc")
+			HandleUserEventSyncMessage(ev)
 		}
 	case *serf.Query:
 		if ev.Name == QueryEventUpdateDBData {
@@ -54,10 +64,12 @@
 	case serf.MemberEvent:
 		if event.EventType() == serf.EventMemberLeave {
 			HandleEventMemberLeave(ev)
-		} else if event.EventType() == serf.EventMemberJoin {
+		} else if event.EventType() == serf.EventMemberJoin || event.EventType() == serf.EventMemberUpdate {
 			HandleEventMemberJoin(ev)
+		} else if event.EventType() == serf.EventMemberFailed {
+			HandleEventMemberFail(ev)
 		}
-
+		logger.Error("serf MemberEvent ", event.EventType())
 	default:
 		logger.Warn("Unknown event type: %s\n", ev.EventType().String())
 	}
@@ -83,11 +95,11 @@
 					logger.Error("ExecuteSqlByGorm err:", err, ",sql:", sql)
 					return false, err
 				}
-				if result.RowsAffected == 0 {
-					logger.Debug("ExecuteSqlByGorm RowsAffected == 0", ",sql:", sql)
-					err = errors.New("ExecuteSqlByGorm RowsAffected == 0")
-					return false, err
-				}
+				//if result.RowsAffected == 0 {
+				//	logger.Debug("ExecuteSqlByGorm RowsAffected == 0", ",sql:", sql)
+				//	err = errors.New("ExecuteSqlByGorm RowsAffected == 0")
+				//	return false, err
+				//}
 			}
 			tx.Commit()
 			return true, nil
@@ -100,6 +112,14 @@
 type SqlUserEvent struct {
 	Owner string   `json:"owner"`
 	Sql   []string `json:"sql"`
+}
+
+type ProcMessageEvent struct {
+	Owner   string `json:"owner"`    // 鍙戦�佽��
+	Target  string `json:"target"`   // 鎸囧畾鎺ユ敹鑰�
+	Proc    string `json:"procName"` // 杩涚▼鍚�
+	Topic   string `json:"topic"`    // 涓婚
+	Payload []byte `json:"payload"`  // 娑堟伅浣�,鑷瑙f瀽
 }
 
 type TableDesc struct {
@@ -193,7 +213,7 @@
 	mbs := a.GroupMembers(clusterId)
 	var specmembername string
 	for _, m := range mbs {
-		logger.Info("m", m)
+		logger.Info("member", m)
 		if m.Name != config.Server.AnalyServerId { //鍓嶇紑锛欴SVAD:鍒嗘瀽鏈嶅姟鍣� DSPAD:杩涘嚭鍏ad
 			if strings.HasPrefix(config.Server.AnalyServerId, "DSVAD") {
 				if strings.HasPrefix(m.Name, "DSVAD") {
@@ -206,7 +226,7 @@
 			}
 		}
 	}
-	logger.Info("mbs:", mbs, "specmembername:", specmembername)
+	logger.Info("members:", mbs, "specmembername:", specmembername)
 	if specmembername == "" { //濡傛灉鏈壘鍒扮洰鏍囪妭鐐癸紝璇存槑褰撳墠闆嗙兢鍐呴櫎浜嗘湰鑺傜偣锛屾病鏈夊叾浠栧彲鐢ㄨ妭鐐�
 		return nil, errors.New("specmembername not found")
 	}
@@ -245,8 +265,8 @@
 				logger.Info("Query response's len:", len(msg))
 				err := json.Unmarshal(msg, &dumpSqls)
 				if err == nil {
-					logger.Error("dumpSql:", dumpSqls)
-					logger.Error("data dump success")
+					//logger.Error("dumpSql:", dumpSqls)
+					logger.Debug("data dump success")
 				}
 				return
 			}
@@ -267,7 +287,7 @@
 		return
 	}
 	err = Agent.UserEvent(UserEventSyncSql, ueB, false)
-	if err == nil || !strings.Contains(err.Error(), "cannot contain") {
-		logger.Error("err: ", err)
+	if err != nil {
+		logger.Error("sending sync sql event err: ", err)
 	}
 }

--
Gitblit v1.8.0