From 71b8885babe6dfd25c91b007018347c0c1bfac74 Mon Sep 17 00:00:00 2001 From: zhangzengfei <zhangzengfei@smartai.com> Date: 星期五, 20 十月 2023 17:35:52 +0800 Subject: [PATCH] 添加主节点变更和主动切换功能 --- system-service/serf/serf.go | 17 ++- system-service/vo/cluster.go | 20 ++- system-service/controllers/cluster.go | 17 +++ system-service/service/clusterService.go | 26 +++++ system-service/models/node.go | 15 ++ system-service/serf/handler.go | 139 ++++++++++++++++++++++----- system-service/main.go | 3 system-service/serf/sync.go | 13 ++ 8 files changed, 207 insertions(+), 43 deletions(-) diff --git a/system-service/controllers/cluster.go b/system-service/controllers/cluster.go index df7d326..39b980f 100644 --- a/system-service/controllers/cluster.go +++ b/system-service/controllers/cluster.go @@ -78,6 +78,7 @@ "password": arr[0].Password, "nodes": nodes, "virtualIp": arr[0].VirtualIp, + "localId": config.Server.AnalyServerId, }} } else { return &bhomeclient.Reply{Success: true} @@ -114,6 +115,22 @@ } } +func (cc ClusterController) Update2Master(h *bhomeclient.WrapperHandler, c *bhomeclient.Request) *bhomeclient.Reply { + var nodeVo vo.UpdateClusterVo + err := c.BindJSON(&nodeVo) + if err != nil || nodeVo.NodeId == "" { + return &bhomeclient.Reply{Success: false, Msg: "鍙傛暟鏈夎"} + } + + sv := service.NewClusterService(h.Bk) + b, _ := sv.UpdateDriftStateByNodeId(nodeVo.ClusterId, nodeVo.NodeId, "master") + if b { + return &bhomeclient.Reply{Success: true, Data: nil} + } else { + return &bhomeclient.Reply{Success: false, Msg: "鍙樻洿澶辫触"} + } +} + // @Summary 鎼滅储闆嗙兢 // @Description 鎼滅储闆嗙兢 // @Accept json diff --git a/system-service/main.go b/system-service/main.go index 8e6fd0a..65977a6 100644 --- a/system-service/main.go +++ b/system-service/main.go @@ -180,6 +180,7 @@ funcMap[urlPrefix+"/cluster/leave"] = clusterController.Leave funcMap[urlPrefix+"/cluster/findIpByNode"] = clusterController.FindIpByNode funcMap[urlPrefix+"/cluster/status"] = clusterController.GetClusterStat + funcMap[urlPrefix+"/cluster/update2Master"] = clusterController.Update2Master sysMenuC := new(controllers.SysMenuController) funcMap["/data/api-u/sysmenus/tree"] = sysMenuC.MenuTree @@ -285,7 +286,7 @@ case <-ctx.Done(): return case msg := <-ms.SubCh: - logger.Debug("recv sub msg topic:", string(msg.Topic), " data:", string(msg.Data)) + logger.Debug("recv sub msg topic:", string(msg.Topic), " data len:", len(msg.Data)) service.PersistentWrapper(string(msg.Topic), msg.Data) } } diff --git a/system-service/models/node.go b/system-service/models/node.go index 5e1b046..b24f926 100644 --- a/system-service/models/node.go +++ b/system-service/models/node.go @@ -22,6 +22,7 @@ IsDelete bool `gorm:"column:isDelete;default:0" json:"isDelete"` DeviceType string `gorm:"column:device_type" json:"device_type"` //璁惧鍨嬪彿` DriftState string `gorm:"column:drift_state" json:"drift_state"` //婕傜Щ鐘舵��, master,backup + Online bool `gorm:"column:online;default:1" json:"online"` //鍦ㄧ嚎鐘舵�� } func (Node) TableName() string { @@ -57,9 +58,21 @@ return node, nil } -func (n *Node) UpdateDriftStateByNodeId(driftState string, nodeId string) bool { +func (n *Node) FindNodeById(id string) (node Node, err error) { + if err = db.Raw("select * from cluster_node where id=?", id).Scan(&node).Error; err != nil { + return node, err + } + return node, nil +} + +func (n *Node) UpdateDriftStateByNodeId(driftState string, nodeId string, sync bool) bool { var err error tx := GetDB().Begin() + if !sync { + GetDB().LogMode(false) + defer db.LogMode(true) + } + defer func() { if err != nil && tx != nil { logger.Error("updateDriftState err:", err) diff --git a/system-service/serf/handler.go b/system-service/serf/handler.go index 0e4566a..a90d4ba 100644 --- a/system-service/serf/handler.go +++ b/system-service/serf/handler.go @@ -80,6 +80,44 @@ logger.Info("LTime:", ev.LTime, " Recevie virtualIp change") SyncVirtualIpChan <- ev.Payload } +func HandleUserEventChangeMaster(ev serf.UserEvent) { + masterId := string(ev.Payload) + localId := Agent.LocalMember().Name + logger.Info("鍙樹负涓昏妭鐐圭殑id,", masterId, "鏈妭鐐筰d:", localId) + if masterId == localId { + return + } + + var clusterDb models.Node + localNode, err := clusterDb.FindNodeById(localId) + if err == nil { + if localNode.DriftState == "master" { + logger.Info("鏈妭鐐逛负涔嬪墠鐨刴aster,閫氱煡鍙樻洿涓簊lave") + + // 閫氱煡涓昏妭鐐瑰彉鏇� + chMsg := protomsg.DbChangeMessage{ + Id: localNode.ClusterId, + Table: protomsg.TableChanged_T_Cluster, + Action: protomsg.DbAction_Insert, + Info: "master2slave", + } + + bts, _ := json.Marshal(chMsg) + h := GetBusHandle() + if h == nil { + logger.Error("HandleEventMemberLeave bus handle is nil") + return + } + err := h.Publish("system-service", bts) + if err != nil { + logger.Error("HandleEventMemberLeave pub master err:", err) + } + } + } + + clusterDb.UpdateDriftStateByNodeId("master", masterId, false) +} + func HandleUserEventSyncMessage(ev serf.UserEvent) { logger.Info("receive a UserEventSyncMessage event") var procMsg ProcMessageEvent @@ -242,47 +280,46 @@ func HandleEventMemberLeave(ev serf.MemberEvent) { if ev.Members != nil && len(ev.Members) == 1 { leaveMember := ev.Members[0] + logger.Info("Event Member Leave, Members:", ev.Members) leaveSql := "update cluster_node set isDelete=1 where node_id='" + leaveMember.Name + "'" flag, e := executeSqlByGorm([]string{leaveSql}) // 鍒ゆ柇绂诲紑鏄惁鏄富鑺傜偣绂诲紑, 鏇存崲涓昏妭鐐� var clusterDb models.Node - clusterNodes, err := clusterDb.FindNodes() + leaveNode, err := clusterDb.FindNodeById(leaveMember.Name) if err == nil { - for _, node := range clusterNodes { - //logger.Info(node.Id, node.DriftState, leaveMember.Name) - if node.NodeId == leaveMember.Name && node.DriftState == "master" { - firstNode, _ := clusterDb.FindFirstNode() - //logger.Info("firstNode:", firstNode.NodeId, "local:", Agent.LocalMember().Name) - if firstNode.NodeId == Agent.LocalMember().Name { - //logger.Info("update master") - clusterDb.UpdateDriftStateByNodeId("master", firstNode.NodeId) + logger.Info("鏌ヨ绂诲紑鑺傜偣鐨勪俊鎭�,", leaveNode) + if leaveNode.DriftState == "master" { + firstNode, _ := clusterDb.FindFirstNode() + logger.Info("绂诲紑鐨勮妭鐐逛负涓昏妭鐐�, 鏌ヨ鍔犲叆鏈�鏃╃殑鑺傜偣, ", firstNode) + logger.Info("鏈妭鐐逛俊鎭�:", Agent.LocalMember().Name) + if firstNode.NodeId == Agent.LocalMember().Name { + logger.Info("鏇存柊鏈妭鐐逛负涓昏妭鐐�") + clusterDb.UpdateDriftStateByNodeId("master", firstNode.NodeId, true) - // 閫氱煡涓昏妭鐐瑰彉鏇� - chMsg := protomsg.DbChangeMessage{ - Id: firstNode.ClusterId, - Table: protomsg.TableChanged_T_Cluster, - Action: protomsg.DbAction_Insert, - Info: "slave2master", - } + // 閫氱煡涓昏妭鐐瑰彉鏇� + chMsg := protomsg.DbChangeMessage{ + Id: firstNode.ClusterId, + Table: protomsg.TableChanged_T_Cluster, + Action: protomsg.DbAction_Insert, + Info: "slave2master", + } - bts, _ := json.Marshal(chMsg) - h := GetBusHandle() - if h == nil { - logger.Error("HandleEventMemberLeave bus handle is nil") - return - } - err := h.Publish("system-service", bts) - if err != nil { - logger.Error("HandleEventMemberLeave pub master err:", err) - } + bts, _ := json.Marshal(chMsg) + h := GetBusHandle() + if h == nil { + logger.Error("HandleEventMemberLeave bus handle is nil") + return + } + err := h.Publish("system-service", bts) + if err != nil { + logger.Error("HandleEventMemberLeave pub master err:", err) } } } } - logger.Info("EventMemberLeave,current Members:", ev.Members) logLT := "" logT := time.Now().Format("2006-01-02 15:04:05") logSql := strings.ReplaceAll(leaveSql, "'", "''") @@ -305,8 +342,35 @@ timeUnix := time.Now().Unix() fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05") - //joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'" + // 鏇存柊鍦ㄧ嚎鐘舵�� + updateOnlineSql := fmt.Sprintf("update cluster_node set online=1 where node_id='%s'", joinMember.Name) + + // 濡傛灉鏄柊鍔犲叆鐨勬洿鏂板垱寤烘椂闂� joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='slave',node_ip='%s',create_time='%s' where node_id='%s' and isDelete=1", joinMember.Addr.String()+":"+strconv.Itoa(int(joinMember.Port)), fmtTimeStr, joinMember.Name) + flag, e := executeSqlByGorm([]string{updateOnlineSql, joinSql}) + + logger.Info("EventMemberJoin,current Members:", ev.Members) + logLT := "" + logT := time.Now().Format("2006-01-02 15:04:05") + logSql := strings.ReplaceAll(joinSql, "'", "''") + logResult := "0" + if flag { + logResult = "1" + } + logErr := "" + if e != nil { + logErr = e.Error() + } + executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + joinMember.Name + "'," + logResult + ",'" + logErr + "')"}) + } +} + +func HandleEventMemberFail(ev serf.MemberEvent) { + if ev.Members != nil && len(ev.Members) == 1 { + joinMember := ev.Members[0] + + //joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'" + joinSql := fmt.Sprintf("update cluster_node set online=0 where node_id='%s'", joinMember.Name) flag, e := executeSqlByGorm([]string{joinSql}) logger.Info("EventMemberJoin,current Members:", ev.Members) @@ -324,3 +388,22 @@ executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + joinMember.Name + "'," + logResult + ",'" + logErr + "')"}) } } + +func HandleUpdateMemberStatus() { + if Agent == nil { + return + } + + for _, member := range Agent.Serf().Members() { + alive := 0 + if member.Status == serf.StatusAlive { + alive = 1 + } + + sql := fmt.Sprintf("update cluster_node set online=%d where node_id='%s'", alive, member.Name) + _, err := executeSqlByGorm([]string{sql}) + if err != nil { + logger.Error(err) + } + } +} diff --git a/system-service/serf/serf.go b/system-service/serf/serf.go index b7b4fde..77e50c3 100644 --- a/system-service/serf/serf.go +++ b/system-service/serf/serf.go @@ -21,6 +21,7 @@ UserEventSyncSql = "SyncSql" UserEventSyncDbTablePersonCache = "SyncCache" UserEventSyncVirtualIp = "SyncVirtualIp" //婕傜Щip淇敼 + UserEventChangeMaster = "ChangeMaster " //淇敼鑺傜偣鐘舵�� UserEventSyncRegisterInfo = "SyncRegisterInfo" //鍚屾娉ㄥ唽淇℃伅 DataSystemSerfSubscribe = "data-system-serf-subscribe" //鍚刟pp浠巗erf璁㈤槄娑堟伅 UserEventSyncMessage = "SyncMessageForProc" // 涓哄叾浠栬繘绋嬪悓姝ユ秷鎭� @@ -42,6 +43,8 @@ HandleUserEventSyncDbTablePersonCache(ev) } else if ev.Name == UserEventSyncVirtualIp { HandleUserEventSyncVirtualIp(ev) + } else if ev.Name == UserEventChangeMaster { + HandleUserEventChangeMaster(ev) } else if ev.Name == UserEventSyncRegisterInfo { HandleSyncRegisterInfo(ev) } else if ev.Name == DataSystemSerfSubscribe { @@ -61,8 +64,10 @@ case serf.MemberEvent: if event.EventType() == serf.EventMemberLeave { HandleEventMemberLeave(ev) - } else if event.EventType() == serf.EventMemberJoin { + } else if event.EventType() == serf.EventMemberJoin || event.EventType() == serf.EventMemberUpdate { HandleEventMemberJoin(ev) + } else if event.EventType() == serf.EventMemberFailed { + HandleEventMemberFail(ev) } logger.Error("serf MemberEvent ", event.EventType()) default: @@ -90,11 +95,11 @@ logger.Error("ExecuteSqlByGorm err:", err, ",sql:", sql) return false, err } - if result.RowsAffected == 0 { - logger.Debug("ExecuteSqlByGorm RowsAffected == 0", ",sql:", sql) - err = errors.New("ExecuteSqlByGorm RowsAffected == 0") - return false, err - } + //if result.RowsAffected == 0 { + // logger.Debug("ExecuteSqlByGorm RowsAffected == 0", ",sql:", sql) + // err = errors.New("ExecuteSqlByGorm RowsAffected == 0") + // return false, err + //} } tx.Commit() return true, nil diff --git a/system-service/serf/sync.go b/system-service/serf/sync.go index c5710da..8c831d8 100644 --- a/system-service/serf/sync.go +++ b/system-service/serf/sync.go @@ -75,6 +75,19 @@ } } } + + go func() { + for { + select { + case <-ctx.Done(): + return + default: + HandleUpdateMemberStatus() + time.Sleep(5 * time.Second) + } + } + }() + go func() { for { select { diff --git a/system-service/service/clusterService.go b/system-service/service/clusterService.go index 288683f..a39741f 100644 --- a/system-service/service/clusterService.go +++ b/system-service/service/clusterService.go @@ -138,6 +138,31 @@ return false, "" } +// 鏍规嵁闆嗙兢鍚嶇О鍜屽瘑鐮佸垱寤洪泦缇� +func (s ClusterService) UpdateDriftStateByNodeId(clusterId, nodeId, role string) (bool, string) { + var node models.Node + isSuccess := node.UpdateDriftStateByNodeId(role, nodeId, false) + + if isSuccess { + // 閫氱煡涓昏妭鐐瑰彉鏇� + chMsg := protomsg.DbChangeMessage{ + Id: clusterId, + Table: protomsg.TableChanged_T_Cluster, + Action: protomsg.DbAction_Insert, + Info: "slave2master", + } + + s.AddDbMessage(&chMsg) + + err := serf.Agent.UserEvent(serf.UserEventChangeMaster, []byte(nodeId), false) + if err != nil { + logger.Error("UserEventSyncVirtualIp err:", err) + } + } + + return isSuccess, "" +} + func (s ClusterService) SearchByPwd(pwd string) (err error) { _, isSearching := getFromSearchMap() if isSearching { @@ -471,6 +496,7 @@ func ClusterSyncProcMessage(payload []byte) { if serf.Agent == nil { logger.Error("鏈姞鍏ラ泦缇�") + return } err := serf.Agent.UserEvent(serf.UserEventSyncMessage, payload, false) diff --git a/system-service/vo/cluster.go b/system-service/vo/cluster.go index f014a52..328f323 100644 --- a/system-service/vo/cluster.go +++ b/system-service/vo/cluster.go @@ -1,24 +1,30 @@ package vo import "vamicro/system-service/models" + type ClusterVo struct { ClusterInfo models.Cluster `json:"clusterInfo"` - Nodes []models.Node `json:"nodes"` + Nodes []models.Node `json:"nodes"` } type ClusterCreateVo struct { - Password string `json:"password"` + Password string `json:"password"` ClusterName string `json:"clusterName"` - ClusterId string `json:"clusterId"` - VirtualIp string `json:"virtualIp"` + ClusterId string `json:"clusterId"` + VirtualIp string `json:"virtualIp"` } type ClusterSearchVo struct { Password string `json:"password"` } -type ClusterJoinVo struct { +type UpdateClusterVo struct { ClusterId string `json:"clusterId"` - Password string `json:"password"` - NodeIps []string `json:"nodeIps"` + NodeId string `json:"nodeId"` +} + +type ClusterJoinVo struct { + ClusterId string `json:"clusterId"` + Password string `json:"password"` + NodeIps []string `json:"nodeIps"` } -- Gitblit v1.8.0