zhangzengfei
2023-10-20 71b8885babe6dfd25c91b007018347c0c1bfac74
添加主节点变更和主动切换功能
8个文件已修改
250 ■■■■ 已修改文件
system-service/controllers/cluster.go 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
system-service/main.go 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
system-service/models/node.go 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
system-service/serf/handler.go 139 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
system-service/serf/serf.go 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
system-service/serf/sync.go 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
system-service/service/clusterService.go 26 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
system-service/vo/cluster.go 20 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
system-service/controllers/cluster.go
@@ -78,6 +78,7 @@
                "password":    arr[0].Password,
                "nodes":       nodes,
                "virtualIp":   arr[0].VirtualIp,
                "localId":     config.Server.AnalyServerId,
            }}
        } else {
            return &bhomeclient.Reply{Success: true}
@@ -114,6 +115,22 @@
    }
}
func (cc ClusterController) Update2Master(h *bhomeclient.WrapperHandler, c *bhomeclient.Request) *bhomeclient.Reply {
    var nodeVo vo.UpdateClusterVo
    err := c.BindJSON(&nodeVo)
    if err != nil || nodeVo.NodeId == "" {
        return &bhomeclient.Reply{Success: false, Msg: "参数有误"}
    }
    sv := service.NewClusterService(h.Bk)
    b, _ := sv.UpdateDriftStateByNodeId(nodeVo.ClusterId, nodeVo.NodeId, "master")
    if b {
        return &bhomeclient.Reply{Success: true, Data: nil}
    } else {
        return &bhomeclient.Reply{Success: false, Msg: "变更失败"}
    }
}
// @Summary 搜索集群
// @Description 搜索集群
// @Accept json
system-service/main.go
@@ -180,6 +180,7 @@
    funcMap[urlPrefix+"/cluster/leave"] = clusterController.Leave
    funcMap[urlPrefix+"/cluster/findIpByNode"] = clusterController.FindIpByNode
    funcMap[urlPrefix+"/cluster/status"] = clusterController.GetClusterStat
    funcMap[urlPrefix+"/cluster/update2Master"] = clusterController.Update2Master
    sysMenuC := new(controllers.SysMenuController)
    funcMap["/data/api-u/sysmenus/tree"] = sysMenuC.MenuTree
@@ -285,7 +286,7 @@
        case <-ctx.Done():
            return
        case msg := <-ms.SubCh:
            logger.Debug("recv sub msg topic:", string(msg.Topic), " data:", string(msg.Data))
            logger.Debug("recv sub msg topic:", string(msg.Topic), " data len:", len(msg.Data))
            service.PersistentWrapper(string(msg.Topic), msg.Data)
        }
    }
