From 71b8885babe6dfd25c91b007018347c0c1bfac74 Mon Sep 17 00:00:00 2001
From: zhangzengfei <zhangzengfei@smartai.com>
Date: 星期五, 20 十月 2023 17:35:52 +0800
Subject: [PATCH] 添加主节点变更和主动切换功能

---
 system-service/serf/serf.go              |   17 ++-
 system-service/vo/cluster.go             |   20 ++-
 system-service/controllers/cluster.go    |   17 +++
 system-service/service/clusterService.go |   26 +++++
 system-service/models/node.go            |   15 ++
 system-service/serf/handler.go           |  139 ++++++++++++++++++++++-----
 system-service/main.go                   |    3 
 system-service/serf/sync.go              |   13 ++
 8 files changed, 207 insertions(+), 43 deletions(-)

diff --git a/system-service/controllers/cluster.go b/system-service/controllers/cluster.go
index df7d326..39b980f 100644
--- a/system-service/controllers/cluster.go
+++ b/system-service/controllers/cluster.go
@@ -78,6 +78,7 @@
 				"password":    arr[0].Password,
 				"nodes":       nodes,
 				"virtualIp":   arr[0].VirtualIp,
+				"localId":     config.Server.AnalyServerId,
 			}}
 		} else {
 			return &bhomeclient.Reply{Success: true}
@@ -114,6 +115,22 @@
 	}
 }
 
+func (cc ClusterController) Update2Master(h *bhomeclient.WrapperHandler, c *bhomeclient.Request) *bhomeclient.Reply {
+	var nodeVo vo.UpdateClusterVo
+	err := c.BindJSON(&nodeVo)
+	if err != nil || nodeVo.NodeId == "" {
+		return &bhomeclient.Reply{Success: false, Msg: "鍙傛暟鏈夎"}
+	}
+
+	sv := service.NewClusterService(h.Bk)
+	b, _ := sv.UpdateDriftStateByNodeId(nodeVo.ClusterId, nodeVo.NodeId, "master")
+	if b {
+		return &bhomeclient.Reply{Success: true, Data: nil}
+	} else {
+		return &bhomeclient.Reply{Success: false, Msg: "鍙樻洿澶辫触"}
+	}
+}
+
 // @Summary 鎼滅储闆嗙兢
 // @Description 鎼滅储闆嗙兢
 // @Accept json
diff --git a/system-service/main.go b/system-service/main.go
index 8e6fd0a..65977a6 100644
--- a/system-service/main.go
+++ b/system-service/main.go
@@ -180,6 +180,7 @@
 	funcMap[urlPrefix+"/cluster/leave"] = clusterController.Leave
 	funcMap[urlPrefix+"/cluster/findIpByNode"] = clusterController.FindIpByNode
 	funcMap[urlPrefix+"/cluster/status"] = clusterController.GetClusterStat
+	funcMap[urlPrefix+"/cluster/update2Master"] = clusterController.Update2Master
 
 	sysMenuC := new(controllers.SysMenuController)
 	funcMap["/data/api-u/sysmenus/tree"] = sysMenuC.MenuTree
@@ -285,7 +286,7 @@
 		case <-ctx.Done():
 			return
 		case msg := <-ms.SubCh:
-			logger.Debug("recv sub msg topic:", string(msg.Topic), " data:", string(msg.Data))
+			logger.Debug("recv sub msg topic:", string(msg.Topic), " data len:", len(msg.Data))
 			service.PersistentWrapper(string(msg.Topic), msg.Data)
 		}
 	}
diff --git a/system-service/models/node.go b/system-service/models/node.go
index 5e1b046..b24f926 100644
--- a/system-service/models/node.go
+++ b/system-service/models/node.go
@@ -22,6 +22,7 @@
 	IsDelete   bool   `gorm:"column:isDelete;default:0" json:"isDelete"`
 	DeviceType string `gorm:"column:device_type" json:"device_type"` //璁惧鍨嬪彿`
 	DriftState string `gorm:"column:drift_state" json:"drift_state"` //婕傜Щ鐘舵��, master,backup
