From cc6d3763795b62d353df0c1eb58cbb736b709829 Mon Sep 17 00:00:00 2001 From: zhangzengfei <zhangzengfei@smartai.com> Date: 星期五, 20 十月 2023 09:48:37 +0800 Subject: [PATCH] 添加主节点退出时, 重新选举master的功能 --- system-service/service/clusterService.go | 2 system-service/models/node.go | 16 +++++++ system-service/serf/handler.go | 43 ++++++++++++++++++++- 3 files changed, 57 insertions(+), 4 deletions(-) diff --git a/system-service/models/node.go b/system-service/models/node.go index 24d8af4..5e1b046 100644 --- a/system-service/models/node.go +++ b/system-service/models/node.go @@ -43,6 +43,20 @@ return nodes, nil } +func (n *Node) FindNodes() (nodes []Node, err error) { + if err = db.Raw("select * from cluster_node").Scan(&nodes).Error; err != nil { + return nil, err + } + return nodes, nil +} + +func (n *Node) FindFirstNode() (node Node, err error) { + if err = db.Raw("select * from cluster_node where isDelete=0 order by create_time limit 1").Scan(&node).Error; err != nil { + return node, err + } + return node, nil +} + func (n *Node) UpdateDriftStateByNodeId(driftState string, nodeId string) bool { var err error tx := GetDB().Begin() @@ -56,7 +70,7 @@ if err = tx.Exec("update "+n.TableName()+" set drift_state=? where id=?", driftState, nodeId).Error; err != nil { return false } - if err = tx.Exec("update "+n.TableName()+" set drift_state='backup' where id !=?", nodeId).Error; err != nil { + if err = tx.Exec("update "+n.TableName()+" set drift_state='slave' where id !=?", nodeId).Error; err != nil { return false } } else { diff --git a/system-service/serf/handler.go b/system-service/serf/handler.go index c9689a3..0e4566a 100644 --- a/system-service/serf/handler.go +++ b/system-service/serf/handler.go @@ -1,6 +1,7 @@ package serf import ( + "basic.com/pubsub/protomsg.git" "basic.com/valib/logger.git" "basic.com/valib/serf.git/serf" "encoding/json" @@ -16,6 +17,7 @@ "time" "vamicro/config" "vamicro/system-service/bhome_msg_dev" + "vamicro/system-service/models" ) type RpcHandle func(arg RpcParamTopic) ([]serf.NodeResponse, error) @@ -240,8 +242,45 @@ func HandleEventMemberLeave(ev serf.MemberEvent) { if ev.Members != nil && len(ev.Members) == 1 { leaveMember := ev.Members[0] - leaveSql := "update cluster_node set isDelete=1,drift_state='' where node_id='" + leaveMember.Name + "'" + + leaveSql := "update cluster_node set isDelete=1 where node_id='" + leaveMember.Name + "'" flag, e := executeSqlByGorm([]string{leaveSql}) + + // 鍒ゆ柇绂诲紑鏄惁鏄富鑺傜偣绂诲紑, 鏇存崲涓昏妭鐐� + var clusterDb models.Node + clusterNodes, err := clusterDb.FindNodes() + if err == nil { + for _, node := range clusterNodes { + //logger.Info(node.Id, node.DriftState, leaveMember.Name) + if node.NodeId == leaveMember.Name && node.DriftState == "master" { + firstNode, _ := clusterDb.FindFirstNode() + //logger.Info("firstNode:", firstNode.NodeId, "local:", Agent.LocalMember().Name) + if firstNode.NodeId == Agent.LocalMember().Name { + //logger.Info("update master") + clusterDb.UpdateDriftStateByNodeId("master", firstNode.NodeId) + + // 閫氱煡涓昏妭鐐瑰彉鏇� + chMsg := protomsg.DbChangeMessage{ + Id: firstNode.ClusterId, + Table: protomsg.TableChanged_T_Cluster, + Action: protomsg.DbAction_Insert, + Info: "slave2master", + } + + bts, _ := json.Marshal(chMsg) + h := GetBusHandle() + if h == nil { + logger.Error("HandleEventMemberLeave bus handle is nil") + return + } + err := h.Publish("system-service", bts) + if err != nil { + logger.Error("HandleEventMemberLeave pub master err:", err) + } + } + } + } + } logger.Info("EventMemberLeave,current Members:", ev.Members) logLT := "" @@ -267,7 +306,7 @@ fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05") //joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'" - joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='',node_ip='%s',create_time='%s' where node_id='%s'", joinMember.Addr.String()+":"+strconv.Itoa(int(joinMember.Port)), fmtTimeStr, joinMember.Name) + joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='slave',node_ip='%s',create_time='%s' where node_id='%s' and isDelete=1", joinMember.Addr.String()+":"+strconv.Itoa(int(joinMember.Port)), fmtTimeStr, joinMember.Name) flag, e := executeSqlByGorm([]string{joinSql}) logger.Info("EventMemberJoin,current Members:", ev.Members) diff --git a/system-service/service/clusterService.go b/system-service/service/clusterService.go index feffd61..288683f 100644 --- a/system-service/service/clusterService.go +++ b/system-service/service/clusterService.go @@ -444,7 +444,7 @@ return false } - joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='',node_ip='%s',create_time='%s' where node_id='%s'", serverIp+":"+strconv.Itoa(dbSync.DefaultBindPort), fmtTimeStr, config.Server.AnalyServerId) + joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='slave',node_ip='%s',create_time='%s' where node_id='%s'", serverIp+":"+strconv.Itoa(dbSync.DefaultBindPort), fmtTimeStr, config.Server.AnalyServerId) if err = tx.Exec(joinSql).Error; err != nil { logger.Debug("update isDelete err:", err) return false -- Gitblit v1.8.0