package controllers import ( "fmt" "github.com/gin-gonic/gin" "net/http" "strconv" "strings" "swfs/code" "swfs/config" "swfs/util" "sync" "time" ) const ( StartServerScript = "seaweedfs_start.sh" StopServerScript = "seaweedfs_stop.sh" StartScriptAsVolume = "option=1" StartScriptAsMaster = "option=2" ) type SeaweedfsController struct{} type SWFSInfo struct { Role string `json:"role"` } func (sc *SeaweedfsController) UpdateSWFSServiceController(c *gin.Context) { oldPeers := GetOldPeers(config.Server.ScriptPath, StartServerScript) newPeers := GetNewPeers() UpdatePeers(config.Server.ScriptPath, StartServerScript, oldPeers, newPeers) ReplaceLineContentBySearch(StartScriptAsVolume, config.Server.ScriptPath, StartServerScript) } // @Security ApiKeyAuth // @Summary 新节点加入 // @Description 新节点加入 // @Accept json // @Produce json // @Tags swfs // @Param obj body SWFSInfo true "加入角色参数" // @Success 200 {string} json "{"code":200, msg:"", success:true}" // @Failure 500 {string} json "{"code":500, msg:"", success:false}" // @Router /node/api-v/swfs/addSWFSNode [POST] func (sc *SeaweedfsController) AddSWFSNodeController(c *gin.Context) { var body SWFSInfo c.BindJSON(&body) role := body.Role if role == "master" { AddNewMasterToPeers() AsMaster() util.ResponseFormat(c, code.AddSuccess, "加入节点成功") return } else if role == "volume" { AsVolume() util.ResponseFormat(c, code.AddSuccess, "加入节点成功") return } else { util.ResponseFormat(c, code.RequestParamError, "选择节点类型错误") return } } func (sc *SeaweedfsController) RoleOfVolumeToMasterController(c *gin.Context) { AsMaster() } func (sc *SeaweedfsController) RestartMasterController(c *gin.Context) { StopServer(config.Server.ScriptPath) time.Sleep(time.Second * 1) StartServer(config.Server.ScriptPath) } //启动服务 func StartServer(scriptPath string) { util.RunScript("sh " + scriptPath + StartServerScript) } //停止服务 func StopServer(scriptPath string) { util.RunScript("sh " + scriptPath + StopServerScript) } //根据搜索内容替换整行内容 func ReplaceLineContentBySearch(replaceContent string, scriptPath string, scriptFile string) { resContent := GetNowLineContent(scriptPath+"/"+scriptFile, "option=") replaceStr := "sed -ie 's/" + resContent + "/" + replaceContent + "/g' " + scriptPath + "/" + scriptFile util.RunScript(replaceStr) } //更新所有节点的脚本参数 func UpdateAllNodesScriptArgument(nowPeers []interface{}) { for _, val := range nowPeers { ip := val.(string) url := "http://" + ip + ":7020/node/api-v/swfs/updateSWFSService" http.Get(url) } } //请求作为主节点操作流程 func RequestMasterNodesOperation(nowPeers []interface{}) { //fmt.Println("config.Server.CoreBaseUnit", config.Server.CoreBaseUnit) coreBaseUnit, _ := strconv.Atoi(config.Server.CoreBaseUnit) //fmt.Println("nowPeers: ", nowPeers) //fmt.Println("coreBaseUnit: ", coreBaseUnit) UpdateAllNodesScriptArgument(nowPeers) RestartAllServer(nowPeers, coreBaseUnit) } //重启其他节点服务并验证 func RestartServer(ip string, timeOut int) { url := "http://" + ip + ":7020/node/api-v/swfs/restartMaster" http.Get(url) tick := time.Tick(1 * time.Second) for countdown := timeOut; countdown > 0; countdown-- { verificationMasterUrl := "http://" + ip + ":6333" verificationVolume1Url := "http://" + ip + ":6700" verificationVolume2Url := "http://" + ip + ":6701" _, masterErr := http.Get(verificationMasterUrl) _, volume1Err := http.Get(verificationVolume1Url) _, volume2Err := http.Get(verificationVolume2Url) if masterErr == nil && volume1Err == nil && volume2Err == nil { break } <-tick } } //构建重启流程 func RestartAllServer(nowPeers []interface{}, coreBaseUnit int) { coreThread := len(nowPeers)/coreBaseUnit + 1 masterIp := make([]string, 0) timeOut, _ := strconv.Atoi(config.Server.TimeOut) var waitGroup sync.WaitGroup for i, val := range nowPeers { ip := val.(string) if (i+1)%coreThread == 0 { masterIp = append(masterIp, strings.Split(ip, ":")[0]) for i := 0; i < len(masterIp); i++ { go RestartServer(masterIp[i], timeOut) waitGroup.Add(1) //每创建一个goroutine,就把任务队列中任务的数量+1 } waitGroup.Wait() //.Wait()这里会发生阻塞,直到队列中所有的任务结束就会解除阻塞 masterIp = make([]string, 0) } else { masterIp = append(masterIp, strings.Split(ip, ":")[0]) if len(nowPeers) == i+1 { var waitGroup sync.WaitGroup for i := 0; i < len(masterIp); i++ { go RestartServer(masterIp[i], timeOut) waitGroup.Add(1) //每创建一个goroutine,就把任务队列中任务的数量+1 } waitGroup.Wait() //.Wait()这里会发生阻塞,直到队列中所有的任务结束就会解除阻塞 masterIp = make([]string, 0) break } } continue } } //获取本地以使用集群表单 func GetOldPeers(scriptPath string, scriptFile string) string { return GetNowLineContent(scriptPath+"/"+scriptFile, "peers=") } //获取查找内容当前行内容 func GetNowLineContent(filePath string, searchContent string) string { scriptStr := "cat" + filePath + "| grep " + searchContent return strings.Split(util.RunScript(scriptStr), "\n")[0] } //作为数据节点加入 func AsVolume() { ReplaceLineContentBySearch(StartScriptAsVolume, config.Server.ScriptPath, StartServerScript) StartServer(config.Server.ScriptPath) } //作为主节点加入(默认包含数据节点) func AsMaster() { nowPeers := GetNowPeersList() RequestMasterNodesOperation(nowPeers) } //获取当前集群列表 func GetNowPeersList() []interface{} { getUrl := "http://" + config.Server.EsServerIp + ":" + config.Server.EsServerPort + "/" + config.BasicFS.IndexName + "/_search" getJson := `{ "query": { "bool": { "filter": [ { "term": { "application":"nodeOperation" } } ] } }, "size": 1 }` buf, _ := util.EsReq("POST", getUrl, []byte(getJson)) source, _ := util.Sourcelist(buf) peers := source[0]["peers"].([]interface{}) return peers } //获取当前集群列表格式化信息 func GetNewPeers() string { peers := GetNowPeersList() p := "peers=" + strings.Replace(strings.Trim(fmt.Sprint(peers), "[]"), " ", ",", -1) return p } //更新本地集群列表 func UpdatePeers(scriptPath string, scriptFile string, oldPeers string, newPeers string) { str := "sed -ie 's/" + oldPeers + "/" + newPeers + "/g' " + scriptPath + "/" + scriptFile util.RunScript(str) } //向集群加入新的master func AddNewMasterToPeers() (result bool) { peer := config.Server.EsServerIp + ":6333" addUrl := "http://" + config.Server.EsServerIp + ":" + config.Server.EsServerPort + "/" + config.BasicFS.IndexName + "/_update_by_query" addJson := `{ "script": { "lang": "painless", "inline": "ctx._source.peers.add(params.newpeer)", "params": { "newpeer": "` + peer + `" } }, "query": { "bool": { "filter": [ { "term": { "application": "nodeOperation" } } ] } } }` buf, _ := util.EsReq("POST", addUrl, []byte(addJson)) updateRes, _ := util.SourceUpdated(buf) if updateRes == -1 { result = false } else { result = true } return result }