| | |
| | | return m |
| | | } |
| | | |
| | | //根据集群名称和密码创建集群 |
| | | // 根据集群名称和密码创建集群 |
| | | func (s ClusterService) Create(clusterName string, pwd string, virtualIp string) (bool, string) { |
| | | devTyp := GetDevType(config.Server.DeviceType) |
| | | if devTyp != DevType_Storage && devTyp != DevType_Analysis_Storage { |
| | | return false, "只有分析和存储节点才能创建集群" |
| | | } |
| | | clusterId := uuid.NewV4().String() |
| | | pwd = config.ClusterSet.PwdPre + pwd |
| | | var clusterE = models.Cluster{ |
| | |
| | | b := clusterE.Create() |
| | | if b { |
| | | serf.InitAgent(context.Background()) |
| | | |
| | | //创建es集群 |
| | | if _, err := CreateOriginalCluster("/opt/elasticsearch/config", "/opt/vasystem/script", "/opt/vasystem/indexInit"); err != nil { |
| | | //创建失败,则serf集群也要退出 |
| | | s.Leave(false) |
| | | return false, err.Error() |
| | | } |
| | | |
| | | //创建weedfs集群 |
| | | if _, err = CreateWeedfsServer("/opt/vasystem/script"); err != nil { |
| | | s.Leave(false) |
| | | return false, err.Error() |
| | | } |
| | | |
| | | chMsg := protomsg.DbChangeMessage{ |
| | | Id: clusterE.ClusterId, |
| | |
| | | } |
| | | } |
| | | |
| | | //加入集群 |
| | | // 加入集群 |
| | | func (s ClusterService) JoinCluster(joinArg *vo.ClusterJoinVo) (bool, error) { |
| | | start := time.Now() |
| | | if config.Server.AnalyServerId == "" { |
| | |
| | | serf.Agent = agent |
| | | |
| | | t := time.Now() |
| | | syncB := syncTableDataFromCluster(joinArg) |
| | | logger.Debugf("AddCluster syncTableDataFromCluster time=%v, syncB=%v", time.Since(t), syncB) |
| | | if syncB { |
| | | devTyp := GetDevType(config.Server.DeviceType) |
| | | if devTyp == DevType_Analysis_Storage || devTyp == DevType_Storage { //只有能启动ES的节点才能加入ES |
| | | go func() { |
| | | //添加ES节点 |
| | | if _, err := AddCluster("/opt/elasticsearch/config", "/opt/vasystem/script", targetIp, "9200", config.EsConfig.StorePath); err != nil { |
| | | s.Leave(false) |
| | | return |
| | | } |
| | | |
| | | //添加weedfs节点 |
| | | if _, err := AddWeedfsServer("/opt/vasystem/script", targetIp, config.StorageConf.VolumePath); err != nil { |
| | | s.Leave(false) |
| | | return |
| | | } |
| | | }() |
| | | } |
| | | |
| | | //需要重新初始化本地比对进程 |
| | | go serf.ReInitDbPersonCompareData() |
| | | chMsg := protomsg.DbChangeMessage{ |
| | | Id: joinArg.ClusterId, |
| | | Table: protomsg.TableChanged_T_Cluster, |
| | | Action: protomsg.DbAction_Insert, |
| | | Info: "", |
| | | } |
| | | s.AddDbMessage(&chMsg) |
| | | logger.Debugf("AddCluster 加入集群成功 time=%v", time.Since(start)) |
| | | return true, nil |
| | | } else { |
| | | logger.Debug("AddCluster syncTableDataFromCluster fail") |
| | | if agent != nil { |
| | | agent.Leave() |
| | | err = agent.Shutdown() |
| | | if err != nil { |
| | | logger.Debug("AddCluster agent shutdown err:", err) |
| | | } |
| | | } |
| | | } |
| | | syncTableDataFromCluster(joinArg) |
| | | logger.Debugf("AddCluster time=%v", time.Since(t)) |
| | | //if syncB { |
| | | // //需要重新初始化本地比对进程 |
| | | // go serf.ReInitDbPersonCompareData() |
| | | // chMsg := protomsg.DbChangeMessage{ |
| | | // Id: joinArg.ClusterId, |
| | | // Table: protomsg.TableChanged_T_Cluster, |
| | | // Action: protomsg.DbAction_Insert, |
| | | // Info: "", |
| | | // } |
| | | // |
| | | // s.AddDbMessage(&chMsg) |
| | | logger.Debugf("AddCluster 加入集群成功 time=%v", time.Since(start)) |
| | | return true, nil |
| | | //} else { |
| | | // logger.Debug("AddCluster syncTableDataFromCluster fail") |
| | | // if agent != nil { |
| | | // agent.Leave() |
| | | // err = agent.Shutdown() |
| | | // if err != nil { |
| | | // logger.Debug("AddCluster agent shutdown err:", err) |
| | | // } |
| | | // } |
| | | //} |
| | | } else { |
| | | logger.Debug("AddCluster dbSync.Init err:", err) |
| | | if agent != nil { |
| | |
| | | DevType_Other = "other" //其他设备类型,eg:应用 |
| | | ) |
| | | |
| | | //获取本机类型,只进行分析、分析存储一体、只存储、其他。。 |
| | | // 获取本机类型,只进行分析、分析存储一体、只存储、其他。。 |
| | | func GetDevType(dt string) string { |
| | | if dt != "" { |
| | | if len(dt) >= 4 { |
| | |
| | | |
| | | func (s ClusterService) Leave(isDel bool) (bool, error) { |
| | | start := time.Now() |
| | | devType := GetDevType(config.Server.DeviceType) |
| | | logger.Debugf("Leave GetDevType=%v, isDel=%v", devType, isDel) |
| | | |
| | | if devType == DevType_Analysis_Storage || devType == DevType_Storage { //只有分析和存储才有ES集群 |
| | | go func() { |
| | | //退出weedfs节点 |
| | | if _, err := ExitWeedfsServer("/opt/vasystem/script", isDel, config.StorageConf.VolumePath); err != nil { |
| | | logger.Error("Leave ExitWeedfsServer err:", err) |
| | | return |
| | | } |
| | | //退出es节点 |
| | | if _, err := ExitCluster("/opt/elasticsearch/config", "/opt/vasystem/script"); err != nil { |
| | | logger.Error("Leave ExitCluster es cluster err:", err) |
| | | return |
| | | } |
| | | }() |
| | | } |
| | | |
| | | var err error |
| | | tx := models.GetDB().Begin() |
| | |
| | | return false |
| | | } |
| | | |
| | | //加入集群后清空本地的同步库数据 |
| | | // 加入集群后清空本地的同步库数据 |
| | | func syncTableDataFromCluster(joinArg *vo.ClusterJoinVo) bool { |
| | | var lc models.LocalConfig |
| | | e := lc.Select() |
| | |
| | | tx.Rollback() |
| | | } |
| | | }() |
| | | |
| | | //0.关闭reference |
| | | tx.Exec("PRAGMA foreign_keys=OFF") |
| | | //1.删除本地的同步库数据 |
| | |
| | | return false |
| | | } |
| | | } |
| | | |
| | | //2.拉取集群内的同步库数据到本地数据库表中 |
| | | var dumpSqls *[]string |
| | | dumpSqls, err = serf.GetTableDataFromCluster(serf.Agent, joinArg.ClusterId, serf.SyncTables, 20*time.Second) |