+	Online     bool   `gorm:"column:online;default:1" json:"online"` //鍦ㄧ嚎鐘舵��
 }
 
 func (Node) TableName() string {
@@ -57,9 +58,21 @@
 	return node, nil
 }
 
-func (n *Node) UpdateDriftStateByNodeId(driftState string, nodeId string) bool {
+func (n *Node) FindNodeById(id string) (node Node, err error) {
+	if err = db.Raw("select * from cluster_node where id=?", id).Scan(&node).Error; err != nil {
+		return node, err
+	}
+	return node, nil
+}
+
+func (n *Node) UpdateDriftStateByNodeId(driftState string, nodeId string, sync bool) bool {
 	var err error
 	tx := GetDB().Begin()
+	if !sync {
+		GetDB().LogMode(false)
+		defer db.LogMode(true)
+	}
+
 	defer func() {
 		if err != nil && tx != nil {
 			logger.Error("updateDriftState err:", err)
diff --git a/system-service/serf/handler.go b/system-service/serf/handler.go
index 0e4566a..a90d4ba 100644
--- a/system-service/serf/handler.go
+++ b/system-service/serf/handler.go
@@ -80,6 +80,44 @@
 	logger.Info("LTime:", ev.LTime, " Recevie virtualIp change")
 	SyncVirtualIpChan <- ev.Payload
 }
+func HandleUserEventChangeMaster(ev serf.UserEvent) {
+	masterId := string(ev.Payload)
+	localId := Agent.LocalMember().Name
+	logger.Info("鍙樹负涓昏妭鐐圭殑id,", masterId, "鏈妭鐐筰d:", localId)
+	if masterId == localId {
+		return
+	}
+
+	var clusterDb models.Node
+	localNode, err := clusterDb.FindNodeById(localId)
+	if err == nil {
+		if localNode.DriftState == "master" {
+			logger.Info("鏈妭鐐逛负涔嬪墠鐨刴aster,閫氱煡鍙樻洿涓簊lave")
+
+			// 閫氱煡涓昏妭鐐瑰彉鏇�
+			chMsg := protomsg.DbChangeMessage{
+				Id:     localNode.ClusterId,
+				Table:  protomsg.TableChanged_T_Cluster,
+				Action: protomsg.DbAction_Insert,
+				Info:   "master2slave",
+			}
+
+			bts, _ := json.Marshal(chMsg)
+			h := GetBusHandle()
+			if h == nil {
+				logger.Error("HandleEventMemberLeave bus handle is nil")
+				return
+			}
+			err := h.Publish("system-service", bts)
+			if err != nil {
+				logger.Error("HandleEventMemberLeave pub master err:", err)
+			}
+		}
+	}
+
+	clusterDb.UpdateDriftStateByNodeId("master", masterId, false)
+}
+
 func HandleUserEventSyncMessage(ev serf.UserEvent) {
 	logger.Info("receive a UserEventSyncMessage event")
 	var procMsg ProcMessageEvent
@@ -242,47 +280,46 @@
 func HandleEventMemberLeave(ev serf.MemberEvent) {
 	if ev.Members != nil && len(ev.Members) == 1 {
 		leaveMember := ev.Members[0]
+		logger.Info("Event Member Leave, Members:", ev.Members)
 
 		leaveSql := "update cluster_node set isDelete=1 where node_id='" + leaveMember.Name + "'"
 		flag, e := executeSqlByGorm([]string{leaveSql})
 
 		// 鍒ゆ柇绂诲紑鏄惁鏄富鑺傜偣绂诲紑, 鏇存崲涓昏妭鐐�
 		var clusterDb models.Node
-		clusterNodes, err := clusterDb.FindNodes()
+		leaveNode, err := clusterDb.FindNodeById(leaveMember.Name)
 		if err == nil {
-			for _, node := range clusterNodes {
-				//logger.Info(node.Id, node.DriftState, leaveMember.Name)
-				if node.NodeId == leaveMember.Name && node.DriftState == "master" {
-					firstNode, _ := clusterDb.FindFirstNode()
-					//logger.Info("firstNode:", firstNode.NodeId, "local:", Agent.LocalMember().Name)
-					if firstNode.NodeId == Agent.LocalMember().Name {
-						//logger.Info("update master")
-						clusterDb.UpdateDriftStateByNodeId("master", firstNode.NodeId)
+			logger.Info("鏌ヨ绂诲紑鑺傜偣鐨勪俊鎭�,", leaveNode)
+			if leaveNode.DriftState == "master" {
+				firstNode, _ := clusterDb.FindFirstNode()
+				logger.Info("绂诲紑鐨勮妭鐐逛负涓昏妭鐐�, 鏌ヨ鍔犲叆鏈�鏃╃殑鑺傜偣, ", firstNode)
+				logger.Info("鏈妭鐐逛俊鎭�:", Agent.LocalMember().Name)
+				if firstNode.NodeId == Agent.LocalMember().Name {
+					logger.Info("鏇存柊鏈妭鐐逛负涓昏妭鐐�")
+					clusterDb.UpdateDriftStateByNodeId("master", firstNode.NodeId, true)
 
-						// 閫氱煡涓昏妭鐐瑰彉鏇�
-						chMsg := protomsg.DbChangeMessage{
-							Id:     firstNode.ClusterId,
-							Table:  protomsg.TableChanged_T_Cluster,
-							Action: protomsg.DbAction_Insert,
-							Info:   "slave2master",
-						}
+					// 閫氱煡涓昏妭鐐瑰彉鏇�
+					chMsg := protomsg.DbChangeMessage{
+						Id:     firstNode.ClusterId,
+						Table:  protomsg.TableChanged_T_Cluster,
+						Action: protomsg.DbAction_Insert,
+						Info:   "slave2master",
+					}
 
-						bts, _ := json.Marshal(chMsg)
-						h := GetBusHandle()
-						if h == nil {
-							logger.Error("HandleEventMemberLeave bus handle is nil")
-							return
-						}
-						err := h.Publish("system-service", bts)
-						if err != nil {
-							logger.Error("HandleEventMemberLeave pub master err:", err)
-						}
+					bts, _ := json.Marshal(chMsg)
+					h := GetBusHandle()
+					if h == nil {
+						logger.Error("HandleEventMemberLeave bus handle is nil")
+						return
+					}
+					err := h.Publish("system-service", bts)
+					if err != nil {
+						logger.Error("HandleEventMemberLeave pub master err:", err)
 					}
 				}
 			}
 		}
 
-		logger.Info("EventMemberLeave,current Members:", ev.Members)
 		logLT := ""
 		logT := time.Now().Format("2006-01-02 15:04:05")
 		logSql := strings.ReplaceAll(leaveSql, "'", "''")
@@ -305,8 +342,35 @@
 		timeUnix := time.Now().Unix()
 		fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05")
 
-		//joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'"
+		// 鏇存柊鍦ㄧ嚎鐘舵��
+		updateOnlineSql := fmt.Sprintf("update cluster_node set online=1 where node_id='%s'", joinMember.Name)
+
+		// 濡傛灉鏄柊鍔犲叆鐨勬洿鏂板垱寤烘椂闂�
 		joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='slave',node_ip='%s',create_time='%s' where node_id='%s' and isDelete=1", joinMember.Addr.String()+":"+strconv.Itoa(int(joinMember.Port)), fmtTimeStr, joinMember.Name)
+		flag, e := executeSqlByGorm([]string{updateOnlineSql, joinSql})
+
+		logger.Info("EventMemberJoin,current Members:", ev.Members)
+		logLT := ""
+		logT := time.Now().Format("2006-01-02 15:04:05")
+		logSql := strings.ReplaceAll(joinSql, "'", "''")
+		logResult := "0"
+		if flag {
+			logResult = "1"
+		}
+		logErr := ""
+		if e != nil {
+			logErr = e.Error()
+		}
+		executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + joinMember.Name + "'," + logResult + ",'" + logErr + "')"})
+	}
+}
+
+func HandleEventMemberFail(ev serf.MemberEvent) {
+	if ev.Members != nil && len(ev.Members) == 1 {
+		joinMember := ev.Members[0]
+
+		//joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'"
+		joinSql := fmt.Sprintf("update cluster_node set online=0 where node_id='%s'", joinMember.Name)
 		flag, e := executeSqlByGorm([]string{joinSql})
 
 		logger.Info("EventMemberJoin,current Members:", ev.Members)
@@ -324,3 +388,22 @@
 		executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + joinMember.Name + "'," + logResult + ",'" + logErr + "')"})
 	}
 }
+
+func HandleUpdateMemberStatus() {
+	if Agent == nil {
+		return
+	}
+
+	for _, member := range Agent.Serf().Members() {
+		alive := 0
+		if member.Status == serf.StatusAlive {
+			alive = 1
+		}
+
+		sql := fmt.Sprintf("update cluster_node set online=%d where node_id='%s'", alive, member.Name)
+		_, err := executeSqlByGorm([]string{sql})
+		if err != nil {
+			logger.Error(err)
+		}
+	}
+}
diff --git a/system-service/serf/serf.go b/system-service/serf/serf.go
index b7b4fde..77e50c3 100644
--- a/system-service/serf/serf.go
+++ b/system-service/serf/serf.go
@@ -21,6 +21,7 @@
 	UserEventSyncSql                = "SyncSql"
 	UserEventSyncDbTablePersonCache = "SyncCache"
 	UserEventSyncVirtualIp          = "SyncVirtualIp"              //婕傜Щip淇敼
+	UserEventChangeMaster           = "ChangeMaster "              //淇敼鑺傜偣鐘舵��
 	UserEventSyncRegisterInfo       = "SyncRegisterInfo"           //鍚屾娉ㄥ唽淇℃伅
 	DataSystemSerfSubscribe         = "data-system-serf-subscribe" //鍚刟pp浠巗erf璁㈤槄娑堟伅
 	UserEventSyncMessage            = "SyncMessageForProc"         // 涓哄叾浠栬繘绋嬪悓姝ユ秷鎭�
@@ -42,6 +43,8 @@
 			HandleUserEventSyncDbTablePersonCache(ev)
 		} else if ev.Name == UserEventSyncVirtualIp {
 			HandleUserEventSyncVirtualIp(ev)
+		} else if ev.Name == UserEventChangeMaster {
+			HandleUserEventChangeMaster(ev)
 		} else if ev.Name == UserEventSyncRegisterInfo {
 			HandleSyncRegisterInfo(ev)
 		} else if ev.Name == DataSystemSerfSubscribe {
@@ -61,8 +64,10 @@
 	case serf.MemberEvent:
 		if event.EventType() == serf.EventMemberLeave {
 			HandleEventMemberLeave(ev)
-		} else if event.EventType() == serf.EventMemberJoin {
+		} else if event.EventType() == serf.EventMemberJoin || event.EventType() == serf.EventMemberUpdate {
 			HandleEventMemberJoin(ev)
+		} else if event.EventType() == serf.EventMemberFailed {
+			HandleEventMemberFail(ev)
 		}
 		logger.Error("serf MemberEvent ", event.EventType())
 	default:
@@ -90,11 +95,11 @@
 					logger.Error("ExecuteSqlByGorm err:", err, ",sql:", sql)
 					return false, err
 				}
-				if result.RowsAffected == 0 {
-					logger.Debug("ExecuteSqlByGorm RowsAffected == 0", ",sql:", sql)
-					err = errors.New("ExecuteSqlByGorm RowsAffected == 0")
-					return false, err
-				}
+				//if result.RowsAffected == 0 {
+				//	logger.Debug("ExecuteSqlByGorm RowsAffected == 0", ",sql:", sql)
+				//	err = errors.New("ExecuteSqlByGorm RowsAffected == 0")
+				//	return false, err
+				//}
 			}
 			tx.Commit()
 			return true, nil
diff --git a/system-service/serf/sync.go b/system-service/serf/sync.go
index c5710da..8c831d8 100644
--- a/system-service/serf/sync.go
+++ b/system-service/serf/sync.go
@@ -75,6 +75,19 @@
 			}
 		}
 	}
