| | |
| | | import ( |
| | | "context" |
| | | "encoding/json" |
| | | "fmt" |
| | | "strconv" |
| | | sysSync "sync" |
| | | "time" |
| | |
| | | return m |
| | | } |
| | | |
| | | //根据集群名称和密码创建集群 |
| | | // 根据集群名称和密码创建集群 |
| | | func (s ClusterService) Create(clusterName string, pwd string, virtualIp string) (bool, string) { |
| | | devTyp := GetDevType(config.Server.DeviceType) |
| | | if devTyp != DevType_Storage && devTyp != DevType_Analysis_Storage { |
| | | return false, "只有分析和存储节点才能创建集群" |
| | | } |
| | | clusterId := uuid.NewV4().String() |
| | | pwd = config.ClusterSet.PwdPre + pwd |
| | | var clusterE = models.Cluster{ |
| | |
| | | arr, err := clusterE.FindAll() |
| | | if err == nil && (arr == nil || len(arr) == 0) { |
| | | |
| | | b := clusterE.Create() |
| | | if b { |
| | | err = clusterE.Create() |
| | | if err == nil { |
| | | serf.InitAgent(context.Background()) |
| | | |
| | | //创建es集群 |
| | | if _, err := CreateOriginalCluster("/opt/elasticsearch/config", "/opt/vasystem/script", "/opt/vasystem/indexInit"); err != nil { |
| | | //创建失败,则serf集群也要推出 |
| | | s.Leave(false) |
| | | return false, err.Error() |
| | | } |
| | | |
| | | //创建weedfs集群 |
| | | if _, err = CreateWeedfsServer("/opt/vasystem/script"); err != nil { |
| | | s.Leave(false) |
| | | return false, err.Error() |
| | | } |
| | | |
| | | chMsg := protomsg.DbChangeMessage{ |
| | | Id: clusterE.ClusterId, |
| | | Table: protomsg.TableChanged_T_Cluster, |
| | | Action: protomsg.DbAction_Insert, |
| | | Info: "", |
| | | Info: "create", |
| | | } |
| | | s.AddDbMessage(&chMsg) |
| | | |
| | | return true, clusterId |
| | | } else { |
| | | logger.Error("初始化集群数据库信息失败. ", err.Error()) |
| | | } |
| | | } else { |
| | | if s.UpdateClusterName(clusterName, virtualIp) { |
| | |
| | | } |
| | | |
| | | return false, "" |
| | | } |
| | | |
| | | // 根据集群名称和密码创建集群 |
| | | func (s ClusterService) UpdateDriftStateByNodeId(clusterId, nodeId, role string) (bool, string) { |
| | | var node models.Node |
| | | isSuccess := node.UpdateDriftStateByNodeId(role, nodeId, false) |
| | | |
| | | if isSuccess { |
| | | // 通知主节点变更 |
| | | chMsg := protomsg.DbChangeMessage{ |
| | | Id: clusterId, |
| | | Table: protomsg.TableChanged_T_Cluster, |
| | | Action: protomsg.DbAction_Insert, |
| | | Info: "slave2master", |
| | | } |
| | | |
| | | s.AddDbMessage(&chMsg) |
| | | |
| | | err := serf.Agent.UserEvent(serf.UserEventChangeMaster, []byte(nodeId), false) |
| | | if err != nil { |
| | | logger.Error("UserEventSyncVirtualIp err:", err) |
| | | } |
| | | } |
| | | |
| | | return isSuccess, "" |
| | | } |
| | | |
| | | func (s ClusterService) SearchByPwd(pwd string) (err error) { |
| | |
| | | } |
| | | } |
| | | |
| | | //加入集群 |
| | | // 加入集群 |
| | | func (s ClusterService) JoinCluster(joinArg *vo.ClusterJoinVo) (bool, error) { |
| | | start := time.Now() |
| | | if config.Server.AnalyServerId == "" { |
| | |
| | | logger.Debug("AddCluster JoinCluster len(joinIps)=0") |
| | | return false, errors.New("加入的目标ip不能为空") |
| | | } |
| | | |
| | | logger.Debug("AddCluster joinIps:", joinIps) |
| | | joinArg.Password = config.ClusterSet.PwdPre + joinArg.Password |
| | | |
| | |
| | | agent, err := dbSync.Init(joinArg.ClusterId, joinArg.Password, config.Server.AnalyServerId, joinIps, config.ClusterSet.SerfSnapShotPath, conf) |
| | | if err == nil && agent != nil { //加入成功 |
| | | logger.Debug("AddCluster dbSync.Init success") |
| | | agent.RegisterHandleEventFunc(serf.HandleSerfEvent) |
| | | serf.Agent = agent |
| | | |
| | | t := time.Now() |
| | | syncB := syncTableDataFromCluster(joinArg) |
| | | logger.Debugf("AddCluster syncTableDataFromCluster time=%v, syncB=%v", time.Since(t), syncB) |
| | | if syncB { |
| | | devTyp := GetDevType(config.Server.DeviceType) |
| | | if devTyp == DevType_Analysis_Storage || devTyp == DevType_Storage { //只有能启动ES的节点才能加入ES |
| | | go func() { |
| | | //添加ES节点 |
| | | if _, err := AddCluster("/opt/elasticsearch/config", "/opt/vasystem/script", targetIp, "9200", config.EsConfig.StorePath); err != nil { |
| | | s.Leave(false) |
| | | return |
| | | } |
| | | |
| | | //添加weedfs节点 |
| | | if _, err := AddWeedfsServer("/opt/vasystem/script", targetIp, config.StorageConf.VolumePath); err != nil { |
| | | s.Leave(false) |
| | | return |
| | | } |
| | | }() |
| | | } |
| | | |
| | | //需要重新初始化本地比对进程 |
| | | go serf.ReInitDbPersonCompareData() |
| | | syncClusterNodes := syncTableDataFromCluster(joinArg) |
| | | logger.Debugf("AddCluster time=%v", time.Since(t)) |
| | | if syncClusterNodes { |
| | | chMsg := protomsg.DbChangeMessage{ |
| | | Id: joinArg.ClusterId, |
| | | Table: protomsg.TableChanged_T_Cluster, |
| | | Action: protomsg.DbAction_Insert, |
| | | Info: "", |
| | | Info: "join", |
| | | } |
| | | |
| | | s.AddDbMessage(&chMsg) |
| | | logger.Debugf("AddCluster 加入集群成功 time=%v", time.Since(start)) |
| | | return true, nil |
| | | } else { |
| | | logger.Debug("AddCluster syncTableDataFromCluster fail") |
| | | if agent != nil { |
| | | agent.Leave() |
| | | err = agent.Shutdown() |
| | | if err != nil { |
| | | logger.Debug("AddCluster agent shutdown err:", err) |
| | | } |
| | | agent.Leave() |
| | | err = agent.Shutdown() |
| | | if err != nil { |
| | | logger.Debug("AddCluster agent shutdown err:", err) |
| | | } |
| | | |
| | | return false, errors.New("加入集群失败") |
| | | } |
| | | } else { |
| | | logger.Debug("AddCluster dbSync.Init err:", err) |
| | |
| | | |
| | | } |
| | | } |
| | | |
| | | logger.Debugf("AddCluster 加入集群失败, targetIp=%v, time=%v", targetIp, time.Since(start)) |
| | | |
| | | return false, errors.New("加入集群失败") |
| | | } |
| | | |
| | |
| | | DevType_Other = "other" //其他设备类型,eg:应用 |
| | | ) |
| | | |
| | | //获取本机类型,只进行分析、分析存储一体、只存储、其他。。 |
| | | // 获取本机类型,只进行分析、分析存储一体、只存储、其他。。 |
| | | func GetDevType(dt string) string { |
| | | if dt != "" { |
| | | if len(dt) >= 4 { |
| | |
| | | |
| | | func (s ClusterService) Leave(isDel bool) (bool, error) { |
| | | start := time.Now() |
| | | devType := GetDevType(config.Server.DeviceType) |
| | | logger.Debugf("Leave GetDevType=%v, isDel=%v", devType, isDel) |
| | | |
| | | if devType == DevType_Analysis_Storage || devType == DevType_Storage { //只有分析和存储才有ES集群 |
| | | go func() { |
| | | //退出weedfs节点 |
| | | if _, err := ExitWeedfsServer("/opt/vasystem/script", isDel, config.StorageConf.VolumePath); err != nil { |
| | | logger.Error("Leave ExitWeedfsServer err:", err) |
| | | return |
| | | } |
| | | //退出es节点 |
| | | if _, err := ExitCluster("/opt/elasticsearch/config", "/opt/vasystem/script"); err != nil { |
| | | logger.Error("Leave ExitCluster es cluster err:", err) |
| | | return |
| | | } |
| | | }() |
| | | } |
| | | |
| | | var err error |
| | | tx := models.GetDB().Begin() |
| | |
| | | Id: "", |
| | | Table: protomsg.TableChanged_T_Cluster, |
| | | Action: protomsg.DbAction_Delete, |
| | | Info: "", |
| | | Info: "leave", |
| | | } |
| | | logger.Debugf("Leave delete db time=%v", time.Since(t)) |
| | | tm := time.Now() |
| | |
| | | } |
| | | |
| | | logger.Debugf("Leave success time=%v", time.Since(start)) |
| | | |
| | | return true, nil |
| | | } |
| | | |
| | |
| | | return false |
| | | } |
| | | |
| | | //加入集群后清空本地的同步库数据 |
| | | // 加入集群后清空本地的同步库数据 |
| | | func syncTableDataFromCluster(joinArg *vo.ClusterJoinVo) bool { |
| | | var lc models.LocalConfig |
| | | e := lc.Select() |
| | |
| | | db := models.GetDB() |
| | | db.LogMode(false) |
| | | defer db.LogMode(true) |
| | | |
| | | tx := db.Begin() |
| | | defer func() { |
| | | if err != nil && tx != nil { |
| | | tx.Rollback() |
| | | } |
| | | }() |
| | | |
| | | //0.关闭reference |
| | | tx.Exec("PRAGMA foreign_keys=OFF") |
| | | //1.删除本地的同步库数据 |
| | | for _, t := range serf.SyncTables { |
| | | delSql := "" |
| | | if t == "dbtables" { |
| | | delSql = "delete from " + t + " where (analyServerId='' or analyServerId=NULL)" |
| | | } else if t == "dbtablepersons" { |
| | | delSql = "delete from " + t + " where tableId in (select id from dbtables where (analyServerId='' or analyServerId=NULL))" |
| | | } else { |
| | | delSql = "delete from " + t + "" |
| | | } |
| | | delSql := "delete from " + t + "" |
| | | |
| | | err = tx.Exec(delSql).Error |
| | | if err != nil { |
| | | logger.Error("删除本地的同步库数据失败,", err.Error()) |
| | | logger.Error("sql:", delSql) |
| | | return false |
| | | } |
| | | } |
| | | |
| | | //2.拉取集群内的同步库数据到本地数据库表中 |
| | | var dumpSqls *[]string |
| | | dumpSqls, err = serf.GetTableDataFromCluster(serf.Agent, joinArg.ClusterId, serf.SyncTables, 20*time.Second) |
| | |
| | | for _, sqlStr := range *dumpSqls { |
| | | logger.Debug("gorm exec dumpSql:", sqlStr) |
| | | if err = tx.Exec(sqlStr).Error; err != nil { |
| | | logger.Error("gorm exec dumpSql:", sqlStr, " error:", err.Error()) |
| | | return false |
| | | } |
| | | } |
| | |
| | | serverIp, _, e1 := util.GetLocalIP(config.Server.NetworkAdapter) |
| | | if e1 != nil || serverIp == "" { |
| | | err = errors.New("get serverIp err") |
| | | |
| | | logger.Error("get serverIp err") |
| | | return false |
| | | } |
| | | |
| | | logger.Debug("cur Node serverIp:", serverIp, "nodeName:", nodeName) |
| | | if nodeName == "" { |
| | | nodeName = serverIp |
| | |
| | | logger.Debug("add cur node err:", err) |
| | | return false |
| | | } |
| | | if err = tx.Exec("update cluster_node set isDelete=0 where id='" + config.Server.AnalyServerId + "'").Error; err != nil { |
| | | |
| | | joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='slave',node_ip='%s',create_time='%s' where node_id='%s'", serverIp+":"+strconv.Itoa(dbSync.DefaultBindPort), fmtTimeStr, config.Server.AnalyServerId) |
| | | if err = tx.Exec(joinSql).Error; err != nil { |
| | | logger.Debug("update isDelete err:", err) |
| | | return false |
| | | } |
| | |
| | | tx.Exec("PRAGMA foreign_keys=ON") |
| | | tx.Commit() |
| | | serf.SyncSql([]string{sql}) |
| | | |
| | | return true |
| | | } |
| | | |
| | |
| | | var lc models.Node |
| | | return lc.FindIpByNode(nodeId) |
| | | } |
| | | |
| | | func ClusterSyncProcMessage(payload []byte) { |
| | | if serf.Agent == nil { |
| | | logger.Error("未加入集群") |
| | | return |
| | | } |
| | | |
| | | err := serf.Agent.UserEvent(serf.UserEventSyncMessage, payload, false) |
| | | if err != nil { |
| | | logger.Error("UserEventSyncMessage err:", err) |
| | | } |
| | | } |