From 71b8885babe6dfd25c91b007018347c0c1bfac74 Mon Sep 17 00:00:00 2001
From: zhangzengfei <zhangzengfei@smartai.com>
Date: 星期五, 20 十月 2023 17:35:52 +0800
Subject: [PATCH] 添加主节点变更和主动切换功能
---
system-service/serf/serf.go | 17 ++-
system-service/vo/cluster.go | 20 ++-
system-service/controllers/cluster.go | 17 +++
system-service/service/clusterService.go | 26 +++++
system-service/models/node.go | 15 ++
system-service/serf/handler.go | 139 ++++++++++++++++++++++-----
system-service/main.go | 3
system-service/serf/sync.go | 13 ++
8 files changed, 207 insertions(+), 43 deletions(-)
diff --git a/system-service/controllers/cluster.go b/system-service/controllers/cluster.go
index df7d326..39b980f 100644
--- a/system-service/controllers/cluster.go
+++ b/system-service/controllers/cluster.go
@@ -78,6 +78,7 @@
"password": arr[0].Password,
"nodes": nodes,
"virtualIp": arr[0].VirtualIp,
+ "localId": config.Server.AnalyServerId,
}}
} else {
return &bhomeclient.Reply{Success: true}
@@ -114,6 +115,22 @@
}
}
+func (cc ClusterController) Update2Master(h *bhomeclient.WrapperHandler, c *bhomeclient.Request) *bhomeclient.Reply {
+ var nodeVo vo.UpdateClusterVo
+ err := c.BindJSON(&nodeVo)
+ if err != nil || nodeVo.NodeId == "" {
+ return &bhomeclient.Reply{Success: false, Msg: "鍙傛暟鏈夎"}
+ }
+
+ sv := service.NewClusterService(h.Bk)
+ b, _ := sv.UpdateDriftStateByNodeId(nodeVo.ClusterId, nodeVo.NodeId, "master")
+ if b {
+ return &bhomeclient.Reply{Success: true, Data: nil}
+ } else {
+ return &bhomeclient.Reply{Success: false, Msg: "鍙樻洿澶辫触"}
+ }
+}
+
// @Summary 鎼滅储闆嗙兢
// @Description 鎼滅储闆嗙兢
// @Accept json
diff --git a/system-service/main.go b/system-service/main.go
index 8e6fd0a..65977a6 100644
--- a/system-service/main.go
+++ b/system-service/main.go
@@ -180,6 +180,7 @@
funcMap[urlPrefix+"/cluster/leave"] = clusterController.Leave
funcMap[urlPrefix+"/cluster/findIpByNode"] = clusterController.FindIpByNode
funcMap[urlPrefix+"/cluster/status"] = clusterController.GetClusterStat
+ funcMap[urlPrefix+"/cluster/update2Master"] = clusterController.Update2Master
sysMenuC := new(controllers.SysMenuController)
funcMap["/data/api-u/sysmenus/tree"] = sysMenuC.MenuTree
@@ -285,7 +286,7 @@
case <-ctx.Done():
return
case msg := <-ms.SubCh:
- logger.Debug("recv sub msg topic:", string(msg.Topic), " data:", string(msg.Data))
+ logger.Debug("recv sub msg topic:", string(msg.Topic), " data len:", len(msg.Data))
service.PersistentWrapper(string(msg.Topic), msg.Data)
}
}
diff --git a/system-service/models/node.go b/system-service/models/node.go
index 5e1b046..b24f926 100644
--- a/system-service/models/node.go
+++ b/system-service/models/node.go
@@ -22,6 +22,7 @@
IsDelete bool `gorm:"column:isDelete;default:0" json:"isDelete"`
DeviceType string `gorm:"column:device_type" json:"device_type"` //璁惧鍨嬪彿`
DriftState string `gorm:"column:drift_state" json:"drift_state"` //婕傜Щ鐘舵��, master,backup
+ Online bool `gorm:"column:online;default:1" json:"online"` //鍦ㄧ嚎鐘舵��
}
func (Node) TableName() string {
@@ -57,9 +58,21 @@
return node, nil
}
-func (n *Node) UpdateDriftStateByNodeId(driftState string, nodeId string) bool {
+func (n *Node) FindNodeById(id string) (node Node, err error) {
+ if err = db.Raw("select * from cluster_node where id=?", id).Scan(&node).Error; err != nil {
+ return node, err
+ }
+ return node, nil
+}
+
+func (n *Node) UpdateDriftStateByNodeId(driftState string, nodeId string, sync bool) bool {
var err error
tx := GetDB().Begin()
+ if !sync {
+ GetDB().LogMode(false)
+ defer db.LogMode(true)
+ }
+
defer func() {
if err != nil && tx != nil {
logger.Error("updateDriftState err:", err)
diff --git a/system-service/serf/handler.go b/system-service/serf/handler.go
index 0e4566a..a90d4ba 100644
--- a/system-service/serf/handler.go
+++ b/system-service/serf/handler.go
@@ -80,6 +80,44 @@
logger.Info("LTime:", ev.LTime, " Recevie virtualIp change")
SyncVirtualIpChan <- ev.Payload
}
+func HandleUserEventChangeMaster(ev serf.UserEvent) {
+ masterId := string(ev.Payload)
+ localId := Agent.LocalMember().Name
+ logger.Info("鍙樹负涓昏妭鐐圭殑id,", masterId, "鏈妭鐐筰d:", localId)
+ if masterId == localId {
+ return
+ }
+
+ var clusterDb models.Node
+ localNode, err := clusterDb.FindNodeById(localId)
+ if err == nil {
+ if localNode.DriftState == "master" {
+ logger.Info("鏈妭鐐逛负涔嬪墠鐨刴aster,閫氱煡鍙樻洿涓簊lave")
+
+ // 閫氱煡涓昏妭鐐瑰彉鏇�
+ chMsg := protomsg.DbChangeMessage{
+ Id: localNode.ClusterId,
+ Table: protomsg.TableChanged_T_Cluster,
+ Action: protomsg.DbAction_Insert,
+ Info: "master2slave",
+ }
+
+ bts, _ := json.Marshal(chMsg)
+ h := GetBusHandle()
+ if h == nil {
+ logger.Error("HandleEventMemberLeave bus handle is nil")
+ return
+ }
+ err := h.Publish("system-service", bts)
+ if err != nil {
+ logger.Error("HandleEventMemberLeave pub master err:", err)
+ }
+ }
+ }
+
+ clusterDb.UpdateDriftStateByNodeId("master", masterId, false)
+}
+
func HandleUserEventSyncMessage(ev serf.UserEvent) {
logger.Info("receive a UserEventSyncMessage event")
var procMsg ProcMessageEvent
@@ -242,47 +280,46 @@
func HandleEventMemberLeave(ev serf.MemberEvent) {
if ev.Members != nil && len(ev.Members) == 1 {
leaveMember := ev.Members[0]
+ logger.Info("Event Member Leave, Members:", ev.Members)
leaveSql := "update cluster_node set isDelete=1 where node_id='" + leaveMember.Name + "'"
flag, e := executeSqlByGorm([]string{leaveSql})
// 鍒ゆ柇绂诲紑鏄惁鏄富鑺傜偣绂诲紑, 鏇存崲涓昏妭鐐�
var clusterDb models.Node
- clusterNodes, err := clusterDb.FindNodes()
+ leaveNode, err := clusterDb.FindNodeById(leaveMember.Name)
if err == nil {
- for _, node := range clusterNodes {
- //logger.Info(node.Id, node.DriftState, leaveMember.Name)
- if node.NodeId == leaveMember.Name && node.DriftState == "master" {
- firstNode, _ := clusterDb.FindFirstNode()
- //logger.Info("firstNode:", firstNode.NodeId, "local:", Agent.LocalMember().Name)
- if firstNode.NodeId == Agent.LocalMember().Name {
- //logger.Info("update master")
- clusterDb.UpdateDriftStateByNodeId("master", firstNode.NodeId)
+ logger.Info("鏌ヨ绂诲紑鑺傜偣鐨勪俊鎭�,", leaveNode)
+ if leaveNode.DriftState == "master" {
+ firstNode, _ := clusterDb.FindFirstNode()
+ logger.Info("绂诲紑鐨勮妭鐐逛负涓昏妭鐐�, 鏌ヨ鍔犲叆鏈�鏃╃殑鑺傜偣, ", firstNode)
+ logger.Info("鏈妭鐐逛俊鎭�:", Agent.LocalMember().Name)
+ if firstNode.NodeId == Agent.LocalMember().Name {
+ logger.Info("鏇存柊鏈妭鐐逛负涓昏妭鐐�")
+ clusterDb.UpdateDriftStateByNodeId("master", firstNode.NodeId, true)
- // 閫氱煡涓昏妭鐐瑰彉鏇�
- chMsg := protomsg.DbChangeMessage{
- Id: firstNode.ClusterId,
- Table: protomsg.TableChanged_T_Cluster,
- Action: protomsg.DbAction_Insert,
- Info: "slave2master",
- }
+ // 閫氱煡涓昏妭鐐瑰彉鏇�
+ chMsg := protomsg.DbChangeMessage{
+ Id: firstNode.ClusterId,
+ Table: protomsg.TableChanged_T_Cluster,
+ Action: protomsg.DbAction_Insert,
+ Info: "slave2master",
+ }
- bts, _ := json.Marshal(chMsg)
- h := GetBusHandle()
- if h == nil {
- logger.Error("HandleEventMemberLeave bus handle is nil")
- return
- }
- err := h.Publish("system-service", bts)
- if err != nil {
- logger.Error("HandleEventMemberLeave pub master err:", err)
- }
+ bts, _ := json.Marshal(chMsg)
+ h := GetBusHandle()
+ if h == nil {
+ logger.Error("HandleEventMemberLeave bus handle is nil")
+ return
+ }
+ err := h.Publish("system-service", bts)
+ if err != nil {
+ logger.Error("HandleEventMemberLeave pub master err:", err)
}
}
}
}
- logger.Info("EventMemberLeave,current Members:", ev.Members)
logLT := ""
logT := time.Now().Format("2006-01-02 15:04:05")
logSql := strings.ReplaceAll(leaveSql, "'", "''")
@@ -305,8 +342,35 @@
timeUnix := time.Now().Unix()
fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05")
- //joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'"
+ // 鏇存柊鍦ㄧ嚎鐘舵��
+ updateOnlineSql := fmt.Sprintf("update cluster_node set online=1 where node_id='%s'", joinMember.Name)
+
+ // 濡傛灉鏄柊鍔犲叆鐨勬洿鏂板垱寤烘椂闂�
joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='slave',node_ip='%s',create_time='%s' where node_id='%s' and isDelete=1", joinMember.Addr.String()+":"+strconv.Itoa(int(joinMember.Port)), fmtTimeStr, joinMember.Name)
+ flag, e := executeSqlByGorm([]string{updateOnlineSql, joinSql})
+
+ logger.Info("EventMemberJoin,current Members:", ev.Members)
+ logLT := ""
+ logT := time.Now().Format("2006-01-02 15:04:05")
+ logSql := strings.ReplaceAll(joinSql, "'", "''")
+ logResult := "0"
+ if flag {
+ logResult = "1"
+ }
+ logErr := ""
+ if e != nil {
+ logErr = e.Error()
+ }
+ executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + joinMember.Name + "'," + logResult + ",'" + logErr + "')"})
+ }
+}
+
+func HandleEventMemberFail(ev serf.MemberEvent) {
+ if ev.Members != nil && len(ev.Members) == 1 {
+ joinMember := ev.Members[0]
+
+ //joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'"
+ joinSql := fmt.Sprintf("update cluster_node set online=0 where node_id='%s'", joinMember.Name)
flag, e := executeSqlByGorm([]string{joinSql})
logger.Info("EventMemberJoin,current Members:", ev.Members)
@@ -324,3 +388,22 @@
executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + joinMember.Name + "'," + logResult + ",'" + logErr + "')"})
}
}
+
+func HandleUpdateMemberStatus() {
+ if Agent == nil {
+ return
+ }
+
+ for _, member := range Agent.Serf().Members() {
+ alive := 0
+ if member.Status == serf.StatusAlive {
+ alive = 1
+ }
+
+ sql := fmt.Sprintf("update cluster_node set online=%d where node_id='%s'", alive, member.Name)
+ _, err := executeSqlByGorm([]string{sql})
+ if err != nil {
+ logger.Error(err)
+ }
+ }
+}
diff --git a/system-service/serf/serf.go b/system-service/serf/serf.go
index b7b4fde..77e50c3 100644
--- a/system-service/serf/serf.go
+++ b/system-service/serf/serf.go
@@ -21,6 +21,7 @@
UserEventSyncSql = "SyncSql"
UserEventSyncDbTablePersonCache = "SyncCache"
UserEventSyncVirtualIp = "SyncVirtualIp" //婕傜Щip淇敼
+ UserEventChangeMaster = "ChangeMaster " //淇敼鑺傜偣鐘舵��
UserEventSyncRegisterInfo = "SyncRegisterInfo" //鍚屾娉ㄥ唽淇℃伅
DataSystemSerfSubscribe = "data-system-serf-subscribe" //鍚刟pp浠巗erf璁㈤槄娑堟伅
UserEventSyncMessage = "SyncMessageForProc" // 涓哄叾浠栬繘绋嬪悓姝ユ秷鎭�
@@ -42,6 +43,8 @@
HandleUserEventSyncDbTablePersonCache(ev)
} else if ev.Name == UserEventSyncVirtualIp {
HandleUserEventSyncVirtualIp(ev)
+ } else if ev.Name == UserEventChangeMaster {
+ HandleUserEventChangeMaster(ev)
} else if ev.Name == UserEventSyncRegisterInfo {
HandleSyncRegisterInfo(ev)
} else if ev.Name == DataSystemSerfSubscribe {
@@ -61,8 +64,10 @@
case serf.MemberEvent:
if event.EventType() == serf.EventMemberLeave {
HandleEventMemberLeave(ev)
- } else if event.EventType() == serf.EventMemberJoin {
+ } else if event.EventType() == serf.EventMemberJoin || event.EventType() == serf.EventMemberUpdate {
HandleEventMemberJoin(ev)
+ } else if event.EventType() == serf.EventMemberFailed {
+ HandleEventMemberFail(ev)
}
logger.Error("serf MemberEvent ", event.EventType())
default:
@@ -90,11 +95,11 @@
logger.Error("ExecuteSqlByGorm err:", err, ",sql:", sql)
return false, err
}
- if result.RowsAffected == 0 {
- logger.Debug("ExecuteSqlByGorm RowsAffected == 0", ",sql:", sql)
- err = errors.New("ExecuteSqlByGorm RowsAffected == 0")
- return false, err
- }
+ //if result.RowsAffected == 0 {
+ // logger.Debug("ExecuteSqlByGorm RowsAffected == 0", ",sql:", sql)
+ // err = errors.New("ExecuteSqlByGorm RowsAffected == 0")
+ // return false, err
+ //}
}
tx.Commit()
return true, nil
diff --git a/system-service/serf/sync.go b/system-service/serf/sync.go
index c5710da..8c831d8 100644
--- a/system-service/serf/sync.go
+++ b/system-service/serf/sync.go
@@ -75,6 +75,19 @@
}
}
}
+
+ go func() {
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ HandleUpdateMemberStatus()
+ time.Sleep(5 * time.Second)
+ }
+ }
+ }()
+
go func() {
for {
select {
diff --git a/system-service/service/clusterService.go b/system-service/service/clusterService.go
index 288683f..a39741f 100644
--- a/system-service/service/clusterService.go
+++ b/system-service/service/clusterService.go
@@ -138,6 +138,31 @@
return false, ""
}
+// 鏍规嵁闆嗙兢鍚嶇О鍜屽瘑鐮佸垱寤洪泦缇�
+func (s ClusterService) UpdateDriftStateByNodeId(clusterId, nodeId, role string) (bool, string) {
+ var node models.Node
+ isSuccess := node.UpdateDriftStateByNodeId(role, nodeId, false)
+
+ if isSuccess {
+ // 閫氱煡涓昏妭鐐瑰彉鏇�
+ chMsg := protomsg.DbChangeMessage{
+ Id: clusterId,
+ Table: protomsg.TableChanged_T_Cluster,
+ Action: protomsg.DbAction_Insert,
+ Info: "slave2master",
+ }
+
+ s.AddDbMessage(&chMsg)
+
+ err := serf.Agent.UserEvent(serf.UserEventChangeMaster, []byte(nodeId), false)
+ if err != nil {
+ logger.Error("UserEventSyncVirtualIp err:", err)
+ }
+ }
+
+ return isSuccess, ""
+}
+
func (s ClusterService) SearchByPwd(pwd string) (err error) {
_, isSearching := getFromSearchMap()
if isSearching {
@@ -471,6 +496,7 @@
func ClusterSyncProcMessage(payload []byte) {
if serf.Agent == nil {
logger.Error("鏈姞鍏ラ泦缇�")
+ return
}
err := serf.Agent.UserEvent(serf.UserEventSyncMessage, payload, false)
diff --git a/system-service/vo/cluster.go b/system-service/vo/cluster.go
index f014a52..328f323 100644
--- a/system-service/vo/cluster.go
+++ b/system-service/vo/cluster.go
@@ -1,24 +1,30 @@
package vo
import "vamicro/system-service/models"
+
type ClusterVo struct {
ClusterInfo models.Cluster `json:"clusterInfo"`
- Nodes []models.Node `json:"nodes"`
+ Nodes []models.Node `json:"nodes"`
}
type ClusterCreateVo struct {
- Password string `json:"password"`
+ Password string `json:"password"`
ClusterName string `json:"clusterName"`
- ClusterId string `json:"clusterId"`
- VirtualIp string `json:"virtualIp"`
+ ClusterId string `json:"clusterId"`
+ VirtualIp string `json:"virtualIp"`
}
type ClusterSearchVo struct {
Password string `json:"password"`
}
-type ClusterJoinVo struct {
+type UpdateClusterVo struct {
ClusterId string `json:"clusterId"`
- Password string `json:"password"`
- NodeIps []string `json:"nodeIps"`
+ NodeId string `json:"nodeId"`
+}
+
+type ClusterJoinVo struct {
+ ClusterId string `json:"clusterId"`
+ Password string `json:"password"`
+ NodeIps []string `json:"nodeIps"`
}
--
Gitblit v1.8.0