system-service/models/node.go
@@ -22,6 +22,7 @@
    IsDelete   bool   `gorm:"column:isDelete;default:0" json:"isDelete"`
    DeviceType string `gorm:"column:device_type" json:"device_type"` //设备型号`
    DriftState string `gorm:"column:drift_state" json:"drift_state"` //漂移状态, master,backup
    Online     bool   `gorm:"column:online;default:1" json:"online"` //在线状态
}
func (Node) TableName() string {
@@ -57,9 +58,21 @@
    return node, nil
}
func (n *Node) UpdateDriftStateByNodeId(driftState string, nodeId string) bool {
func (n *Node) FindNodeById(id string) (node Node, err error) {
    if err = db.Raw("select * from cluster_node where id=?", id).Scan(&node).Error; err != nil {
        return node, err
    }
    return node, nil
}
func (n *Node) UpdateDriftStateByNodeId(driftState string, nodeId string, sync bool) bool {
    var err error
    tx := GetDB().Begin()
    if !sync {
        GetDB().LogMode(false)
        defer db.LogMode(true)
    }
    defer func() {
        if err != nil && tx != nil {
            logger.Error("updateDriftState err:", err)
system-service/serf/handler.go
@@ -80,6 +80,44 @@
    logger.Info("LTime:", ev.LTime, " Recevie virtualIp change")
    SyncVirtualIpChan <- ev.Payload
}
func HandleUserEventChangeMaster(ev serf.UserEvent) {
    masterId := string(ev.Payload)
    localId := Agent.LocalMember().Name
    logger.Info("变为主节点的id,", masterId, "本节点id:", localId)
    if masterId == localId {
        return
    }
    var clusterDb models.Node
    localNode, err := clusterDb.FindNodeById(localId)
    if err == nil {
        if localNode.DriftState == "master" {
            logger.Info("本节点为之前的master,通知变更为slave")
            // 通知主节点变更
            chMsg := protomsg.DbChangeMessage{
                Id:     localNode.ClusterId,
                Table:  protomsg.TableChanged_T_Cluster,
                Action: protomsg.DbAction_Insert,
                Info:   "master2slave",
            }
            bts, _ := json.Marshal(chMsg)
            h := GetBusHandle()
            if h == nil {
                logger.Error("HandleEventMemberLeave bus handle is nil")
                return
            }
            err := h.Publish("system-service", bts)
            if err != nil {
                logger.Error("HandleEventMemberLeave pub master err:", err)
            }
        }
    }
    clusterDb.UpdateDriftStateByNodeId("master", masterId, false)
}
func HandleUserEventSyncMessage(ev serf.UserEvent) {
    logger.Info("receive a UserEventSyncMessage event")
    var procMsg ProcMessageEvent
@@ -242,47 +280,46 @@
func HandleEventMemberLeave(ev serf.MemberEvent) {
    if ev.Members != nil && len(ev.Members) == 1 {
        leaveMember := ev.Members[0]
        logger.Info("Event Member Leave, Members:", ev.Members)
        leaveSql := "update cluster_node set isDelete=1 where node_id='" + leaveMember.Name + "'"
        flag, e := executeSqlByGorm([]string{leaveSql})
        // 判断离开是否是主节点离开, 更换主节点
        var clusterDb models.Node
        clusterNodes, err := clusterDb.FindNodes()
        leaveNode, err := clusterDb.FindNodeById(leaveMember.Name)
        if err == nil {
            for _, node := range clusterNodes {
                //logger.Info(node.Id, node.DriftState, leaveMember.Name)
                if node.NodeId == leaveMember.Name && node.DriftState == "master" {
                    firstNode, _ := clusterDb.FindFirstNode()
                    //logger.Info("firstNode:", firstNode.NodeId, "local:", Agent.LocalMember().Name)
                    if firstNode.NodeId == Agent.LocalMember().Name {
                        //logger.Info("update master")
                        clusterDb.UpdateDriftStateByNodeId("master", firstNode.NodeId)
            logger.Info("查询离开节点的信息,", leaveNode)
            if leaveNode.DriftState == "master" {
                firstNode, _ := clusterDb.FindFirstNode()
                logger.Info("离开的节点为主节点, 查询加入最早的节点, ", firstNode)
                logger.Info("本节点信息:", Agent.LocalMember().Name)
                if firstNode.NodeId == Agent.LocalMember().Name {
                    logger.Info("更新本节点为主节点")
                    clusterDb.UpdateDriftStateByNodeId("master", firstNode.NodeId, true)
                        // 通知主节点变更
                        chMsg := protomsg.DbChangeMessage{
                            Id:     firstNode.ClusterId,
                            Table:  protomsg.TableChanged_T_Cluster,
                            Action: protomsg.DbAction_Insert,
                            Info:   "slave2master",
                        }
                    // 通知主节点变更
                    chMsg := protomsg.DbChangeMessage{
                        Id:     firstNode.ClusterId,
                        Table:  protomsg.TableChanged_T_Cluster,
                        Action: protomsg.DbAction_Insert,
                        Info:   "slave2master",
                    }
                        bts, _ := json.Marshal(chMsg)
                        h := GetBusHandle()
                        if h == nil {
                            logger.Error("HandleEventMemberLeave bus handle is nil")
                            return
                        }
                        err := h.Publish("system-service", bts)
                        if err != nil {
                            logger.Error("HandleEventMemberLeave pub master err:", err)
                        }
                    bts, _ := json.Marshal(chMsg)
                    h := GetBusHandle()
                    if h == nil {
                        logger.Error("HandleEventMemberLeave bus handle is nil")
                        return
                    }
                    err := h.Publish("system-service", bts)
                    if err != nil {
                        logger.Error("HandleEventMemberLeave pub master err:", err)
                    }
                }
            }
        }
        logger.Info("EventMemberLeave,current Members:", ev.Members)
        logLT := ""
        logT := time.Now().Format("2006-01-02 15:04:05")
        logSql := strings.ReplaceAll(leaveSql, "'", "''")
@@ -305,8 +342,35 @@
        timeUnix := time.Now().Unix()
        fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05")
        //joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'"
        // 更新在线状态
        updateOnlineSql := fmt.Sprintf("update cluster_node set online=1 where node_id='%s'", joinMember.Name)
        // 如果是新加入的更新创建时间
        joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='slave',node_ip='%s',create_time='%s' where node_id='%s' and isDelete=1", joinMember.Addr.String()+":"+strconv.Itoa(int(joinMember.Port)), fmtTimeStr, joinMember.Name)
        flag, e := executeSqlByGorm([]string{updateOnlineSql, joinSql})
        logger.Info("EventMemberJoin,current Members:", ev.Members)
        logLT := ""
        logT := time.Now().Format("2006-01-02 15:04:05")
        logSql := strings.ReplaceAll(joinSql, "'", "''")
        logResult := "0"
        if flag {
            logResult = "1"
        }
        logErr := ""
        if e != nil {
            logErr = e.Error()
        }
        executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + joinMember.Name + "'," + logResult + ",'" + logErr + "')"})
    }
}
func HandleEventMemberFail(ev serf.MemberEvent) {
    if ev.Members != nil && len(ev.Members) == 1 {
        joinMember := ev.Members[0]
        //joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'"
        joinSql := fmt.Sprintf("update cluster_node set online=0 where node_id='%s'", joinMember.Name)
        flag, e := executeSqlByGorm([]string{joinSql})
        logger.Info("EventMemberJoin,current Members:", ev.Members)
@@ -324,3 +388,22 @@
        executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + joinMember.Name + "'," + logResult + ",'" + logErr + "')"})
    }
}
func HandleUpdateMemberStatus() {
    if Agent == nil {
        return
    }
    for _, member := range Agent.Serf().Members() {
        alive := 0
        if member.Status == serf.StatusAlive {
            alive = 1
        }
        sql := fmt.Sprintf("update cluster_node set online=%d where node_id='%s'", alive, member.Name)
        _, err := executeSqlByGorm([]string{sql})
        if err != nil {
            logger.Error(err)
        }
    }
}
system-service/serf/serf.go
@@ -21,6 +21,7 @@
    UserEventSyncSql                = "SyncSql"
    UserEventSyncDbTablePersonCache = "SyncCache"
    UserEventSyncVirtualIp          = "SyncVirtualIp"              //漂移ip修改
    UserEventChangeMaster           = "ChangeMaster "              //修改节点状态
    UserEventSyncRegisterInfo       = "SyncRegisterInfo"           //同步注册信息
    DataSystemSerfSubscribe         = "data-system-serf-subscribe" //各app从serf订阅消息
    UserEventSyncMessage            = "SyncMessageForProc"         // 为其他进程同步消息
@@ -42,6 +43,8 @@
            HandleUserEventSyncDbTablePersonCache(ev)
        } else if ev.Name == UserEventSyncVirtualIp {
            HandleUserEventSyncVirtualIp(ev)
        } else if ev.Name == UserEventChangeMaster {
            HandleUserEventChangeMaster(ev)
        } else if ev.Name == UserEventSyncRegisterInfo {
            HandleSyncRegisterInfo(ev)
        } else if ev.Name == DataSystemSerfSubscribe {
@@ -61,8 +64,10 @@
    case serf.MemberEvent:
        if event.EventType() == serf.EventMemberLeave {
            HandleEventMemberLeave(ev)
        } else if event.EventType() == serf.EventMemberJoin {
        } else if event.EventType() == serf.EventMemberJoin || event.EventType() == serf.EventMemberUpdate {
            HandleEventMemberJoin(ev)
        } else if event.EventType() == serf.EventMemberFailed {
            HandleEventMemberFail(ev)
        }
        logger.Error("serf MemberEvent ", event.EventType())
    default:
@@ -90,11 +95,11 @@
                    logger.Error("ExecuteSqlByGorm err:", err, ",sql:", sql)
                    return false, err
                }
                if result.RowsAffected == 0 {
                    logger.Debug("ExecuteSqlByGorm RowsAffected == 0", ",sql:", sql)
                    err = errors.New("ExecuteSqlByGorm RowsAffected == 0")
                    return false, err
                }
                //if result.RowsAffected == 0 {
                //    logger.Debug("ExecuteSqlByGorm RowsAffected == 0", ",sql:", sql)
                //    err = errors.New("ExecuteSqlByGorm RowsAffected == 0")
                //    return false, err
                //}
            }
            tx.Commit()
            return true, nil
system-service/serf/sync.go
@@ -75,6 +75,19 @@
            }
        }
    }
    go func() {
        for {
            select {
            case <-ctx.Done():
                return
            default:
                HandleUpdateMemberStatus()
                time.Sleep(5 * time.Second)
            }
        }
    }()
    go func() {
        for {
            select {
system-service/service/clusterService.go
@@ -138,6 +138,31 @@
    return false, ""
}
// 根据集群名称和密码创建集群
func (s ClusterService) UpdateDriftStateByNodeId(clusterId, nodeId, role string) (bool, string) {
    var node models.Node
    isSuccess := node.UpdateDriftStateByNodeId(role, nodeId, false)
    if isSuccess {
        // 通知主节点变更
        chMsg := protomsg.DbChangeMessage{
            Id:     clusterId,
            Table:  protomsg.TableChanged_T_Cluster,
            Action: protomsg.DbAction_Insert,
            Info:   "slave2master",
        }
        s.AddDbMessage(&chMsg)
        err := serf.Agent.UserEvent(serf.UserEventChangeMaster, []byte(nodeId), false)
        if err != nil {
            logger.Error("UserEventSyncVirtualIp err:", err)
        }
    }
    return isSuccess, ""
}
func (s ClusterService) SearchByPwd(pwd string) (err error) {
    _, isSearching := getFromSearchMap()
    if isSearching {
@@ -471,6 +496,7 @@
func ClusterSyncProcMessage(payload []byte) {
    if serf.Agent == nil {
        logger.Error("未加入集群")
        return
    }
    err := serf.Agent.UserEvent(serf.UserEventSyncMessage, payload, false)
system-service/vo/cluster.go
@@ -1,24 +1,30 @@
package vo
import "vamicro/system-service/models"
type ClusterVo struct {
    ClusterInfo models.Cluster `json:"clusterInfo"`
    Nodes []models.Node `json:"nodes"`
    Nodes       []models.Node  `json:"nodes"`
}
type ClusterCreateVo struct {
    Password string `json:"password"`
    Password    string `json:"password"`
    ClusterName string `json:"clusterName"`
    ClusterId string `json:"clusterId"`
    VirtualIp string `json:"virtualIp"`
    ClusterId   string `json:"clusterId"`
    VirtualIp   string `json:"virtualIp"`
}
type ClusterSearchVo struct {
    Password string `json:"password"`
}
type ClusterJoinVo struct {
type UpdateClusterVo struct {
    ClusterId string `json:"clusterId"`
    Password string `json:"password"`
    NodeIps []string `json:"nodeIps"`
    NodeId    string `json:"nodeId"`
}
type ClusterJoinVo struct {
    ClusterId string   `json:"clusterId"`
    Password  string   `json:"password"`
    NodeIps   []string `json:"nodeIps"`
}