From 71b8885babe6dfd25c91b007018347c0c1bfac74 Mon Sep 17 00:00:00 2001 From: zhangzengfei <zhangzengfei@smartai.com> Date: 星期五, 20 十月 2023 17:35:52 +0800 Subject: [PATCH] 添加主节点变更和主动切换功能 --- system-service/serf/handler.go | 139 +++++++++++++++++++++++++++++++++++++--------- 1 files changed, 111 insertions(+), 28 deletions(-) diff --git a/system-service/serf/handler.go b/system-service/serf/handler.go index 0e4566a..a90d4ba 100644 --- a/system-service/serf/handler.go +++ b/system-service/serf/handler.go @@ -80,6 +80,44 @@ logger.Info("LTime:", ev.LTime, " Recevie virtualIp change") SyncVirtualIpChan <- ev.Payload } +func HandleUserEventChangeMaster(ev serf.UserEvent) { + masterId := string(ev.Payload) + localId := Agent.LocalMember().Name + logger.Info("鍙樹负涓昏妭鐐圭殑id,", masterId, "鏈妭鐐筰d:", localId) + if masterId == localId { + return + } + + var clusterDb models.Node + localNode, err := clusterDb.FindNodeById(localId) + if err == nil { + if localNode.DriftState == "master" { + logger.Info("鏈妭鐐逛负涔嬪墠鐨刴aster,閫氱煡鍙樻洿涓簊lave") + + // 閫氱煡涓昏妭鐐瑰彉鏇� + chMsg := protomsg.DbChangeMessage{ + Id: localNode.ClusterId, + Table: protomsg.TableChanged_T_Cluster, + Action: protomsg.DbAction_Insert, + Info: "master2slave", + } + + bts, _ := json.Marshal(chMsg) + h := GetBusHandle() + if h == nil { + logger.Error("HandleEventMemberLeave bus handle is nil") + return + } + err := h.Publish("system-service", bts) + if err != nil { + logger.Error("HandleEventMemberLeave pub master err:", err) + } + } + } + + clusterDb.UpdateDriftStateByNodeId("master", masterId, false) +} + func HandleUserEventSyncMessage(ev serf.UserEvent) { logger.Info("receive a UserEventSyncMessage event") var procMsg ProcMessageEvent @@ -242,47 +280,46 @@ func HandleEventMemberLeave(ev serf.MemberEvent) { if ev.Members != nil && len(ev.Members) == 1 { leaveMember := ev.Members[0] + logger.Info("Event Member Leave, Members:", ev.Members) leaveSql := "update cluster_node set isDelete=1 where node_id='" + leaveMember.Name + "'" flag, e := executeSqlByGorm([]string{leaveSql}) // 鍒ゆ柇绂诲紑鏄惁鏄富鑺傜偣绂诲紑, 鏇存崲涓昏妭鐐� var clusterDb models.Node - clusterNodes, err := clusterDb.FindNodes() + leaveNode, err := clusterDb.FindNodeById(leaveMember.Name) if err == nil { - for _, node := range clusterNodes { - //logger.Info(node.Id, node.DriftState, leaveMember.Name) - if node.NodeId == leaveMember.Name && node.DriftState == "master" { - firstNode, _ := clusterDb.FindFirstNode() - //logger.Info("firstNode:", firstNode.NodeId, "local:", Agent.LocalMember().Name) - if firstNode.NodeId == Agent.LocalMember().Name { - //logger.Info("update master") - clusterDb.UpdateDriftStateByNodeId("master", firstNode.NodeId) + logger.Info("鏌ヨ绂诲紑鑺傜偣鐨勪俊鎭�,", leaveNode) + if leaveNode.DriftState == "master" { + firstNode, _ := clusterDb.FindFirstNode() + logger.Info("绂诲紑鐨勮妭鐐逛负涓昏妭鐐�, 鏌ヨ鍔犲叆鏈�鏃╃殑鑺傜偣, ", firstNode) + logger.Info("鏈妭鐐逛俊鎭�:", Agent.LocalMember().Name) + if firstNode.NodeId == Agent.LocalMember().Name { + logger.Info("鏇存柊鏈妭鐐逛负涓昏妭鐐�") + clusterDb.UpdateDriftStateByNodeId("master", firstNode.NodeId, true) - // 閫氱煡涓昏妭鐐瑰彉鏇� - chMsg := protomsg.DbChangeMessage{ - Id: firstNode.ClusterId, - Table: protomsg.TableChanged_T_Cluster, - Action: protomsg.DbAction_Insert, - Info: "slave2master", - } + // 閫氱煡涓昏妭鐐瑰彉鏇� + chMsg := protomsg.DbChangeMessage{ + Id: firstNode.ClusterId, + Table: protomsg.TableChanged_T_Cluster, + Action: protomsg.DbAction_Insert, + Info: "slave2master", + } - bts, _ := json.Marshal(chMsg) - h := GetBusHandle() - if h == nil { - logger.Error("HandleEventMemberLeave bus handle is nil") - return - } - err := h.Publish("system-service", bts) - if err != nil { - logger.Error("HandleEventMemberLeave pub master err:", err) - } + bts, _ := json.Marshal(chMsg) + h := GetBusHandle() + if h == nil { + logger.Error("HandleEventMemberLeave bus handle is nil") + return + } + err := h.Publish("system-service", bts) + if err != nil { + logger.Error("HandleEventMemberLeave pub master err:", err) } } } } - logger.Info("EventMemberLeave,current Members:", ev.Members) logLT := "" logT := time.Now().Format("2006-01-02 15:04:05") logSql := strings.ReplaceAll(leaveSql, "'", "''") @@ -305,8 +342,35 @@ timeUnix := time.Now().Unix() fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05") - //joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'" + // 鏇存柊鍦ㄧ嚎鐘舵�� + updateOnlineSql := fmt.Sprintf("update cluster_node set online=1 where node_id='%s'", joinMember.Name) + + // 濡傛灉鏄柊鍔犲叆鐨勬洿鏂板垱寤烘椂闂� joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='slave',node_ip='%s',create_time='%s' where node_id='%s' and isDelete=1", joinMember.Addr.String()+":"+strconv.Itoa(int(joinMember.Port)), fmtTimeStr, joinMember.Name) + flag, e := executeSqlByGorm([]string{updateOnlineSql, joinSql}) + + logger.Info("EventMemberJoin,current Members:", ev.Members) + logLT := "" + logT := time.Now().Format("2006-01-02 15:04:05") + logSql := strings.ReplaceAll(joinSql, "'", "''") + logResult := "0" + if flag { + logResult = "1" + } + logErr := "" + if e != nil { + logErr = e.Error() + } + executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + joinMember.Name + "'," + logResult + ",'" + logErr + "')"}) + } +} + +func HandleEventMemberFail(ev serf.MemberEvent) { + if ev.Members != nil && len(ev.Members) == 1 { + joinMember := ev.Members[0] + + //joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'" + joinSql := fmt.Sprintf("update cluster_node set online=0 where node_id='%s'", joinMember.Name) flag, e := executeSqlByGorm([]string{joinSql}) logger.Info("EventMemberJoin,current Members:", ev.Members) @@ -324,3 +388,22 @@ executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + joinMember.Name + "'," + logResult + ",'" + logErr + "')"}) } } + +func HandleUpdateMemberStatus() { + if Agent == nil { + return + } + + for _, member := range Agent.Serf().Members() { + alive := 0 + if member.Status == serf.StatusAlive { + alive = 1 + } + + sql := fmt.Sprintf("update cluster_node set online=%d where node_id='%s'", alive, member.Name) + _, err := executeSqlByGorm([]string{sql}) + if err != nil { + logger.Error(err) + } + } +} -- Gitblit v1.8.0