| | |
| | | import ( |
| | | "context" |
| | | "encoding/json" |
| | | "fmt" |
| | | "strconv" |
| | | sysSync "sync" |
| | | "time" |
| | |
| | | logger.Debug("AddCluster JoinCluster len(joinIps)=0") |
| | | return false, errors.New("加入的目标ip不能为空") |
| | | } |
| | | |
| | | logger.Debug("AddCluster joinIps:", joinIps) |
| | | joinArg.Password = config.ClusterSet.PwdPre + joinArg.Password |
| | | |
| | |
| | | serf.Agent = agent |
| | | |
| | | t := time.Now() |
| | | syncTableDataFromCluster(joinArg) |
| | | syncClusterNodes := syncTableDataFromCluster(joinArg) |
| | | logger.Debugf("AddCluster time=%v", time.Since(t)) |
| | | //if syncB { |
| | | chMsg := protomsg.DbChangeMessage{ |
| | | Id: joinArg.ClusterId, |
| | | Table: protomsg.TableChanged_T_Cluster, |
| | | Action: protomsg.DbAction_Insert, |
| | | Info: "join", |
| | | } |
| | | if syncClusterNodes { |
| | | chMsg := protomsg.DbChangeMessage{ |
| | | Id: joinArg.ClusterId, |
| | | Table: protomsg.TableChanged_T_Cluster, |
| | | Action: protomsg.DbAction_Insert, |
| | | Info: "join", |
| | | } |
| | | |
| | | s.AddDbMessage(&chMsg) |
| | | logger.Debugf("AddCluster 加入集群成功 time=%v", time.Since(start)) |
| | | return true, nil |
| | | //} else { |
| | | // logger.Debug("AddCluster syncTableDataFromCluster fail") |
| | | // if agent != nil { |
| | | // agent.Leave() |
| | | // err = agent.Shutdown() |
| | | // if err != nil { |
| | | // logger.Debug("AddCluster agent shutdown err:", err) |
| | | // } |
| | | // } |
| | | //} |
| | | s.AddDbMessage(&chMsg) |
| | | logger.Debugf("AddCluster 加入集群成功 time=%v", time.Since(start)) |
| | | return true, nil |
| | | } else { |
| | | logger.Debug("AddCluster syncTableDataFromCluster fail") |
| | | agent.Leave() |
| | | err = agent.Shutdown() |
| | | if err != nil { |
| | | logger.Debug("AddCluster agent shutdown err:", err) |
| | | } |
| | | |
| | | return false, errors.New("加入集群失败") |
| | | } |
| | | } else { |
| | | logger.Debug("AddCluster dbSync.Init err:", err) |
| | | if agent != nil { |
| | |
| | | |
| | | } |
| | | } |
| | | |
| | | logger.Debugf("AddCluster 加入集群失败, targetIp=%v, time=%v", targetIp, time.Since(start)) |
| | | |
| | | return false, errors.New("加入集群失败") |
| | | } |
| | | |
| | |
| | | dumpSqls, err = serf.GetTableDataFromCluster(serf.Agent, joinArg.ClusterId, serf.SyncTables, 20*time.Second) |
| | | if dumpSqls != nil && len(*dumpSqls) > 0 { |
| | | for _, sqlStr := range *dumpSqls { |
| | | //logger.Debug("gorm exec dumpSql:", sqlStr) |
| | | logger.Debug("gorm exec dumpSql:", sqlStr) |
| | | if err = tx.Exec(sqlStr).Error; err != nil { |
| | | logger.Error("gorm exec dumpSql:", sqlStr, " error:", err.Error()) |
| | | return false |
| | |
| | | serverIp, _, e1 := util.GetLocalIP(config.Server.NetworkAdapter) |
| | | if e1 != nil || serverIp == "" { |
| | | err = errors.New("get serverIp err") |
| | | |
| | | logger.Error("get serverIp err") |
| | | return false |
| | | } |
| | | |
| | | logger.Debug("cur Node serverIp:", serverIp, "nodeName:", nodeName) |
| | | if nodeName == "" { |
| | | nodeName = serverIp |
| | |
| | | logger.Debug("add cur node err:", err) |
| | | return false |
| | | } |
| | | if err = tx.Exec("update cluster_node set isDelete=0 where id='" + config.Server.AnalyServerId + "'").Error; err != nil { |
| | | |
| | | joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='',node_ip='%s',create_time='%s' where node_id='%s'", serverIp+":"+strconv.Itoa(dbSync.DefaultBindPort), fmtTimeStr, config.Server.AnalyServerId) |
| | | if err = tx.Exec(joinSql).Error; err != nil { |
| | | logger.Debug("update isDelete err:", err) |
| | | return false |
| | | } |
| | |
| | | tx.Exec("PRAGMA foreign_keys=ON") |
| | | tx.Commit() |
| | | serf.SyncSql([]string{sql}) |
| | | |
| | | return true |
| | | } |
| | | |