package controllers

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/gin-gonic/gin"

	"swfs/code"
	"swfs/config"
	"swfs/util"
)

const (
	// StartServerScript is the file name of the script that starts the service.
	StartServerScript = "seaweedfs_start.sh"
	// StopServerScript is the file name of the script that stops the service.
	StopServerScript = "seaweedfs_stop.sh"
	// StartScriptAsVolume is the startup option value for running as a volume.
	StartScriptAsVolume = "1"
	// StartScriptAsMaster is the startup option value for running as a master.
	StartScriptAsMaster = "2"
	// StartScriptAsMaVo is the startup option value for running as master + volume.
	StartScriptAsMaVo = "3"
	// Option is the prefix of the startup option line in the start script.
	Option = "option="
	// Peer is the prefix of the master list line in the start script.
	Peer = "peers="
	//StartScriptAsFiler = "option=4"
)

// SeaweedfsController groups the HTTP handlers that manage the local
// seaweedfs node.
type SeaweedfsController struct{}

// SWFSInfo is the JSON request body of the add-node endpoint.
type SWFSInfo struct {
	// Role is matched against "master", "volume" and "master+volume".
	Role string `json:"role"`
}

// UpdateSWFSServiceController rewrites the peers line of the local start
// script with the current peer list and then reports success.
func (sc *SeaweedfsController) UpdateSWFSServiceController(c *gin.Context) {
	ReplaceLineContentBySearch(GetNewPeers(), Peer, config.Server.ScriptPath, StartServerScript)
	util.ResponseFormat(c, code.Success, config.Server.EsServerIp+"更新成功")
}

// AddSWFSNodeController joins this node to the cluster in the role named by
// the request body. An unreadable body or an unknown role is answered with
// code.RequestParamError.
//
// @Security ApiKeyAuth
// @Summary 新节点加入
// @Description 新节点加入
// @Accept json
// @Produce json
// @Tags swfs
// @Param obj body SWFSInfo true "加入角色参数"
// @Success 200 {string} json "{"code":200, msg:"", success:true}"
// @Failure 500 {string} json "{"code":500, msg:"", success:false}"
// @Router /node/api-v/swfs/addSWFSNode [POST]
func (sc *SeaweedfsController) AddSWFSNodeController(c *gin.Context) {
	var body SWFSInfo
	if err := c.BindJSON(&body); err != nil {
		// Without a parsed body there is no role to act on; answer with the
		// same parameter error an unknown role receives.
		fmt.Println("bind add-node request body failed:", err)
		util.ResponseFormat(c, code.RequestParamError, "选择节点类型错误")
		return
	}

	switch body.Role {
	case "master":
		AsMaster(body.Role)
		util.ResponseFormat(c, code.AddSuccess, "加入节点成功")
	case "volume":
		// AsVolume reports false when the cluster has no peers yet.
		if AsVolume() {
			util.ResponseFormat(c, code.AddSuccess, "加入节点成功")
		} else {
			util.ResponseFormat(c, code.AddClusterInfoErr, "当前还没有主节点")
		}
	case "master+volume":
		AsMaVo(body.Role)
		util.ResponseFormat(c, code.AddSuccess, "加入节点成功")
	default:
		util.ResponseFormat(c, code.RequestParamError, "选择节点类型错误")
	}
}

// AsMaVo runs the master join flow for role and then sets the local startup
// option to master + volume.
func AsMaVo(role string) {
	AsMaster(role)
	ReplaceLineContentBySearch(Option+StartScriptAsMaVo, Option, config.Server.ScriptPath, StartServerScript)
}

// RoleOfVolumeToMasterController runs the master join flow and then sets the
// local startup option to master + volume.
//
// NOTE(review): this handler never writes a response body — confirm that
// callers do not expect one.
func (sc *SeaweedfsController) RoleOfVolumeToMasterController(c *gin.Context) {
	AsMaster("master")
	ReplaceLineContentBySearch(Option+StartScriptAsMaVo, Option, config.Server.ScriptPath, StartServerScript)
}

