| | |
| | | return nodes, nil |
| | | } |
| | | |
| | | func (n *Node) FindNodes() (nodes []Node, err error) { |
| | | if err = db.Raw("select * from cluster_node").Scan(&nodes).Error; err != nil { |
| | | return nil, err |
| | | } |
| | | return nodes, nil |
| | | } |
| | | |
| | | func (n *Node) FindFirstNode() (node Node, err error) { |
| | | if err = db.Raw("select * from cluster_node where isDelete=0 order by create_time limit 1").Scan(&node).Error; err != nil { |
| | | return node, err |
| | | } |
| | | return node, nil |
| | | } |
| | | |
| | | func (n *Node) UpdateDriftStateByNodeId(driftState string, nodeId string) bool { |
| | | var err error |
| | | tx := GetDB().Begin() |
| | |
| | | if err = tx.Exec("update "+n.TableName()+" set drift_state=? where id=?", driftState, nodeId).Error; err != nil { |
| | | return false |
| | | } |
| | | if err = tx.Exec("update "+n.TableName()+" set drift_state='backup' where id !=?", nodeId).Error; err != nil { |
| | | if err = tx.Exec("update "+n.TableName()+" set drift_state='slave' where id !=?", nodeId).Error; err != nil { |
| | | return false |
| | | } |
| | | } else { |
| | |
| | | package serf |
| | | |
| | | import ( |
| | | "basic.com/pubsub/protomsg.git" |
| | | "basic.com/valib/logger.git" |
| | | "basic.com/valib/serf.git/serf" |
| | | "encoding/json" |
| | |
| | | "time" |
| | | "vamicro/config" |
| | | "vamicro/system-service/bhome_msg_dev" |
| | | "vamicro/system-service/models" |
| | | ) |
| | | |
| | | type RpcHandle func(arg RpcParamTopic) ([]serf.NodeResponse, error) |
| | |
| | | func HandleEventMemberLeave(ev serf.MemberEvent) { |
| | | if ev.Members != nil && len(ev.Members) == 1 { |
| | | leaveMember := ev.Members[0] |
| | | leaveSql := "update cluster_node set isDelete=1,drift_state='' where node_id='" + leaveMember.Name + "'" |
| | | |
| | | leaveSql := "update cluster_node set isDelete=1 where node_id='" + leaveMember.Name + "'" |
| | | flag, e := executeSqlByGorm([]string{leaveSql}) |
| | | |
| | | // 判断离开是否是主节点离开, 更换主节点 |
| | | var clusterDb models.Node |
| | | clusterNodes, err := clusterDb.FindNodes() |
| | | if err == nil { |
| | | for _, node := range clusterNodes { |
| | | //logger.Info(node.Id, node.DriftState, leaveMember.Name) |
| | | if node.NodeId == leaveMember.Name && node.DriftState == "master" { |
| | | firstNode, _ := clusterDb.FindFirstNode() |
| | | //logger.Info("firstNode:", firstNode.NodeId, "local:", Agent.LocalMember().Name) |
| | | if firstNode.NodeId == Agent.LocalMember().Name { |
| | | //logger.Info("update master") |
| | | clusterDb.UpdateDriftStateByNodeId("master", firstNode.NodeId) |
| | | |
| | | // 通知主节点变更 |
| | | chMsg := protomsg.DbChangeMessage{ |
| | | Id: firstNode.ClusterId, |
| | | Table: protomsg.TableChanged_T_Cluster, |
| | | Action: protomsg.DbAction_Insert, |
| | | Info: "slave2master", |
| | | } |
| | | |
| | | bts, _ := json.Marshal(chMsg) |
| | | h := GetBusHandle() |
| | | if h == nil { |
| | | logger.Error("HandleEventMemberLeave bus handle is nil") |
| | | return |
| | | } |
| | | err := h.Publish("system-service", bts) |
| | | if err != nil { |
| | | logger.Error("HandleEventMemberLeave pub master err:", err) |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | logger.Info("EventMemberLeave,current Members:", ev.Members) |
| | | logLT := "" |
| | |
| | | fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05") |
| | | |
| | | //joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'" |
| | | joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='',node_ip='%s',create_time='%s' where node_id='%s'", joinMember.Addr.String()+":"+strconv.Itoa(int(joinMember.Port)), fmtTimeStr, joinMember.Name) |
| | | joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='slave',node_ip='%s',create_time='%s' where node_id='%s' and isDelete=1", joinMember.Addr.String()+":"+strconv.Itoa(int(joinMember.Port)), fmtTimeStr, joinMember.Name) |
| | | flag, e := executeSqlByGorm([]string{joinSql}) |
| | | |
| | | logger.Info("EventMemberJoin,current Members:", ev.Members) |
| | |
| | | return false |
| | | } |
| | | |
| | | joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='',node_ip='%s',create_time='%s' where node_id='%s'", serverIp+":"+strconv.Itoa(dbSync.DefaultBindPort), fmtTimeStr, config.Server.AnalyServerId) |
| | | joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='slave',node_ip='%s',create_time='%s' where node_id='%s'", serverIp+":"+strconv.Itoa(dbSync.DefaultBindPort), fmtTimeStr, config.Server.AnalyServerId) |
| | | if err = tx.Exec(joinSql).Error; err != nil { |
| | | logger.Debug("update isDelete err:", err) |
| | | return false |