From cc6d3763795b62d353df0c1eb58cbb736b709829 Mon Sep 17 00:00:00 2001
From: zhangzengfei <zhangzengfei@smartai.com>
Date: 星期五, 20 十月 2023 09:48:37 +0800
Subject: [PATCH] 添加主节点退出时, 重新选举master的功能

---
 system-service/service/clusterService.go |    2 
 system-service/models/node.go            |   16 +++++++
 system-service/serf/handler.go           |   43 ++++++++++++++++++++-
 3 files changed, 57 insertions(+), 4 deletions(-)

diff --git a/system-service/models/node.go b/system-service/models/node.go
index 24d8af4..5e1b046 100644
--- a/system-service/models/node.go
+++ b/system-service/models/node.go
@@ -43,6 +43,20 @@
 	return nodes, nil
 }
 
+func (n *Node) FindNodes() (nodes []Node, err error) {
+	if err = db.Raw("select * from cluster_node").Scan(&nodes).Error; err != nil {
+		return nil, err
+	}
+	return nodes, nil
+}
+
+func (n *Node) FindFirstNode() (node Node, err error) {
+	if err = db.Raw("select * from cluster_node where isDelete=0 order by create_time limit 1").Scan(&node).Error; err != nil {
+		return node, err
+	}
+	return node, nil
+}
+
 func (n *Node) UpdateDriftStateByNodeId(driftState string, nodeId string) bool {
 	var err error
 	tx := GetDB().Begin()
@@ -56,7 +70,7 @@
 		if err = tx.Exec("update "+n.TableName()+" set drift_state=? where id=?", driftState, nodeId).Error; err != nil {
 			return false
 		}
-		if err = tx.Exec("update "+n.TableName()+" set drift_state='backup' where id !=?", nodeId).Error; err != nil {
+		if err = tx.Exec("update "+n.TableName()+" set drift_state='slave' where id !=?", nodeId).Error; err != nil {
 			return false
 		}
 	} else {
diff --git a/system-service/serf/handler.go b/system-service/serf/handler.go
index c9689a3..0e4566a 100644
--- a/system-service/serf/handler.go
+++ b/system-service/serf/handler.go
@@ -1,6 +1,7 @@
 package serf
 
 import (
+	"basic.com/pubsub/protomsg.git"
 	"basic.com/valib/logger.git"
 	"basic.com/valib/serf.git/serf"
 	"encoding/json"
@@ -16,6 +17,7 @@
 	"time"
 	"vamicro/config"
 	"vamicro/system-service/bhome_msg_dev"
+	"vamicro/system-service/models"
 )
 
 type RpcHandle func(arg RpcParamTopic) ([]serf.NodeResponse, error)
@@ -240,8 +242,45 @@
 func HandleEventMemberLeave(ev serf.MemberEvent) {
 	if ev.Members != nil && len(ev.Members) == 1 {
 		leaveMember := ev.Members[0]
-		leaveSql := "update cluster_node set isDelete=1,drift_state='' where node_id='" + leaveMember.Name + "'"
+
+		leaveSql := "update cluster_node set isDelete=1 where node_id='" + leaveMember.Name + "'"
 		flag, e := executeSqlByGorm([]string{leaveSql})
+
+		// 鍒ゆ柇绂诲紑鏄惁鏄富鑺傜偣绂诲紑, 鏇存崲涓昏妭鐐�
+		var clusterDb models.Node
+		clusterNodes, err := clusterDb.FindNodes()
+		if err == nil {
+			for _, node := range clusterNodes {
+				//logger.Info(node.Id, node.DriftState, leaveMember.Name)
+				if node.NodeId == leaveMember.Name && node.DriftState == "master" {
+					firstNode, _ := clusterDb.FindFirstNode()
+					//logger.Info("firstNode:", firstNode.NodeId, "local:", Agent.LocalMember().Name)
+					if firstNode.NodeId == Agent.LocalMember().Name {
+						//logger.Info("update master")
+						clusterDb.UpdateDriftStateByNodeId("master", firstNode.NodeId)
+
+						// 閫氱煡涓昏妭鐐瑰彉鏇�
+						chMsg := protomsg.DbChangeMessage{
+							Id:     firstNode.ClusterId,
+							Table:  protomsg.TableChanged_T_Cluster,
+							Action: protomsg.DbAction_Insert,
+							Info:   "slave2master",
+						}
+
+						bts, _ := json.Marshal(chMsg)
+						h := GetBusHandle()
+						if h == nil {
+							logger.Error("HandleEventMemberLeave bus handle is nil")
+							return
+						}
+						err := h.Publish("system-service", bts)
+						if err != nil {
+							logger.Error("HandleEventMemberLeave pub master err:", err)
+						}
+					}
+				}
+			}
+		}
 
 		logger.Info("EventMemberLeave,current Members:", ev.Members)
 		logLT := ""
@@ -267,7 +306,7 @@
 		fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05")
 
 		//joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'"
-		joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='',node_ip='%s',create_time='%s' where node_id='%s'", joinMember.Addr.String()+":"+strconv.Itoa(int(joinMember.Port)), fmtTimeStr, joinMember.Name)
+		joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='slave',node_ip='%s',create_time='%s' where node_id='%s' and isDelete=1", joinMember.Addr.String()+":"+strconv.Itoa(int(joinMember.Port)), fmtTimeStr, joinMember.Name)
 		flag, e := executeSqlByGorm([]string{joinSql})
 
 		logger.Info("EventMemberJoin,current Members:", ev.Members)
diff --git a/system-service/service/clusterService.go b/system-service/service/clusterService.go
index feffd61..288683f 100644
--- a/system-service/service/clusterService.go
+++ b/system-service/service/clusterService.go
@@ -444,7 +444,7 @@
 		return false
 	}
 
-	joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='',node_ip='%s',create_time='%s' where node_id='%s'", serverIp+":"+strconv.Itoa(dbSync.DefaultBindPort), fmtTimeStr, config.Server.AnalyServerId)
+	joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='slave',node_ip='%s',create_time='%s' where node_id='%s'", serverIp+":"+strconv.Itoa(dbSync.DefaultBindPort), fmtTimeStr, config.Server.AnalyServerId)
 	if err = tx.Exec(joinSql).Error; err != nil {
 		logger.Debug("update isDelete err:", err)
 		return false

--
Gitblit v1.8.0