// RestartServerController stops and starts the local service, waits one
// second, and replies with the value after "=" on the option line of the
// start script. It answers 500 when that line cannot be parsed.
func (sc *SeaweedfsController) RestartServerController(c *gin.Context) {
	StopServer(config.Server.ScriptPath)
	StartServer(config.Server.ScriptPath)
	time.Sleep(time.Second * 1)

	parts := strings.Split(GetLocalStartupItem(config.Server.ScriptPath, StartServerScript), "=")
	if len(parts) < 2 {
		// Indexing parts[1] would panic; report the failure explicitly.
		fmt.Println("startup option line not found or malformed in", StartServerScript)
		c.AbortWithStatus(http.StatusInternalServerError)
		return
	}
	result := parts[1]
	fmt.Println("result: ", result)
	util.ResponseFormat(c, code.Success, result)
}

// StartServer runs the start script located in scriptPath.
func StartServer(scriptPath string) {
	util.RunScript("sh " + scriptPath + "/" + StartServerScript)
}

// StopServer runs the stop script located in scriptPath.
func StopServer(scriptPath string) {
	util.RunScript("sh " + scriptPath + "/" + StopServerScript)
}

// ReplaceLineContentBySearch finds the first line of scriptPath/scriptFile
// that contains searchContent and replaces that line's text with
// replaceContent using sed. It does nothing when no such line exists.
//
// NOTE(review): both strings are interpolated into a sed expression and a
// shell command unescaped, so they must not contain "/" or shell
// metacharacters. With GNU sed, "-ie" also means "in place, backup suffix e".
func ReplaceLineContentBySearch(replaceContent string, searchContent string, scriptPath string, scriptFile string) {
	target := scriptPath + "/" + scriptFile
	current := GetNowLineContent(target, searchContent)
	if current == "" {
		// An empty pattern would make sed fail ("no previous regular expression").
		fmt.Println("no line containing", searchContent, "found in", target)
		return
	}
	util.RunScript("sed -ie 's/" + current + "/" + replaceContent + "/g' " + target)
}

// UpdateAllNodesScriptArgument asks every node in peersIp, on port 7020, to
// refresh the peers line of its start script. A node that cannot be reached
// is logged and skipped.
func UpdateAllNodesScriptArgument(peersIp []string) {
	fmt.Println("开始更新本地配置文件")
	for _, ip := range peersIp {
		fmt.Println("ip: ", ip)
		url := "http://" + ip + ":7020/node/api-v/swfs/updateSWFSService"
		fmt.Println("url", url)

		resp, err := http.Get(url)
		if err != nil {
			// resp is nil here; touching it would panic.
			fmt.Println("update request failed:", url, err)
			continue
		}
		resp.Body.Close()

		fmt.Println("更新返回状态:", resp.StatusCode)
		if resp.StatusCode == http.StatusOK {
			fmt.Println("请求完毕", resp.StatusCode)
		}
	}
}

// RequestNodesOperation performs the join flow for a master-type role: it
// tells every peer to refresh its peer list, writes the startup option for
// role into the local start script, and restarts all peers.
//
// nowPeers holds "host:port" strings; entries of any other type are skipped.
// role must be "master" or "master+volume"; any other value is rejected
// before anything is changed.
func RequestNodesOperation(nowPeers []interface{}, role string) {
	var replaceContent string
	switch role {
	case "master":
		replaceContent = Option + StartScriptAsMaster
	case "master+volume":
		replaceContent = Option + StartScriptAsMaVo
	default:
		// An empty replacement would blank the option line in the script.
		fmt.Println("unsupported role, nothing changed:", role)
		return
	}

	peersIp := make([]string, 0, len(nowPeers))
	for _, val := range nowPeers {
		peer, ok := val.(string)
		if !ok {
			fmt.Println("skipping peer entry that is not a string:", val)
			continue
		}
		peersIp = append(peersIp, strings.Split(peer, ":")[0])
	}

	coreBaseUnit, err := strconv.Atoi(config.Server.CoreBaseUnit)
	if err != nil || coreBaseUnit <= 0 {
		// RestartAllServer divides by this value. Fall back to a value that
		// yields a batch size of one, i.e. restart one node at a time.
		fmt.Println("invalid CoreBaseUnit, restarting nodes one at a time:", config.Server.CoreBaseUnit)
		coreBaseUnit = len(peersIp) + 1
	}

	UpdateAllNodesScriptArgument(peersIp)
	ReplaceLineContentBySearch(replaceContent, Option, config.Server.ScriptPath, StartServerScript)
	RestartAllServer(peersIp, coreBaseUnit)
}

