zhangzengfei
2023-10-20 cc6d3763795b62d353df0c1eb58cbb736b709829
添加主节点退出时, 重新选举master的功能
3个文件已修改
61 ■■■■■ 已修改文件
system-service/models/node.go 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
system-service/serf/handler.go 43 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
system-service/service/clusterService.go 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
system-service/models/node.go
@@ -43,6 +43,20 @@
    return nodes, nil
}
func (n *Node) FindNodes() (nodes []Node, err error) {
    if err = db.Raw("select * from cluster_node").Scan(&nodes).Error; err != nil {
        return nil, err
    }
    return nodes, nil
}
func (n *Node) FindFirstNode() (node Node, err error) {
    if err = db.Raw("select * from cluster_node where isDelete=0 order by create_time limit 1").Scan(&node).Error; err != nil {
        return node, err
    }
    return node, nil
}
func (n *Node) UpdateDriftStateByNodeId(driftState string, nodeId string) bool {
    var err error
    tx := GetDB().Begin()
@@ -56,7 +70,7 @@
        if err = tx.Exec("update "+n.TableName()+" set drift_state=? where id=?", driftState, nodeId).Error; err != nil {
            return false
        }
        if err = tx.Exec("update "+n.TableName()+" set drift_state='backup' where id !=?", nodeId).Error; err != nil {
        if err = tx.Exec("update "+n.TableName()+" set drift_state='slave' where id !=?", nodeId).Error; err != nil {
            return false
        }
    } else {
system-service/serf/handler.go
@@ -1,6 +1,7 @@
package serf
import (
    "basic.com/pubsub/protomsg.git"
    "basic.com/valib/logger.git"
    "basic.com/valib/serf.git/serf"
    "encoding/json"
@@ -16,6 +17,7 @@
    "time"
    "vamicro/config"
    "vamicro/system-service/bhome_msg_dev"
    "vamicro/system-service/models"
)
type RpcHandle func(arg RpcParamTopic) ([]serf.NodeResponse, error)
@@ -240,8 +242,45 @@
func HandleEventMemberLeave(ev serf.MemberEvent) {
    if ev.Members != nil && len(ev.Members) == 1 {
        leaveMember := ev.Members[0]
        leaveSql := "update cluster_node set isDelete=1,drift_state='' where node_id='" + leaveMember.Name + "'"
        leaveSql := "update cluster_node set isDelete=1 where node_id='" + leaveMember.Name + "'"
        flag, e := executeSqlByGorm([]string{leaveSql})
        // 判断离开是否是主节点离开, 更换主节点
        var clusterDb models.Node
        clusterNodes, err := clusterDb.FindNodes()
        if err == nil {
            for _, node := range clusterNodes {
                //logger.Info(node.Id, node.DriftState, leaveMember.Name)
                if node.NodeId == leaveMember.Name && node.DriftState == "master" {
                    firstNode, _ := clusterDb.FindFirstNode()
                    //logger.Info("firstNode:", firstNode.NodeId, "local:", Agent.LocalMember().Name)
                    if firstNode.NodeId == Agent.LocalMember().Name {
                        //logger.Info("update master")
                        clusterDb.UpdateDriftStateByNodeId("master", firstNode.NodeId)
                        // 通知主节点变更
                        chMsg := protomsg.DbChangeMessage{
                            Id:     firstNode.ClusterId,
                            Table:  protomsg.TableChanged_T_Cluster,
                            Action: protomsg.DbAction_Insert,
                            Info:   "slave2master",
                        }
                        bts, _ := json.Marshal(chMsg)
                        h := GetBusHandle()
                        if h == nil {
                            logger.Error("HandleEventMemberLeave bus handle is nil")
                            return
                        }
                        err := h.Publish("system-service", bts)
                        if err != nil {
                            logger.Error("HandleEventMemberLeave pub master err:", err)
                        }
                    }
                }
            }
        }
        logger.Info("EventMemberLeave,current Members:", ev.Members)
        logLT := ""
@@ -267,7 +306,7 @@
        fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05")
        //joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'"
        joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='',node_ip='%s',create_time='%s' where node_id='%s'", joinMember.Addr.String()+":"+strconv.Itoa(int(joinMember.Port)), fmtTimeStr, joinMember.Name)
        joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='slave',node_ip='%s',create_time='%s' where node_id='%s' and isDelete=1", joinMember.Addr.String()+":"+strconv.Itoa(int(joinMember.Port)), fmtTimeStr, joinMember.Name)
        flag, e := executeSqlByGorm([]string{joinSql})
        logger.Info("EventMemberJoin,current Members:", ev.Members)
system-service/service/clusterService.go
@@ -444,7 +444,7 @@
        return false
    }
    joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='',node_ip='%s',create_time='%s' where node_id='%s'", serverIp+":"+strconv.Itoa(dbSync.DefaultBindPort), fmtTimeStr, config.Server.AnalyServerId)
    joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='slave',node_ip='%s',create_time='%s' where node_id='%s'", serverIp+":"+strconv.Itoa(dbSync.DefaultBindPort), fmtTimeStr, config.Server.AnalyServerId)
    if err = tx.Exec(joinSql).Error; err != nil {
        logger.Debug("update isDelete err:", err)
        return false