| | |
| | | "password": arr[0].Password, |
| | | "nodes": nodes, |
| | | "virtualIp": arr[0].VirtualIp, |
| | | "localId": config.Server.AnalyServerId, |
| | | }} |
| | | } else { |
| | | return &bhomeclient.Reply{Success: true} |
| | |
| | | } |
| | | } |
| | | |
| | | func (cc ClusterController) Update2Master(h *bhomeclient.WrapperHandler, c *bhomeclient.Request) *bhomeclient.Reply { |
| | | var nodeVo vo.UpdateClusterVo |
| | | err := c.BindJSON(&nodeVo) |
| | | if err != nil || nodeVo.NodeId == "" { |
| | | return &bhomeclient.Reply{Success: false, Msg: "参数有误"} |
| | | } |
| | | |
| | | sv := service.NewClusterService(h.Bk) |
| | | b, _ := sv.UpdateDriftStateByNodeId(nodeVo.ClusterId, nodeVo.NodeId, "master") |
| | | if b { |
| | | return &bhomeclient.Reply{Success: true, Data: nil} |
| | | } else { |
| | | return &bhomeclient.Reply{Success: false, Msg: "变更失败"} |
| | | } |
| | | } |
| | | |
| | | // @Summary 搜索集群 |
| | | // @Description 搜索集群 |
| | | // @Accept json |
| | |
| | | funcMap[urlPrefix+"/cluster/leave"] = clusterController.Leave |
| | | funcMap[urlPrefix+"/cluster/findIpByNode"] = clusterController.FindIpByNode |
| | | funcMap[urlPrefix+"/cluster/status"] = clusterController.GetClusterStat |
| | | funcMap[urlPrefix+"/cluster/update2Master"] = clusterController.Update2Master |
| | | |
| | | sysMenuC := new(controllers.SysMenuController) |
| | | funcMap["/data/api-u/sysmenus/tree"] = sysMenuC.MenuTree |
| | |
| | | case <-ctx.Done(): |
| | | return |
| | | case msg := <-ms.SubCh: |
| | | logger.Debug("recv sub msg topic:", string(msg.Topic), " data:", string(msg.Data)) |
| | | logger.Debug("recv sub msg topic:", string(msg.Topic), " data len:", len(msg.Data)) |
| | | service.PersistentWrapper(string(msg.Topic), msg.Data) |
| | | } |
| | | } |
| | |
| | | IsDelete bool `gorm:"column:isDelete;default:0" json:"isDelete"` |
| | | DeviceType string `gorm:"column:device_type" json:"device_type"` //设备型号` |
| | | DriftState string `gorm:"column:drift_state" json:"drift_state"` //漂移状态, master,backup |
| | | Online bool `gorm:"column:online;default:1" json:"online"` //在线状态 |
| | | } |
| | | |
| | | func (Node) TableName() string { |
| | |
| | | return node, nil |
| | | } |
| | | |
| | | func (n *Node) UpdateDriftStateByNodeId(driftState string, nodeId string) bool { |
| | | func (n *Node) FindNodeById(id string) (node Node, err error) { |
| | | if err = db.Raw("select * from cluster_node where id=?", id).Scan(&node).Error; err != nil { |
| | | return node, err |
| | | } |
| | | return node, nil |
| | | } |
| | | |
| | | func (n *Node) UpdateDriftStateByNodeId(driftState string, nodeId string, sync bool) bool { |
| | | var err error |
| | | tx := GetDB().Begin() |
| | | if !sync { |
| | | GetDB().LogMode(false) |
| | | defer db.LogMode(true) |
| | | } |
| | | |
| | | defer func() { |
| | | if err != nil && tx != nil { |
| | | logger.Error("updateDriftState err:", err) |
| | |
| | | logger.Info("LTime:", ev.LTime, " Recevie virtualIp change") |
| | | SyncVirtualIpChan <- ev.Payload |
| | | } |
| | | func HandleUserEventChangeMaster(ev serf.UserEvent) { |
| | | masterId := string(ev.Payload) |
| | | localId := Agent.LocalMember().Name |
| | | logger.Info("变为主节点的id,", masterId, "本节点id:", localId) |
| | | if masterId == localId { |
| | | return |
| | | } |
| | | |
| | | var clusterDb models.Node |
| | | localNode, err := clusterDb.FindNodeById(localId) |
| | | if err == nil { |
| | | if localNode.DriftState == "master" { |
| | | logger.Info("本节点为之前的master,通知变更为slave") |
| | | |
| | | // 通知主节点变更 |
| | | chMsg := protomsg.DbChangeMessage{ |
| | | Id: localNode.ClusterId, |
| | | Table: protomsg.TableChanged_T_Cluster, |
| | | Action: protomsg.DbAction_Insert, |
| | | Info: "master2slave", |
| | | } |
| | | |
| | | bts, _ := json.Marshal(chMsg) |
| | | h := GetBusHandle() |
| | | if h == nil { |
| | | logger.Error("HandleEventMemberLeave bus handle is nil") |
| | | return |
| | | } |
| | | err := h.Publish("system-service", bts) |
| | | if err != nil { |
| | | logger.Error("HandleEventMemberLeave pub master err:", err) |
| | | } |
| | | } |
| | | } |
| | | |
| | | clusterDb.UpdateDriftStateByNodeId("master", masterId, false) |
| | | } |
| | | |
| | | func HandleUserEventSyncMessage(ev serf.UserEvent) { |
| | | logger.Info("receive a UserEventSyncMessage event") |
| | | var procMsg ProcMessageEvent |
| | |
| | | func HandleEventMemberLeave(ev serf.MemberEvent) { |
| | | if ev.Members != nil && len(ev.Members) == 1 { |
| | | leaveMember := ev.Members[0] |
| | | logger.Info("Event Member Leave, Members:", ev.Members) |
| | | |
| | | leaveSql := "update cluster_node set isDelete=1 where node_id='" + leaveMember.Name + "'" |
| | | flag, e := executeSqlByGorm([]string{leaveSql}) |
| | | |
| | | // 判断离开是否是主节点离开, 更换主节点 |
| | | var clusterDb models.Node |
| | | clusterNodes, err := clusterDb.FindNodes() |
| | | leaveNode, err := clusterDb.FindNodeById(leaveMember.Name) |
| | | if err == nil { |
| | | for _, node := range clusterNodes { |
| | | //logger.Info(node.Id, node.DriftState, leaveMember.Name) |
| | | if node.NodeId == leaveMember.Name && node.DriftState == "master" { |
| | | firstNode, _ := clusterDb.FindFirstNode() |
| | | //logger.Info("firstNode:", firstNode.NodeId, "local:", Agent.LocalMember().Name) |
| | | if firstNode.NodeId == Agent.LocalMember().Name { |
| | | //logger.Info("update master") |
| | | clusterDb.UpdateDriftStateByNodeId("master", firstNode.NodeId) |
| | | logger.Info("查询离开节点的信息,", leaveNode) |
| | | if leaveNode.DriftState == "master" { |
| | | firstNode, _ := clusterDb.FindFirstNode() |
| | | logger.Info("离开的节点为主节点, 查询加入最早的节点, ", firstNode) |
| | | logger.Info("本节点信息:", Agent.LocalMember().Name) |
| | | if firstNode.NodeId == Agent.LocalMember().Name { |
| | | logger.Info("更新本节点为主节点") |
| | | clusterDb.UpdateDriftStateByNodeId("master", firstNode.NodeId, true) |
| | | |
| | | // 通知主节点变更 |
| | | chMsg := protomsg.DbChangeMessage{ |
| | | Id: firstNode.ClusterId, |
| | | Table: protomsg.TableChanged_T_Cluster, |
| | | Action: protomsg.DbAction_Insert, |
| | | Info: "slave2master", |
| | | } |
| | | // 通知主节点变更 |
| | | chMsg := protomsg.DbChangeMessage{ |
| | | Id: firstNode.ClusterId, |
| | | Table: protomsg.TableChanged_T_Cluster, |
| | | Action: protomsg.DbAction_Insert, |
| | | Info: "slave2master", |
| | | } |
| | | |
| | | bts, _ := json.Marshal(chMsg) |
| | | h := GetBusHandle() |
| | | if h == nil { |
| | | logger.Error("HandleEventMemberLeave bus handle is nil") |
| | | return |
| | | } |
| | | err := h.Publish("system-service", bts) |
| | | if err != nil { |
| | | logger.Error("HandleEventMemberLeave pub master err:", err) |
| | | } |
| | | bts, _ := json.Marshal(chMsg) |
| | | h := GetBusHandle() |
| | | if h == nil { |
| | | logger.Error("HandleEventMemberLeave bus handle is nil") |
| | | return |
| | | } |
| | | err := h.Publish("system-service", bts) |
| | | if err != nil { |
| | | logger.Error("HandleEventMemberLeave pub master err:", err) |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | logger.Info("EventMemberLeave,current Members:", ev.Members) |
| | | logLT := "" |
| | | logT := time.Now().Format("2006-01-02 15:04:05") |
| | | logSql := strings.ReplaceAll(leaveSql, "'", "''") |
| | |
| | | timeUnix := time.Now().Unix() |
| | | fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05") |
| | | |
| | | //joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'" |
| | | // 更新在线状态 |
| | | updateOnlineSql := fmt.Sprintf("update cluster_node set online=1 where node_id='%s'", joinMember.Name) |
| | | |
| | | // 如果是新加入的更新创建时间 |
| | | joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='slave',node_ip='%s',create_time='%s' where node_id='%s' and isDelete=1", joinMember.Addr.String()+":"+strconv.Itoa(int(joinMember.Port)), fmtTimeStr, joinMember.Name) |
| | | flag, e := executeSqlByGorm([]string{updateOnlineSql, joinSql}) |
| | | |
| | | logger.Info("EventMemberJoin,current Members:", ev.Members) |
| | | logLT := "" |
| | | logT := time.Now().Format("2006-01-02 15:04:05") |
| | | logSql := strings.ReplaceAll(joinSql, "'", "''") |
| | | logResult := "0" |
| | | if flag { |
| | | logResult = "1" |
| | | } |
| | | logErr := "" |
| | | if e != nil { |
| | | logErr = e.Error() |
| | | } |
| | | executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + joinMember.Name + "'," + logResult + ",'" + logErr + "')"}) |
| | | } |
| | | } |
| | | |
| | | func HandleEventMemberFail(ev serf.MemberEvent) { |
| | | if ev.Members != nil && len(ev.Members) == 1 { |
| | | joinMember := ev.Members[0] |
| | | |
| | | //joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'" |
| | | joinSql := fmt.Sprintf("update cluster_node set online=0 where node_id='%s'", joinMember.Name) |
| | | flag, e := executeSqlByGorm([]string{joinSql}) |
| | | |
| | | logger.Info("EventMemberJoin,current Members:", ev.Members) |
| | |
| | | executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + joinMember.Name + "'," + logResult + ",'" + logErr + "')"}) |
| | | } |
| | | } |
| | | |
| | | func HandleUpdateMemberStatus() { |
| | | if Agent == nil { |
| | | return |
| | | } |
| | | |
| | | for _, member := range Agent.Serf().Members() { |
| | | alive := 0 |
| | | if member.Status == serf.StatusAlive { |
| | | alive = 1 |
| | | } |
| | | |
| | | sql := fmt.Sprintf("update cluster_node set online=%d where node_id='%s'", alive, member.Name) |
| | | _, err := executeSqlByGorm([]string{sql}) |
| | | if err != nil { |
| | | logger.Error(err) |
| | | } |
| | | } |
| | | } |
| | |
| | | UserEventSyncSql = "SyncSql" |
| | | UserEventSyncDbTablePersonCache = "SyncCache" |
| | | UserEventSyncVirtualIp = "SyncVirtualIp" //漂移ip修改 |
| | | UserEventChangeMaster = "ChangeMaster " //修改节点状态 |
| | | UserEventSyncRegisterInfo = "SyncRegisterInfo" //同步注册信息 |
| | | DataSystemSerfSubscribe = "data-system-serf-subscribe" //各app从serf订阅消息 |
| | | UserEventSyncMessage = "SyncMessageForProc" // 为其他进程同步消息 |
| | |
| | | HandleUserEventSyncDbTablePersonCache(ev) |
| | | } else if ev.Name == UserEventSyncVirtualIp { |
| | | HandleUserEventSyncVirtualIp(ev) |
| | | } else if ev.Name == UserEventChangeMaster { |
| | | HandleUserEventChangeMaster(ev) |
| | | } else if ev.Name == UserEventSyncRegisterInfo { |
| | | HandleSyncRegisterInfo(ev) |
| | | } else if ev.Name == DataSystemSerfSubscribe { |
| | |
| | | case serf.MemberEvent: |
| | | if event.EventType() == serf.EventMemberLeave { |
| | | HandleEventMemberLeave(ev) |
| | | } else if event.EventType() == serf.EventMemberJoin { |
| | | } else if event.EventType() == serf.EventMemberJoin || event.EventType() == serf.EventMemberUpdate { |
| | | HandleEventMemberJoin(ev) |
| | | } else if event.EventType() == serf.EventMemberFailed { |
| | | HandleEventMemberFail(ev) |
| | | } |
| | | logger.Error("serf MemberEvent ", event.EventType()) |
| | | default: |
| | |
| | | logger.Error("ExecuteSqlByGorm err:", err, ",sql:", sql) |
| | | return false, err |
| | | } |
| | | if result.RowsAffected == 0 { |
| | | logger.Debug("ExecuteSqlByGorm RowsAffected == 0", ",sql:", sql) |
| | | err = errors.New("ExecuteSqlByGorm RowsAffected == 0") |
| | | return false, err |
| | | } |
| | | //if result.RowsAffected == 0 { |
| | | // logger.Debug("ExecuteSqlByGorm RowsAffected == 0", ",sql:", sql) |
| | | // err = errors.New("ExecuteSqlByGorm RowsAffected == 0") |
| | | // return false, err |
| | | //} |
| | | } |
| | | tx.Commit() |
| | | return true, nil |
| | |
| | | } |
| | | } |
| | | } |
| | | |
| | | go func() { |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | return |
| | | default: |
| | | HandleUpdateMemberStatus() |
| | | time.Sleep(5 * time.Second) |
| | | } |
| | | } |
| | | }() |
| | | |
| | | go func() { |
| | | for { |
| | | select { |
| | |
| | | return false, "" |
| | | } |
| | | |
| | | // 根据集群名称和密码创建集群 |
| | | func (s ClusterService) UpdateDriftStateByNodeId(clusterId, nodeId, role string) (bool, string) { |
| | | var node models.Node |
| | | isSuccess := node.UpdateDriftStateByNodeId(role, nodeId, false) |
| | | |
| | | if isSuccess { |
| | | // 通知主节点变更 |
| | | chMsg := protomsg.DbChangeMessage{ |
| | | Id: clusterId, |
| | | Table: protomsg.TableChanged_T_Cluster, |
| | | Action: protomsg.DbAction_Insert, |
| | | Info: "slave2master", |
| | | } |
| | | |
| | | s.AddDbMessage(&chMsg) |
| | | |
| | | err := serf.Agent.UserEvent(serf.UserEventChangeMaster, []byte(nodeId), false) |
| | | if err != nil { |
| | | logger.Error("UserEventSyncVirtualIp err:", err) |
| | | } |
| | | } |
| | | |
| | | return isSuccess, "" |
| | | } |
| | | |
| | | func (s ClusterService) SearchByPwd(pwd string) (err error) { |
| | | _, isSearching := getFromSearchMap() |
| | | if isSearching { |
| | |
| | | func ClusterSyncProcMessage(payload []byte) { |
| | | if serf.Agent == nil { |
| | | logger.Error("未加入集群") |
| | | return |
| | | } |
| | | |
| | | err := serf.Agent.UserEvent(serf.UserEventSyncMessage, payload, false) |
| | |
| | | package vo |
| | | |
| | | import "vamicro/system-service/models" |
| | | |
| | | type ClusterVo struct { |
| | | ClusterInfo models.Cluster `json:"clusterInfo"` |
| | | Nodes []models.Node `json:"nodes"` |
| | | Nodes []models.Node `json:"nodes"` |
| | | } |
| | | |
// ClusterCreateVo is the JSON payload carrying a cluster's password, name, id
// and virtual IP. Each field is declared exactly once; a repeated field name
// in a struct is rejected by the compiler.
type ClusterCreateVo struct {
	Password    string `json:"password"`
	ClusterName string `json:"clusterName"`
	ClusterId   string `json:"clusterId"`
	VirtualIp   string `json:"virtualIp"`
}
| | | |
// ClusterSearchVo is the JSON payload of a cluster search; it carries only the
// password.
type ClusterSearchVo struct {
	// Password is read from the "password" JSON key.
	Password string `json:"password"`
}
| | | |
| | | type ClusterJoinVo struct { |
// UpdateClusterVo is the JSON payload of a cluster update request. The
// Update2Master handler reads ClusterId and NodeId from it.
type UpdateClusterVo struct {
	// ClusterId is read from the "clusterId" JSON key.
	ClusterId string `json:"clusterId"`
	// Password is read from the "password" JSON key.
	Password string `json:"password"`
	// NodeIps is read from the "nodeIps" JSON key.
	NodeIps []string `json:"nodeIps"`
	// NodeId is read from the "nodeId" JSON key; Update2Master rejects an
	// empty value.
	NodeId string `json:"nodeId"`
}
| | | |
// ClusterJoinVo is the JSON payload of a cluster join request.
type ClusterJoinVo struct {
	// ClusterId is read from the "clusterId" JSON key.
	ClusterId string `json:"clusterId"`
	// Password is read from the "password" JSON key.
	Password string `json:"password"`
	// NodeIps is read from the "nodeIps" JSON key.
	NodeIps []string `json:"nodeIps"`
}