// Restart asks the node at ip to restart its service and then polls, about
// once per second and at most timeOut times, until Verification reports the
// node as up. It returns without polling when the restart request fails or
// the response does not carry a startup item.
func Restart(ip string, timeOut int) {
	url := "http://" + ip + ":7020/node/api-v/swfs/restartServer"

	httpRes, err := http.Get(url)
	if err != nil {
		fmt.Println("restart request failed:", url, err)
		return
	}
	defer httpRes.Body.Close()

	body, err := ioutil.ReadAll(httpRes.Body)
	if err != nil {
		fmt.Println("reading restart response failed:", url, err)
		return
	}

	var info interface{}
	if err := json.Unmarshal(body, &info); err != nil {
		fmt.Println("decoding restart response failed:", url, err)
		return
	}
	res, ok := info.(map[string]interface{})
	if !ok {
		fmt.Println("http response interface can not change map[string]interface{}")
		return
	}
	fmt.Println("res: ", res)

	// The startup item ("1", "2" or "3") is read from the "data" field. It
	// must not be replaced by the raw body, which would never match any of
	// the cases in Verification.
	startupItem, ok := res["data"].(string)
	if !ok || httpRes.StatusCode != http.StatusOK {
		fmt.Println("restart response carries no startup item, status:", httpRes.StatusCode)
		return
	}

	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	fmt.Println("准备开始验证节点服务")
	for countdown := timeOut; countdown > 0; countdown-- {
		fmt.Println("第", countdown, "次验证")
		result := Verification(startupItem, ip)
		fmt.Println("第一次验证result结果:", result)
		if result {
			break
		}
		<-ticker.C
	}
	fmt.Println("验证完毕")
}

// isReachable reports whether an HTTP GET to url completes without a
// transport error. The status code is not inspected.
func isReachable(url string) bool {
	resp, err := http.Get(url)
	if err != nil {
		return false
	}
	resp.Body.Close()
	return true
}

// Verification reports whether the services implied by startupItem answer on
// ip: port 6700 for a volume, port 6333 for a master, both for master +
// volume. Any other startupItem yields false.
func Verification(startupItem string, ip string) bool {
	masterURL := "http://" + ip + ":6333"
	volumeURL := "http://" + ip + ":6700/ui/index.html"

	switch startupItem {
	case StartScriptAsVolume:
		return isReachable(volumeURL)
	case StartScriptAsMaster:
		return isReachable(masterURL)
	case StartScriptAsMaVo:
		masterUp := isReachable(masterURL)
		volumeUp := isReachable(volumeURL)
		return masterUp && volumeUp
	}
	return false
}

// GetLocalStartupItem returns the first line of scriptPath/scriptFile that
// contains the Option prefix.
func GetLocalStartupItem(scriptPath string, scriptFile string) string {
	startupItem := GetNowLineContent(scriptPath+"/"+scriptFile, Option)
	fmt.Println("startupItem: ", startupItem)
	return startupItem
}

