| | |
| | | logger.Info("LTime:", ev.LTime, " Recevie virtualIp change") |
| | | SyncVirtualIpChan <- ev.Payload |
| | | } |
| | | func HandleUserEventChangeMaster(ev serf.UserEvent) { |
| | | masterId := string(ev.Payload) |
| | | localId := Agent.LocalMember().Name |
| | | logger.Info("变为主节点的id,", masterId, "本节点id:", localId) |
| | | if masterId == localId { |
| | | return |
| | | } |
| | | |
| | | var clusterDb models.Node |
| | | localNode, err := clusterDb.FindNodeById(localId) |
| | | if err == nil { |
| | | if localNode.DriftState == "master" { |
| | | logger.Info("本节点为之前的master,通知变更为slave") |
| | | |
| | | // 通知主节点变更 |
| | | chMsg := protomsg.DbChangeMessage{ |
| | | Id: localNode.ClusterId, |
| | | Table: protomsg.TableChanged_T_Cluster, |
| | | Action: protomsg.DbAction_Insert, |
| | | Info: "master2slave", |
| | | } |
| | | |
| | | bts, _ := json.Marshal(chMsg) |
| | | h := GetBusHandle() |
| | | if h == nil { |
| | | logger.Error("HandleEventMemberLeave bus handle is nil") |
| | | return |
| | | } |
| | | err := h.Publish("system-service", bts) |
| | | if err != nil { |
| | | logger.Error("HandleEventMemberLeave pub master err:", err) |
| | | } |
| | | } |
| | | } |
| | | |
| | | clusterDb.UpdateDriftStateByNodeId("master", masterId, false) |
| | | } |
| | | |
| | | func HandleUserEventSyncMessage(ev serf.UserEvent) { |
| | | logger.Info("receive a UserEventSyncMessage event") |
| | | var procMsg ProcMessageEvent |
| | |
// HandleEventMemberLeave handles a serf member event and only acts when
// ev.Members holds exactly one member. It marks that member's cluster_node row
// as deleted (isDelete=1). If the departed node's drift state was "master" and
// the node returned by FindFirstNode is the local member, it sets that node's
// drift state to "master" and publishes a "slave2master" DbChangeMessage on the
// "system-service" topic.
//
// NOTE(review): the text of this function does not form one consistent body.
// It appears to contain two revisions of the master-failover logic interleaved,
// and after the gap further down it continues with code that refers to
// joinMember (presumably the tail of a member-join handler). Verify against the
// real file before relying on anything below.
| | | func HandleEventMemberLeave(ev serf.MemberEvent) { |
| | | if ev.Members != nil && len(ev.Members) == 1 { |
| | | leaveMember := ev.Members[0] |
| | | logger.Info("Event Member Leave, Members:", ev.Members) |
| | | |
| | | leaveSql := "update cluster_node set isDelete=1 where node_id='" + leaveMember.Name + "'" |
| | | flag, e := executeSqlByGorm([]string{leaveSql}) |
| | | |
| | | // Check whether the node that left was the master and, if so, switch the master |
| | | var clusterDb models.Node |
// NOTE(review): the err returned by FindNodes is overwritten by the next call
// before it is ever checked.
| | | clusterNodes, err := clusterDb.FindNodes() |
| | | leaveNode, err := clusterDb.FindNodeById(leaveMember.Name) |
| | | if err == nil { |
// NOTE(review): the loop and the two ifs opened below are never closed, and
// UpdateDriftStateByNodeId is called here with two arguments while the later
// call passes three. This looks like an older revision of the logic that
// follows; confirm.
| | | for _, node := range clusterNodes { |
| | | //logger.Info(node.Id, node.DriftState, leaveMember.Name) |
| | | if node.NodeId == leaveMember.Name && node.DriftState == "master" { |
| | | firstNode, _ := clusterDb.FindFirstNode() |
| | | //logger.Info("firstNode:", firstNode.NodeId, "local:", Agent.LocalMember().Name) |
| | | if firstNode.NodeId == Agent.LocalMember().Name { |
| | | //logger.Info("update master") |
| | | clusterDb.UpdateDriftStateByNodeId("master", firstNode.NodeId) |
| | | logger.Info("查询离开节点的信息,", leaveNode) |
| | | if leaveNode.DriftState == "master" { |
// The error from FindFirstNode is discarded; firstNode is used as returned.
| | | firstNode, _ := clusterDb.FindFirstNode() |
| | | logger.Info("离开的节点为主节点, 查询加入最早的节点, ", firstNode) |
| | | logger.Info("本节点信息:", Agent.LocalMember().Name) |
| | | if firstNode.NodeId == Agent.LocalMember().Name { |
| | | logger.Info("更新本节点为主节点") |
| | | clusterDb.UpdateDriftStateByNodeId("master", firstNode.NodeId, true) |
| | | |
| | | // Notify about the master change |
| | | chMsg := protomsg.DbChangeMessage{ |
| | | Id: firstNode.ClusterId, |
| | | Table: protomsg.TableChanged_T_Cluster, |
| | | Action: protomsg.DbAction_Insert, |
| | | Info: "slave2master", |
| | | } |
// NOTE(review): chMsg is declared a second time in the same scope with
// identical contents.
| | | // Notify about the master change |
| | | chMsg := protomsg.DbChangeMessage{ |
| | | Id: firstNode.ClusterId, |
| | | Table: protomsg.TableChanged_T_Cluster, |
| | | Action: protomsg.DbAction_Insert, |
| | | Info: "slave2master", |
| | | } |
| | | |
// Serialize the message (marshal error discarded) and publish it; a nil bus
// handle makes the function return early, before the sync-history code below.
| | | bts, _ := json.Marshal(chMsg) |
| | | h := GetBusHandle() |
| | | if h == nil { |
| | | logger.Error("HandleEventMemberLeave bus handle is nil") |
| | | return |
| | | } |
| | | err := h.Publish("system-service", bts) |
| | | if err != nil { |
| | | logger.Error("HandleEventMemberLeave pub master err:", err) |
| | | } |
// NOTE(review): the marshal/publish sequence is repeated verbatim, redeclaring
// bts, h and err in the same scope.
| | | bts, _ := json.Marshal(chMsg) |
| | | h := GetBusHandle() |
| | | if h == nil { |
| | | logger.Error("HandleEventMemberLeave bus handle is nil") |
| | | return |
| | | } |
| | | err := h.Publish("system-service", bts) |
| | | if err != nil { |
| | | logger.Error("HandleEventMemberLeave pub master err:", err) |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
// Prepare the fields of a sql_sync_his record for the leave statement; single
// quotes in the SQL text are doubled so it can be embedded in a string literal.
| | | logger.Info("EventMemberLeave,current Members:", ev.Members) |
| | | logLT := "" |
| | | logT := time.Now().Format("2006-01-02 15:04:05") |
| | | logSql := strings.ReplaceAll(leaveSql, "'", "''") |
// NOTE(review): lines appear to be missing at this point. Everything below
// uses joinMember, which is not declared in the visible text, and redeclares
// flag, e, logLT, logT and logSql.
| | |
| | | timeUnix := time.Now().Unix() |
| | | fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05") |
| | | |
| | | //joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'" |
| | | // Update the online status |
| | | updateOnlineSql := fmt.Sprintf("update cluster_node set online=1 where node_id='%s'", joinMember.Name) |
| | | |
| | | // If the node is newly joined, update its creation time |
| | | joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='slave',node_ip='%s',create_time='%s' where node_id='%s' and isDelete=1", joinMember.Addr.String()+":"+strconv.Itoa(int(joinMember.Port)), fmtTimeStr, joinMember.Name) |
| | | flag, e := executeSqlByGorm([]string{updateOnlineSql, joinSql}) |
| | | |
| | | logger.Info("EventMemberJoin,current Members:", ev.Members) |
| | | logLT := "" |
| | | logT := time.Now().Format("2006-01-02 15:04:05") |
| | | logSql := strings.ReplaceAll(joinSql, "'", "''") |
// result column: "1" when executeSqlByGorm reported success, otherwise "0".
| | | logResult := "0" |
| | | if flag { |
| | | logResult = "1" |
| | | } |
| | | logErr := "" |
| | | if e != nil { |
| | | logErr = e.Error() |
| | | } |
// Record the executed statement and its outcome in sql_sync_his; the result of
// this insert is not checked.
| | | executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + joinMember.Name + "'," + logResult + ",'" + logErr + "')"}) |
| | | } |
| | | } |
| | | |
// HandleEventMemberFail handles a serf member event (by its name, a member
// failure) and only acts when ev.Members holds exactly one member. It sets
// online=0 on that member's cluster_node row and then inserts a record of the
// statement into sql_sync_his.
//
// NOTE(review): the locals are named joinMember/joinSql and the log text says
// "EventMemberJoin" although this is the fail handler — looks copied from the
// join handler; confirm and rename.
| | | func HandleEventMemberFail(ev serf.MemberEvent) { |
| | | if ev.Members != nil && len(ev.Members) == 1 { |
| | | joinMember := ev.Members[0] |
| | | |
| | | //joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'" |
| | | joinSql := fmt.Sprintf("update cluster_node set online=0 where node_id='%s'", joinMember.Name) |
| | | flag, e := executeSqlByGorm([]string{joinSql}) |
| | | |
| | | logger.Info("EventMemberJoin,current Members:", ev.Members) |
// NOTE(review): lines appear to be missing here. logLT, logT, logSql,
// logResult and logErr are used below but not declared in the visible text,
// and flag and e above are not used anywhere visible.
| | |
// Record the executed statement and its outcome in sql_sync_his; the result of
// this insert is not checked.
| | | executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + joinMember.Name + "'," + logResult + ",'" + logErr + "')"}) |
| | | } |
| | | } |
| | | |
| | | func HandleUpdateMemberStatus() { |
| | | if Agent == nil { |
| | | return |
| | | } |
| | | |
| | | for _, member := range Agent.Serf().Members() { |
| | | alive := 0 |
| | | if member.Status == serf.StatusAlive { |
| | | alive = 1 |
| | | } |
| | | |
| | | sql := fmt.Sprintf("update cluster_node set online=%d where node_id='%s'", alive, member.Name) |
| | | _, err := executeSqlByGorm([]string{sql}) |
| | | if err != nil { |
| | | logger.Error(err) |
| | | } |
| | | } |
| | | } |