package controllers import ( "fmt" "github.com/gin-gonic/gin" "io/ioutil" "net/http" "strconv" "strings" "swfs/code" "swfs/config" "swfs/util" "sync" "time" ) const ( StartServerScript = "seaweedfs_start.sh" StopServerScript = "seaweedfs_stop.sh" //作为 volume 启动 StartScriptAsVolume = "1" //作为 master 启动 StartScriptAsMaster = "2" //作为 master + volume 启动 StartScriptAsMaVo = "3" //启动项参数头 Option = "option=" //master列表项参数头 Peer = "peers=" //StartScriptAsFiler = "option=4" ) type SeaweedfsController struct{} type SWFSInfo struct { Role string `json:"role"` } func (sc *SeaweedfsController) UpdateSWFSServiceController(c *gin.Context) { ReplaceLineContentBySearch(GetNewPeers(), Peer, config.Server.ScriptPath, StartServerScript) util.ResponseFormat(c, code.Success, config.Server.EsServerIp+"更新成功") } // @Security ApiKeyAuth // @Summary 新节点加入 // @Description 新节点加入 // @Accept json // @Produce json // @Tags swfs // @Param obj body SWFSInfo true "加入角色参数" // @Success 200 {string} json "{"code":200, msg:"", success:true}" // @Failure 500 {string} json "{"code":500, msg:"", success:false}" // @Router /node/api-v/swfs/addSWFSNode [POST] func (sc *SeaweedfsController) AddSWFSNodeController(c *gin.Context) { var body SWFSInfo c.BindJSON(&body) role := body.Role if role == "master" { AsMaster() util.ResponseFormat(c, code.AddSuccess, "加入节点成功") return } else if role == "volume" { status := AsVolume() if status == true { util.ResponseFormat(c, code.AddSuccess, "加入节点成功") return } else { util.ResponseFormat(c, code.AddClusterInfoErr, "当前还没有主节点") return } } else if role == "master+volume" { AsMaVo() util.ResponseFormat(c, code.AddSuccess, "加入节点成功") } else { util.ResponseFormat(c, code.RequestParamError, "选择节点类型错误") return } } func AsMaVo() { AsMaster() ReplaceLineContentBySearch(StartScriptAsMaVo, Option, config.Server.ScriptPath, StartServerScript) } func (sc *SeaweedfsController) RoleOfVolumeToMasterController(c *gin.Context) { AsMaster() ReplaceLineContentBySearch(StartScriptAsMaVo, Option, config.Server.ScriptPath, StartServerScript) } func (sc *SeaweedfsController) RestartServerController(c *gin.Context) { StopServer(config.Server.ScriptPath) time.Sleep(time.Second * 1) StartServer(config.Server.ScriptPath) result := strings.Split("=", GetLocalStartupItem(config.Server.ScriptPath, StartServerScript))[1] util.ResponseFormat(c, code.Success, result) } //启动服务 func StartServer(scriptPath string) { util.RunScript("sh " + scriptPath + StartServerScript) } //停止服务 func StopServer(scriptPath string) { util.RunScript("sh " + scriptPath + StopServerScript) } //根据搜索内容替换整行内容 func ReplaceLineContentBySearch(replaceContent string, searchContent string, scriptPath string, scriptFile string) { resSearchContent := GetNowLineContent(scriptPath+"/"+scriptFile, searchContent) replaceStr := "sed -ie 's/" + resSearchContent + "/" + replaceContent + "/g' " + scriptPath + "/" + scriptFile util.RunScript(replaceStr) } //更新所有节点的脚本参数 func UpdateAllNodesScriptArgument(nowPeers []interface{}) { for _, val := range nowPeers { ip := val.(string) url := "http://" + ip + ":7020/node/api-v/swfs/updateSWFSService" http.Get(url) } } //请求作为主节点操作流程 func RequestMasterNodesOperation(nowPeers []interface{}) { coreBaseUnit, _ := strconv.Atoi(config.Server.CoreBaseUnit) UpdateAllNodesScriptArgument(nowPeers) RestartAllServer(nowPeers, coreBaseUnit) } //重启所有节点服务并验证 func Restart(ip string, timeOut int) { url := "http://" + ip + ":7020/node/api-v/swfs/restartServer" httpRes, _ := http.Get(url) defer httpRes.Body.Close() body, _ := ioutil.ReadAll(httpRes.Body) startupItem := "" if httpRes.StatusCode == 200 { startupItem = string(body) } tick := time.Tick(1 * time.Second) for countdown := timeOut; countdown > 0; countdown-- { Verification(startupItem, ip) <-tick } } func Verification(startupItem string, ip string) { switch startupItem { case StartScriptAsVolume: verificationVolumeUrl := "http://" + ip + ":6700" _, volume1Err := http.Get(verificationVolumeUrl) if volume1Err == nil { break } case StartScriptAsMaster: verificationMasterUrl := "http://" + ip + ":6333" _, masterErr := http.Get(verificationMasterUrl) if masterErr == nil { break } case StartScriptAsMaVo: verificationMasterUrl := "http://" + ip + ":6333" verificationVolumeUrl := "http://" + ip + ":6700" _, masterErr := http.Get(verificationMasterUrl) _, volume1Err := http.Get(verificationVolumeUrl) if masterErr == nil && volume1Err == nil { break } } } //获取本地启动项 func GetLocalStartupItem(scriptPath string, scriptFile string) string { startupItem := GetNowLineContent(scriptPath+"/"+scriptFile, Option) fmt.Println(startupItem) return startupItem } //构建重启流程 func RestartAllServer(nowPeers []interface{}, coreBaseUnit int) { coreThread := len(nowPeers)/coreBaseUnit + 1 masterIp := make([]string, 0) timeOut, _ := strconv.Atoi(config.Server.TimeOut) var waitGroup sync.WaitGroup for i, val := range nowPeers { ip := val.(string) if (i+1)%coreThread == 0 { masterIp = append(masterIp, strings.Split(ip, ":")[0]) for i := 0; i < len(masterIp); i++ { go Restart(masterIp[i], timeOut) waitGroup.Add(1) //每创建一个goroutine,就把任务队列中任务的数量+1 } waitGroup.Wait() //.Wait()这里会发生阻塞,直到队列中所有的任务结束就会解除阻塞 masterIp = make([]string, 0) } else { masterIp = append(masterIp, strings.Split(ip, ":")[0]) if len(nowPeers) == i+1 { var waitGroup sync.WaitGroup for i := 0; i < len(masterIp); i++ { go Restart(masterIp[i], timeOut) waitGroup.Add(1) //每创建一个goroutine,就把任务队列中任务的数量+1 } waitGroup.Wait() //.Wait()这里会发生阻塞,直到队列中所有的任务结束就会解除阻塞 masterIp = make([]string, 0) break } } continue } } //获取查找内容当前行内容 func GetNowLineContent(filePath string, searchContent string) string { scriptStr := "cat" + filePath + "| grep " + searchContent return strings.Split(util.RunScript(scriptStr), "\n")[0] } //作为数据节点加入 func AsVolume() bool { nowPeers := GetNowPeersList() if nowPeers == nil || len(nowPeers) == 0 { return false } ReplaceLineContentBySearch(StartScriptAsVolume, Option, config.Server.ScriptPath, StartServerScript) ReplaceLineContentBySearch(GetNewPeers(), Peer, config.Server.ScriptPath, StartServerScript) StartServer(config.Server.ScriptPath) return true } //作为主节点加入(默认包含数据节点) func AsMaster() bool { AddNewMasterToPeers() nowPeers := GetNowPeersList() RequestMasterNodesOperation(nowPeers) return true } //获取当前集群列表 func GetNowPeersList() []interface{} { getUrl := "http://" + config.Server.EsServerIp + ":" + config.Server.EsServerPort + "/" + config.BasicFS.IndexName + "/_search" getJson := `{ "query": { "bool": { "filter": [ { "term": { "application":"nodeOperation" } } ] } }, "size": 1 }` buf, _ := util.EsReq("POST", getUrl, []byte(getJson)) source, _ := util.Sourcelist(buf) peers := source[0]["peers"].([]interface{}) return peers } //获取当前集群列表格式化信息 func GetNewPeers() string { peers := GetNowPeersList() p := Peer + strings.Replace(strings.Trim(fmt.Sprint(peers), "[]"), " ", ",", -1) return p } //向集群加入新的master func AddNewMasterToPeers() (result bool) { peer := config.Server.EsServerIp + ":6333" addUrl := "http://" + config.Server.EsServerIp + ":" + config.Server.EsServerPort + "/" + config.BasicFS.IndexName + "/_update_by_query" addJson := `{ "script": { "lang": "painless", "inline": "ctx._source.peers.add(params.newpeer)", "params": { "newpeer": "` + peer + `" } }, "query": { "bool": { "filter": [ { "term": { "application": "nodeOperation" } } ] } } }` buf, _ := util.EsReq("POST", addUrl, []byte(addJson)) updateRes, _ := util.SourceUpdated(buf) if updateRes == -1 { result = false } else { result = true } return result }