// Build the restart flow.
func RestartAllServer(peersIp []string, coreBaseUnit int) { fmt.Println("开始构建重启流程") coreThread := len(peersIp)/coreBaseUnit + 1 masterIp := make([]string, 0) timeOut, _ := strconv.Atoi(config.Server.TimeOut) var waitGroup sync.WaitGroup fmt.Println("当前并行度coreThread:", coreThread) for i, ip := range peersIp { fmt.Println("重启当前组服务" + ip) if (i+1)%coreThread == 0 { masterIp = append(masterIp, strings.Split(ip, ":")[0]) fmt.Println("加入第一组并开始验证第一组 masterIp: ", masterIp) for i := 0; i < len(masterIp); i++ { fmt.Println("len masterIp: ", len(masterIp)) fmt.Println("第" + strconv.Itoa(i) + "个线程") fmt.Println("当前goroutinebe") waitGroup.Add(1) //每创建一个goroutine,就把任务队列中任务的数量+1 fmt.Println("当前goroutineaf") go Restart(masterIp[i], timeOut) waitGroup.Done() } fmt.Println("这里为阻塞!!!!!") waitGroup.Wait() //.Wait()这里会发生阻塞,直到队列中所有的任务结束就会解除阻塞 fmt.Println("当前组任务完成") masterIp = make([]string, 0) fmt.Println("清空当前组成员:", masterIp) } else { masterIp = append(masterIp, strings.Split(ip, ":")[0]) if len(peersIp) == i+1 { var waitGroup sync.WaitGroup for i := 0; i < len(masterIp); i++ { waitGroup.Add(1) //每创建一个goroutine,就把任务队列中任务的数量+1 go Restart(masterIp[i], timeOut) waitGroup.Done() } waitGroup.Wait() //.Wait()这里会发生阻塞,直到队列中所有的任务结束就会解除阻塞 masterIp = make([]string, 0) break } } } fmt.Println("服务流程执行完毕") } //获取查找内容当前行内容 func GetNowLineContent(filePath string, searchContent string) string { scriptStr := "cat " + filePath + "| grep " + searchContent fmt.Println("scriptStr: ", scriptStr) return strings.Split(util.RunScript(scriptStr), "\n")[0] } //作为数据节点加入 func AsVolume() bool { nowPeers := GetNowPeersList() if nowPeers == nil || len(nowPeers) == 0 { return false } ReplaceLineContentBySearch(Option+StartScriptAsVolume, Option, config.Server.ScriptPath, StartServerScript) ReplaceLineContentBySearch(GetNewPeers(), Peer, config.Server.ScriptPath, StartServerScript) StartServer(config.Server.ScriptPath) return true } //作为主节点加入(默认包含数据节点) func AsMaster(role string) bool { AddNewMasterToPeers() nowPeers := 
GetNowPeersList() fmt.Println("nowPeers: ", nowPeers) RequestNodesOperation(nowPeers, role) return true } //获取当前集群列表 func GetNowPeersList() []interface{} { getUrl := "http://" + config.Server.EsServerIp + ":" + config.Server.EsServerPort + "/" + config.BasicFS.IndexName + "/_search" getJson := `{ "query": { "bool": { "filter": [ { "term": { "application":"nodeOperation" } } ] } }, "size": 1 }` buf, _ := util.EsReq("POST", getUrl, []byte(getJson)) source, _ := util.Sourcelist(buf) peers := source[0]["peers"].([]interface{}) return peers } //获取当前集群列表格式化信息 func GetNewPeers() string { peers := GetNowPeersList() p := Peer + strings.Replace(strings.Trim(fmt.Sprint(peers), "[]"), " ", ",", -1) return p } //向集群加入新的master func AddNewMasterToPeers() (result bool) { peer := config.Server.EsServerIp + ":6333" addUrl := "http://" + config.Server.EsServerIp + ":" + config.Server.EsServerPort + "/" + config.BasicFS.IndexName + "/_update_by_query?refresh=true" addJson := `{ "script": { "lang": "painless", "inline": "ctx._source.peers.add(params.newpeer)", "params": { "newpeer": "` + peer + `" } }, "query": { "bool": { "filter": [ { "term": { "application": "nodeOperation" } } ] } } }` buf, _ := util.EsReq("POST", addUrl, []byte(addJson)) updateRes, _ := util.SourceUpdated(buf) if updateRes == -1 { result = false } else { result = true } return result }