| | |
| | | import ( |
| | | "context" |
| | | "encoding/json" |
| | | "fmt" |
| | | "strconv" |
| | | sysSync "sync" |
| | | "time" |
| | |
| | | arr, err := clusterE.FindAll() |
| | | if err == nil && (arr == nil || len(arr) == 0) { |
| | | |
| | | b := clusterE.Create() |
| | | if b { |
| | | err = clusterE.Create() |
| | | if err == nil { |
| | | serf.InitAgent(context.Background()) |
| | | |
| | | chMsg := protomsg.DbChangeMessage{ |
| | | Id: clusterE.ClusterId, |
| | | Table: protomsg.TableChanged_T_Cluster, |
| | | Action: protomsg.DbAction_Insert, |
| | | Info: "", |
| | | Info: "create", |
| | | } |
| | | s.AddDbMessage(&chMsg) |
| | | |
| | | return true, clusterId |
| | | } else { |
| | | logger.Error("初始化集群数据库信息失败. ", err.Error()) |
| | | } |
| | | } else { |
| | | if s.UpdateClusterName(clusterName, virtualIp) { |
| | |
| | | } |
| | | |
| | | return false, "" |
| | | } |
| | | |
| | | // 根据集群名称和密码创建集群 |
| | | func (s ClusterService) UpdateDriftStateByNodeId(clusterId, nodeId, role string) (bool, string) { |
| | | var node models.Node |
| | | isSuccess := node.UpdateDriftStateByNodeId(role, nodeId, false) |
| | | |
| | | if isSuccess { |
| | | // 通知主节点变更 |
| | | chMsg := protomsg.DbChangeMessage{ |
| | | Id: clusterId, |
| | | Table: protomsg.TableChanged_T_Cluster, |
| | | Action: protomsg.DbAction_Insert, |
| | | Info: "slave2master", |
| | | } |
| | | |
| | | s.AddDbMessage(&chMsg) |
| | | |
| | | err := serf.Agent.UserEvent(serf.UserEventChangeMaster, []byte(nodeId), false) |
| | | if err != nil { |
| | | logger.Error("UserEventSyncVirtualIp err:", err) |
| | | } |
| | | } |
| | | |
| | | return isSuccess, "" |
| | | } |
| | | |
| | | func (s ClusterService) SearchByPwd(pwd string) (err error) { |
| | |
| | | logger.Debug("AddCluster JoinCluster len(joinIps)=0") |
| | | return false, errors.New("加入的目标ip不能为空") |
| | | } |
| | | |
| | | logger.Debug("AddCluster joinIps:", joinIps) |
| | | joinArg.Password = config.ClusterSet.PwdPre + joinArg.Password |
| | | |
| | |
| | | agent, err := dbSync.Init(joinArg.ClusterId, joinArg.Password, config.Server.AnalyServerId, joinIps, config.ClusterSet.SerfSnapShotPath, conf) |
| | | if err == nil && agent != nil { //加入成功 |
| | | logger.Debug("AddCluster dbSync.Init success") |
| | | agent.RegisterHandleEventFunc(serf.HandleSerfEvent) |
| | | serf.Agent = agent |
| | | |
| | | t := time.Now() |
| | | syncTableDataFromCluster(joinArg) |
| | | syncClusterNodes := syncTableDataFromCluster(joinArg) |
| | | logger.Debugf("AddCluster time=%v", time.Since(t)) |
| | | //if syncB { |
| | | // //需要重新初始化本地比对进程 |
| | | // go serf.ReInitDbPersonCompareData() |
| | | // chMsg := protomsg.DbChangeMessage{ |
| | | // Id: joinArg.ClusterId, |
| | | // Table: protomsg.TableChanged_T_Cluster, |
| | | // Action: protomsg.DbAction_Insert, |
| | | // Info: "", |
| | | // } |
| | | // |
| | | // s.AddDbMessage(&chMsg) |
| | | logger.Debugf("AddCluster 加入集群成功 time=%v", time.Since(start)) |
| | | return true, nil |
| | | //} else { |
| | | // logger.Debug("AddCluster syncTableDataFromCluster fail") |
| | | // if agent != nil { |
| | | // agent.Leave() |
| | | // err = agent.Shutdown() |
| | | // if err != nil { |
| | | // logger.Debug("AddCluster agent shutdown err:", err) |
| | | // } |
| | | // } |
| | | //} |
| | | if syncClusterNodes { |
| | | chMsg := protomsg.DbChangeMessage{ |
| | | Id: joinArg.ClusterId, |
| | | Table: protomsg.TableChanged_T_Cluster, |
| | | Action: protomsg.DbAction_Insert, |
| | | Info: "join", |
| | | } |
| | | |
| | | s.AddDbMessage(&chMsg) |
| | | logger.Debugf("AddCluster 加入集群成功 time=%v", time.Since(start)) |
| | | return true, nil |
| | | } else { |
| | | logger.Debug("AddCluster syncTableDataFromCluster fail") |
| | | agent.Leave() |
| | | err = agent.Shutdown() |
| | | if err != nil { |
| | | logger.Debug("AddCluster agent shutdown err:", err) |
| | | } |
| | | |
| | | return false, errors.New("加入集群失败") |
| | | } |
| | | } else { |
| | | logger.Debug("AddCluster dbSync.Init err:", err) |
| | | if agent != nil { |
| | |
| | | |
| | | } |
| | | } |
| | | |
| | | logger.Debugf("AddCluster 加入集群失败, targetIp=%v, time=%v", targetIp, time.Since(start)) |
| | | |
| | | return false, errors.New("加入集群失败") |
| | | } |
| | | |
| | |
| | | Id: "", |
| | | Table: protomsg.TableChanged_T_Cluster, |
| | | Action: protomsg.DbAction_Delete, |
| | | Info: "", |
| | | Info: "leave", |
| | | } |
| | | logger.Debugf("Leave delete db time=%v", time.Since(t)) |
| | | tm := time.Now() |
| | |
| | | } |
| | | |
| | | logger.Debugf("Leave success time=%v", time.Since(start)) |
| | | |
| | | return true, nil |
| | | } |
| | | |
| | |
| | | db := models.GetDB() |
| | | db.LogMode(false) |
| | | defer db.LogMode(true) |
| | | |
| | | tx := db.Begin() |
| | | defer func() { |
| | | if err != nil && tx != nil { |
| | |
| | | tx.Exec("PRAGMA foreign_keys=OFF") |
| | | //1.删除本地的同步库数据 |
| | | for _, t := range serf.SyncTables { |
| | | delSql := "" |
| | | if t == "dbtables" { |
| | | delSql = "delete from " + t + " where (analyServerId='' or analyServerId=NULL)" |
| | | } else if t == "dbtablepersons" { |
| | | delSql = "delete from " + t + " where tableId in (select id from dbtables where (analyServerId='' or analyServerId=NULL))" |
| | | } else { |
| | | delSql = "delete from " + t + "" |
| | | } |
| | | delSql := "delete from " + t + "" |
| | | |
| | | err = tx.Exec(delSql).Error |
| | | if err != nil { |
| | | logger.Error("删除本地的同步库数据失败,", err.Error()) |
| | | logger.Error("sql:", delSql) |
| | | return false |
| | | } |
| | | } |
| | |
| | | for _, sqlStr := range *dumpSqls { |
| | | logger.Debug("gorm exec dumpSql:", sqlStr) |
| | | if err = tx.Exec(sqlStr).Error; err != nil { |
| | | logger.Error("gorm exec dumpSql:", sqlStr, " error:", err.Error()) |
| | | return false |
| | | } |
| | | } |
| | |
| | | serverIp, _, e1 := util.GetLocalIP(config.Server.NetworkAdapter) |
| | | if e1 != nil || serverIp == "" { |
| | | err = errors.New("get serverIp err") |
| | | |
| | | logger.Error("get serverIp err") |
| | | return false |
| | | } |
| | | |
| | | logger.Debug("cur Node serverIp:", serverIp, "nodeName:", nodeName) |
| | | if nodeName == "" { |
| | | nodeName = serverIp |
| | |
| | | logger.Debug("add cur node err:", err) |
| | | return false |
| | | } |
| | | if err = tx.Exec("update cluster_node set isDelete=0 where id='" + config.Server.AnalyServerId + "'").Error; err != nil { |
| | | |
| | | joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='slave',node_ip='%s',create_time='%s' where node_id='%s'", serverIp+":"+strconv.Itoa(dbSync.DefaultBindPort), fmtTimeStr, config.Server.AnalyServerId) |
| | | if err = tx.Exec(joinSql).Error; err != nil { |
| | | logger.Debug("update isDelete err:", err) |
| | | return false |
| | | } |
| | |
| | | tx.Exec("PRAGMA foreign_keys=ON") |
| | | tx.Commit() |
| | | serf.SyncSql([]string{sql}) |
| | | |
| | | return true |
| | | } |
| | | |
| | |
| | | var lc models.Node |
| | | return lc.FindIpByNode(nodeId) |
| | | } |
| | | |
| | | func ClusterSyncProcMessage(payload []byte) { |
| | | if serf.Agent == nil { |
| | | logger.Error("未加入集群") |
| | | return |
| | | } |
| | | |
| | | err := serf.Agent.UserEvent(serf.UserEventSyncMessage, payload, false) |
| | | if err != nil { |
| | | logger.Error("UserEventSyncMessage err:", err) |
| | | } |
| | | } |