From cc6d3763795b62d353df0c1eb58cbb736b709829 Mon Sep 17 00:00:00 2001
From: zhangzengfei <zhangzengfei@smartai.com>
Date: 星期五, 20 十月 2023 09:48:37 +0800
Subject: [PATCH] 添加主节点退出时, 重新选举master的功能
---
system-service/service/clusterService.go | 2
system-service/models/node.go | 16 +++++++
system-service/serf/handler.go | 43 ++++++++++++++++++++-
3 files changed, 57 insertions(+), 4 deletions(-)
diff --git a/system-service/models/node.go b/system-service/models/node.go
index 24d8af4..5e1b046 100644
--- a/system-service/models/node.go
+++ b/system-service/models/node.go
@@ -43,6 +43,20 @@
return nodes, nil
}
+func (n *Node) FindNodes() (nodes []Node, err error) {
+ if err = db.Raw("select * from cluster_node").Scan(&nodes).Error; err != nil {
+ return nil, err
+ }
+ return nodes, nil
+}
+
+func (n *Node) FindFirstNode() (node Node, err error) {
+ if err = db.Raw("select * from cluster_node where isDelete=0 order by create_time limit 1").Scan(&node).Error; err != nil {
+ return node, err
+ }
+ return node, nil
+}
+
func (n *Node) UpdateDriftStateByNodeId(driftState string, nodeId string) bool {
var err error
tx := GetDB().Begin()
@@ -56,7 +70,7 @@
if err = tx.Exec("update "+n.TableName()+" set drift_state=? where id=?", driftState, nodeId).Error; err != nil {
return false
}
- if err = tx.Exec("update "+n.TableName()+" set drift_state='backup' where id !=?", nodeId).Error; err != nil {
+ if err = tx.Exec("update "+n.TableName()+" set drift_state='slave' where id !=?", nodeId).Error; err != nil {
return false
}
} else {
diff --git a/system-service/serf/handler.go b/system-service/serf/handler.go
index c9689a3..0e4566a 100644
--- a/system-service/serf/handler.go
+++ b/system-service/serf/handler.go
@@ -1,6 +1,7 @@
package serf
import (
+ "basic.com/pubsub/protomsg.git"
"basic.com/valib/logger.git"
"basic.com/valib/serf.git/serf"
"encoding/json"
@@ -16,6 +17,7 @@
"time"
"vamicro/config"
"vamicro/system-service/bhome_msg_dev"
+ "vamicro/system-service/models"
)
type RpcHandle func(arg RpcParamTopic) ([]serf.NodeResponse, error)
@@ -240,8 +242,45 @@
func HandleEventMemberLeave(ev serf.MemberEvent) {
if ev.Members != nil && len(ev.Members) == 1 {
leaveMember := ev.Members[0]
- leaveSql := "update cluster_node set isDelete=1,drift_state='' where node_id='" + leaveMember.Name + "'"
+
+ leaveSql := "update cluster_node set isDelete=1 where node_id='" + leaveMember.Name + "'"
flag, e := executeSqlByGorm([]string{leaveSql})
+
+ // 鍒ゆ柇绂诲紑鏄惁鏄富鑺傜偣绂诲紑, 鏇存崲涓昏妭鐐�
+ var clusterDb models.Node
+ clusterNodes, err := clusterDb.FindNodes()
+ if err == nil {
+ for _, node := range clusterNodes {
+ //logger.Info(node.Id, node.DriftState, leaveMember.Name)
+ if node.NodeId == leaveMember.Name && node.DriftState == "master" {
+ firstNode, _ := clusterDb.FindFirstNode()
+ //logger.Info("firstNode:", firstNode.NodeId, "local:", Agent.LocalMember().Name)
+ if firstNode.NodeId == Agent.LocalMember().Name {
+ //logger.Info("update master")
+ clusterDb.UpdateDriftStateByNodeId("master", firstNode.NodeId)
+
+ // 閫氱煡涓昏妭鐐瑰彉鏇�
+ chMsg := protomsg.DbChangeMessage{
+ Id: firstNode.ClusterId,
+ Table: protomsg.TableChanged_T_Cluster,
+ Action: protomsg.DbAction_Insert,
+ Info: "slave2master",
+ }
+
+ bts, _ := json.Marshal(chMsg)
+ h := GetBusHandle()
+ if h == nil {
+ logger.Error("HandleEventMemberLeave bus handle is nil")
+ return
+ }
+ err := h.Publish("system-service", bts)
+ if err != nil {
+ logger.Error("HandleEventMemberLeave pub master err:", err)
+ }
+ }
+ }
+ }
+ }
logger.Info("EventMemberLeave,current Members:", ev.Members)
logLT := ""
@@ -267,7 +306,7 @@
fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05")
//joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'"
- joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='',node_ip='%s',create_time='%s' where node_id='%s'", joinMember.Addr.String()+":"+strconv.Itoa(int(joinMember.Port)), fmtTimeStr, joinMember.Name)
+ joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='slave',node_ip='%s',create_time='%s' where node_id='%s' and isDelete=1", joinMember.Addr.String()+":"+strconv.Itoa(int(joinMember.Port)), fmtTimeStr, joinMember.Name)
flag, e := executeSqlByGorm([]string{joinSql})
logger.Info("EventMemberJoin,current Members:", ev.Members)
diff --git a/system-service/service/clusterService.go b/system-service/service/clusterService.go
index feffd61..288683f 100644
--- a/system-service/service/clusterService.go
+++ b/system-service/service/clusterService.go
@@ -444,7 +444,7 @@
return false
}
- joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='',node_ip='%s',create_time='%s' where node_id='%s'", serverIp+":"+strconv.Itoa(dbSync.DefaultBindPort), fmtTimeStr, config.Server.AnalyServerId)
+ joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='slave',node_ip='%s',create_time='%s' where node_id='%s'", serverIp+":"+strconv.Itoa(dbSync.DefaultBindPort), fmtTimeStr, config.Server.AnalyServerId)
if err = tx.Exec(joinSql).Error; err != nil {
logger.Debug("update isDelete err:", err)
return false
--
Gitblit v1.8.0