+
+	go func() {
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			default:
+				HandleUpdateMemberStatus()
+				time.Sleep(5 * time.Second)
+			}
+		}
+	}()
+
 	go func() {
 		for {
 			select {
diff --git a/system-service/service/clusterService.go b/system-service/service/clusterService.go
index 288683f..a39741f 100644
--- a/system-service/service/clusterService.go
+++ b/system-service/service/clusterService.go
@@ -138,6 +138,31 @@
 	return false, ""
 }
 
+// 鏍规嵁闆嗙兢鍚嶇О鍜屽瘑鐮佸垱寤洪泦缇�
+func (s ClusterService) UpdateDriftStateByNodeId(clusterId, nodeId, role string) (bool, string) {
+	var node models.Node
+	isSuccess := node.UpdateDriftStateByNodeId(role, nodeId, false)
+
+	if isSuccess {
+		// 閫氱煡涓昏妭鐐瑰彉鏇�
+		chMsg := protomsg.DbChangeMessage{
+			Id:     clusterId,
+			Table:  protomsg.TableChanged_T_Cluster,
+			Action: protomsg.DbAction_Insert,
+			Info:   "slave2master",
+		}
+
+		s.AddDbMessage(&chMsg)
+
+		err := serf.Agent.UserEvent(serf.UserEventChangeMaster, []byte(nodeId), false)
+		if err != nil {
+			logger.Error("UserEventSyncVirtualIp err:", err)
+		}
+	}
+
+	return isSuccess, ""
+}
+
 func (s ClusterService) SearchByPwd(pwd string) (err error) {
 	_, isSearching := getFromSearchMap()
 	if isSearching {
@@ -471,6 +496,7 @@
 func ClusterSyncProcMessage(payload []byte) {
 	if serf.Agent == nil {
 		logger.Error("鏈姞鍏ラ泦缇�")
+		return
 	}
 
 	err := serf.Agent.UserEvent(serf.UserEventSyncMessage, payload, false)
diff --git a/system-service/vo/cluster.go b/system-service/vo/cluster.go
index f014a52..328f323 100644
--- a/system-service/vo/cluster.go
+++ b/system-service/vo/cluster.go
@@ -1,24 +1,30 @@
 package vo
 
 import "vamicro/system-service/models"
+
 type ClusterVo struct {
 	ClusterInfo models.Cluster `json:"clusterInfo"`
-	Nodes []models.Node `json:"nodes"`
+	Nodes       []models.Node  `json:"nodes"`
 }
 
 type ClusterCreateVo struct {
-	Password string `json:"password"`
+	Password    string `json:"password"`
 	ClusterName string `json:"clusterName"`
-	ClusterId string `json:"clusterId"`
-	VirtualIp string `json:"virtualIp"`
+	ClusterId   string `json:"clusterId"`
+	VirtualIp   string `json:"virtualIp"`
 }
 
 type ClusterSearchVo struct {
 	Password string `json:"password"`
 }
 
-type ClusterJoinVo struct {
+type UpdateClusterVo struct {
 	ClusterId string `json:"clusterId"`
-	Password string `json:"password"`
-	NodeIps []string `json:"nodeIps"`
+	NodeId    string `json:"nodeId"`
+}
+
+type ClusterJoinVo struct {
+	ClusterId string   `json:"clusterId"`
+	Password  string   `json:"password"`
+	NodeIps   []string `json:"nodeIps"`
 }

--
Gitblit v1.8.0