From 71b8885babe6dfd25c91b007018347c0c1bfac74 Mon Sep 17 00:00:00 2001
From: zhangzengfei <zhangzengfei@smartai.com>
Date: 星期五, 20 十月 2023 17:35:52 +0800
Subject: [PATCH] 添加主节点变更和主动切换功能

---
 system-service/serf/handler.go |  139 +++++++++++++++++++++++++++++++++++++---------
 1 files changed, 111 insertions(+), 28 deletions(-)

diff --git a/system-service/serf/handler.go b/system-service/serf/handler.go
index 0e4566a..a90d4ba 100644
--- a/system-service/serf/handler.go
+++ b/system-service/serf/handler.go
@@ -80,6 +80,44 @@
 	logger.Info("LTime:", ev.LTime, " Recevie virtualIp change")
 	SyncVirtualIpChan <- ev.Payload
 }
+func HandleUserEventChangeMaster(ev serf.UserEvent) {
+	masterId := string(ev.Payload)
+	localId := Agent.LocalMember().Name
+	logger.Info("鍙樹负涓昏妭鐐圭殑id,", masterId, "鏈妭鐐筰d:", localId)
+	if masterId == localId {
+		return
+	}
+
+	var clusterDb models.Node
+	localNode, err := clusterDb.FindNodeById(localId)
+	if err == nil {
+		if localNode.DriftState == "master" {
+			logger.Info("鏈妭鐐逛负涔嬪墠鐨刴aster,閫氱煡鍙樻洿涓簊lave")
+
+			// 閫氱煡涓昏妭鐐瑰彉鏇�
+			chMsg := protomsg.DbChangeMessage{
+				Id:     localNode.ClusterId,
+				Table:  protomsg.TableChanged_T_Cluster,
+				Action: protomsg.DbAction_Insert,
+				Info:   "master2slave",
+			}
+
+			bts, _ := json.Marshal(chMsg)
+			h := GetBusHandle()
+			if h == nil {
+				logger.Error("HandleEventMemberLeave bus handle is nil")
+				return
+			}
+			err := h.Publish("system-service", bts)
+			if err != nil {
+				logger.Error("HandleEventMemberLeave pub master err:", err)
+			}
+		}
+	}
+
+	clusterDb.UpdateDriftStateByNodeId("master", masterId, false)
+}
+
 func HandleUserEventSyncMessage(ev serf.UserEvent) {
 	logger.Info("receive a UserEventSyncMessage event")
 	var procMsg ProcMessageEvent
@@ -242,47 +280,46 @@
 func HandleEventMemberLeave(ev serf.MemberEvent) {
 	if ev.Members != nil && len(ev.Members) == 1 {
 		leaveMember := ev.Members[0]
+		logger.Info("Event Member Leave, Members:", ev.Members)
 
 		leaveSql := "update cluster_node set isDelete=1 where node_id='" + leaveMember.Name + "'"
 		flag, e := executeSqlByGorm([]string{leaveSql})
 
 		// 鍒ゆ柇绂诲紑鏄惁鏄富鑺傜偣绂诲紑, 鏇存崲涓昏妭鐐�
 		var clusterDb models.Node
-		clusterNodes, err := clusterDb.FindNodes()
+		leaveNode, err := clusterDb.FindNodeById(leaveMember.Name)
 		if err == nil {
-			for _, node := range clusterNodes {
-				//logger.Info(node.Id, node.DriftState, leaveMember.Name)
-				if node.NodeId == leaveMember.Name && node.DriftState == "master" {
-					firstNode, _ := clusterDb.FindFirstNode()
-					//logger.Info("firstNode:", firstNode.NodeId, "local:", Agent.LocalMember().Name)
-					if firstNode.NodeId == Agent.LocalMember().Name {
-						//logger.Info("update master")
-						clusterDb.UpdateDriftStateByNodeId("master", firstNode.NodeId)
+			logger.Info("鏌ヨ绂诲紑鑺傜偣鐨勪俊鎭�,", leaveNode)
+			if leaveNode.DriftState == "master" {
+				firstNode, _ := clusterDb.FindFirstNode()
+				logger.Info("绂诲紑鐨勮妭鐐逛负涓昏妭鐐�, 鏌ヨ鍔犲叆鏈�鏃╃殑鑺傜偣, ", firstNode)
+				logger.Info("鏈妭鐐逛俊鎭�:", Agent.LocalMember().Name)
+				if firstNode.NodeId == Agent.LocalMember().Name {
+					logger.Info("鏇存柊鏈妭鐐逛负涓昏妭鐐�")
+					clusterDb.UpdateDriftStateByNodeId("master", firstNode.NodeId, true)
 
-						// 閫氱煡涓昏妭鐐瑰彉鏇�
-						chMsg := protomsg.DbChangeMessage{
-							Id:     firstNode.ClusterId,
-							Table:  protomsg.TableChanged_T_Cluster,
-							Action: protomsg.DbAction_Insert,
-							Info:   "slave2master",
-						}
+					// 閫氱煡涓昏妭鐐瑰彉鏇�
+					chMsg := protomsg.DbChangeMessage{
+						Id:     firstNode.ClusterId,
+						Table:  protomsg.TableChanged_T_Cluster,
+						Action: protomsg.DbAction_Insert,
+						Info:   "slave2master",
+					}
 
-						bts, _ := json.Marshal(chMsg)
-						h := GetBusHandle()
-						if h == nil {
-							logger.Error("HandleEventMemberLeave bus handle is nil")
-							return
-						}
-						err := h.Publish("system-service", bts)
-						if err != nil {
-							logger.Error("HandleEventMemberLeave pub master err:", err)
-						}
+					bts, _ := json.Marshal(chMsg)
+					h := GetBusHandle()
+					if h == nil {
+						logger.Error("HandleEventMemberLeave bus handle is nil")
+						return
+					}
+					err := h.Publish("system-service", bts)
+					if err != nil {
+						logger.Error("HandleEventMemberLeave pub master err:", err)
 					}
 				}
 			}
 		}
 
-		logger.Info("EventMemberLeave,current Members:", ev.Members)
 		logLT := ""
 		logT := time.Now().Format("2006-01-02 15:04:05")
 		logSql := strings.ReplaceAll(leaveSql, "'", "''")
@@ -305,8 +342,35 @@
 		timeUnix := time.Now().Unix()
 		fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05")
 
-		//joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'"
+		// 鏇存柊鍦ㄧ嚎鐘舵��
+		updateOnlineSql := fmt.Sprintf("update cluster_node set online=1 where node_id='%s'", joinMember.Name)
+
+		// 濡傛灉鏄柊鍔犲叆鐨勬洿鏂板垱寤烘椂闂�
 		joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='slave',node_ip='%s',create_time='%s' where node_id='%s' and isDelete=1", joinMember.Addr.String()+":"+strconv.Itoa(int(joinMember.Port)), fmtTimeStr, joinMember.Name)
+		flag, e := executeSqlByGorm([]string{updateOnlineSql, joinSql})
+
+		logger.Info("EventMemberJoin,current Members:", ev.Members)
+		logLT := ""
+		logT := time.Now().Format("2006-01-02 15:04:05")
+		logSql := strings.ReplaceAll(joinSql, "'", "''")
+		logResult := "0"
+		if flag {
+			logResult = "1"
+		}
+		logErr := ""
+		if e != nil {
+			logErr = e.Error()
+		}
+		executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + joinMember.Name + "'," + logResult + ",'" + logErr + "')"})
+	}
+}
+
+func HandleEventMemberFail(ev serf.MemberEvent) {
+	if ev.Members != nil && len(ev.Members) == 1 {
+		joinMember := ev.Members[0]
+
+		//joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'"
+		joinSql := fmt.Sprintf("update cluster_node set online=0 where node_id='%s'", joinMember.Name)
 		flag, e := executeSqlByGorm([]string{joinSql})
 
 		logger.Info("EventMemberJoin,current Members:", ev.Members)
@@ -324,3 +388,22 @@
 		executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + joinMember.Name + "'," + logResult + ",'" + logErr + "')"})
 	}
 }
+
+func HandleUpdateMemberStatus() {
+	if Agent == nil {
+		return
+	}
+
+	for _, member := range Agent.Serf().Members() {
+		alive := 0
+		if member.Status == serf.StatusAlive {
+			alive = 1
+		}
+
+		sql := fmt.Sprintf("update cluster_node set online=%d where node_id='%s'", alive, member.Name)
+		_, err := executeSqlByGorm([]string{sql})
+		if err != nil {
+			logger.Error(err)
+		}
+	}
+}

--
Gitblit v1.8.0