// Package middleware contains the node-join and rolling-restart helpers that
// drive peer nodes over their HTTP management API on port 7020.
package middleware

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
	"strconv"
	"strings"
	"sync"
	"time"

	"swfs/config"
	"swfs/tools/es"
	"swfs/tools/script"
)

// AsMaster joins the cluster as a master node: it registers this node in the
// peer list, then runs the update/restart flow against the current peers.
// It always returns true.
func AsMaster(role string) bool {
	es.AddNewMasterToPeers()
	nowPeers := es.GetNowPeersList()
	fmt.Println("nowPeers: ", nowPeers)
	RequestNodesOperation(nowPeers, role)
	return true
}

// AsVolume joins the cluster as a volume (data) node. It returns false when
// the current peer list is empty, otherwise it rewrites the option and peer
// lines of the local start script, starts the server and returns true.
func AsVolume() bool {
	nowPeers := es.GetNowPeersList()
	if len(nowPeers) == 0 { // len of a nil slice is 0, so this covers nil too
		return false
	}
	script.ReplaceLineContentBySearch(config.Option+config.StartScriptAsVolume, config.Option, config.Server.ScriptPath, config.StartServerScript)
	script.ReplaceLineContentBySearch(es.GetNewPeers(), config.Peer, config.Server.ScriptPath, config.StartServerScript)
	script.StartServer(config.Server.ScriptPath)
	return true
}

// AsMaVo joins the cluster as a combined master+volume node: it runs the
// master join flow and then writes the master+volume option line into the
// local start script.
func AsMaVo(role string) {
	AsMaster(role)
	script.ReplaceLineContentBySearch(config.Option+config.StartScriptAsMaVo, config.Option, config.Server.ScriptPath, config.StartServerScript)
}

// UpdateAllNodesScriptArgument asks every peer in peersIp to refresh its
// script arguments via the updateSWFSService endpoint. A peer that cannot be
// reached is logged and skipped so the remaining peers are still updated.
func UpdateAllNodesScriptArgument(peersIp []string) {
	fmt.Println("开始更新本地配置文件")
	for _, ip := range peersIp {
		fmt.Println("ip: ", ip)
		url := "http://" + ip + ":7020/node/api-v/swfs/updateSWFSService"
		fmt.Println("url", url)
		resp, err := http.Get(url)
		if err != nil {
			// resp is nil on error; dereferencing it would panic.
			fmt.Println("update request failed: ", err)
			continue
		}
		status := resp.StatusCode
		// Close immediately (not deferred) so connections are not held for
		// the remainder of the loop.
		resp.Body.Close()
		fmt.Println("更新返回状态:", status)
		if status == http.StatusOK {
			fmt.Println("请求完毕", status)
		}
	}
}

// RequestNodesOperation runs the join flow for the given role: it derives the
// host part of every peer address, asks all peers to update their script
// arguments, rewrites the local start-script option line for the role and
// finally restarts all peers in groups.
//
// Entries of nowPeers that are not strings are logged and ignored.
func RequestNodesOperation(nowPeers []interface{}, role string) {
	peersIp := make([]string, 0, len(nowPeers))
	for _, val := range nowPeers {
		addr, ok := val.(string)
		if !ok {
			fmt.Println("skipping non-string peer entry: ", val)
			continue
		}
		peersIp = append(peersIp, peerHost(addr))
	}

	coreBaseUnit, err := strconv.Atoi(config.Server.CoreBaseUnit)
	if err != nil {
		// coreBaseUnit is 0 here; RestartAllServer falls back to restarting
		// one node at a time instead of dividing by zero.
		fmt.Println("invalid config.Server.CoreBaseUnit: ", err)
	}

	UpdateAllNodesScriptArgument(peersIp)

	// NOTE(review): for any other role replaceContent stays empty and is
	// still passed on, exactly as before — confirm that is intended.
	replaceContent := ""
	switch role {
	case "master":
		replaceContent = config.Option + config.StartScriptAsMaster
	case "master+volume":
		replaceContent = config.Option + config.StartScriptAsMaVo
	}
	script.ReplaceLineContentBySearch(replaceContent, config.Option, config.Server.ScriptPath, config.StartServerScript)

	RestartAllServer(peersIp, coreBaseUnit)
}

// Restart asks the node at ip to restart its server and then polls
// Verification once per second, at most timeOut times, until the node reports
// healthy. Any failure of the restart request is logged and ends the call.
func Restart(ip string, timeOut int) {
	startupItem, err := requestRestart(ip)
	if err != nil {
		fmt.Println("Restart: ", err)
		return
	}
	fmt.Println("Restart startupItem: ", startupItem)

	// NewTicker instead of time.Tick so the ticker can be stopped on return.
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	fmt.Println("准备开始验证节点服务")
	for countdown := timeOut; countdown > 0; countdown-- {
		fmt.Println("第", countdown, "次验证")
		result := Verification(startupItem, ip)
		fmt.Println("第一次验证result结果:", result)
		if result {
			break
		}
		<-ticker.C
	}
	fmt.Println("验证完毕")
}

// requestRestart calls the restartServer endpoint on ip and returns the
// "data" string of the JSON response, which Restart uses as the startup item.
func requestRestart(ip string) (string, error) {
	url := "http://" + ip + ":7020/node/api-v/swfs/restartServer"
	resp, err := http.Get(url)
	if err != nil {
		return "", fmt.Errorf("requesting restart of %s: %w", ip, err)
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return "", fmt.Errorf("reading restart response from %s: %w", ip, err)
	}
	// Check the status before interpreting the payload.
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("restart of %s returned status %d", ip, resp.StatusCode)
	}

	var info interface{}
	if err := json.Unmarshal(body, &info); err != nil {
		return "", fmt.Errorf("decoding restart response from %s: %w", ip, err)
	}
	res, ok := info.(map[string]interface{})
	if !ok {
		return "", fmt.Errorf("http response interface can not change map[string]interface{}")
	}
	fmt.Println("res: ", res)

	startupItem, ok := res["data"].(string)
	if !ok {
		return "", fmt.Errorf("restart response from %s has no string \"data\" field", ip)
	}
	return startupItem, nil
}

// Verification reports whether the services implied by startupItem answer on
// ip: the volume UI on port 6700, the master on port 6333, or both. A service
// counts as up when the HTTP request completes without a transport error;
// the status code is not inspected. Unknown startup items yield false.
func Verification(startupItem string, ip string) bool {
	fmt.Println("Verification startupItem: ", startupItem)
	masterURL := "http://" + ip + ":6333"
	volumeURL := "http://" + ip + ":6700/ui/index.html"

	switch startupItem {
	case config.StartScriptAsVolume:
		return probeURL(volumeURL) == nil
	case config.StartScriptAsMaster:
		return probeURL(masterURL) == nil
	case config.StartScriptAsMaVo:
		masterErr := probeURL(masterURL)
		fmt.Println("masterErr", masterErr)
		volume1Err := probeURL(volumeURL)
		fmt.Println("volume1Err", volume1Err)
		return masterErr == nil && volume1Err == nil
	}
	return false
}

// probeURL issues a GET to url and returns the transport error, if any. The
// response body is always closed so repeated polling does not leak
// connections.
func probeURL(url string) error {
	resp, err := http.Get(url)
	if err != nil {
		return err
	}
	resp.Body.Close()
	return nil
}

// GetLocalStartupItem returns the line that script.GetNowLineContent finds
// for config.Option in the script file scriptPath/scriptFile.
func GetLocalStartupItem(scriptPath string, scriptFile string) string {
	startupItem := script.GetNowLineContent(scriptPath+"/"+scriptFile, config.Option)
	fmt.Println("startupItem: ", startupItem)
	return startupItem
}

// RestartAllServer restarts every peer in peersIp in consecutive groups of
// len(peersIp)/coreBaseUnit + 1 nodes. The nodes of one group are restarted
// concurrently, and the next group starts only after every restart (including
// its verification) in the current group has returned.
//
// A non-positive coreBaseUnit would make the group size undefined, so it falls
// back to restarting one node at a time.
func RestartAllServer(peersIp []string, coreBaseUnit int) {
	fmt.Println("开始构建重启流程")

	coreThread := 1
	if coreBaseUnit > 0 {
		coreThread = len(peersIp)/coreBaseUnit + 1
	} else {
		fmt.Println("invalid coreBaseUnit, restarting one node at a time: ", coreBaseUnit)
	}

	timeOut, err := strconv.Atoi(config.Server.TimeOut)
	if err != nil {
		// timeOut is 0 here, so Restart will not poll for verification.
		fmt.Println("invalid config.Server.TimeOut: ", err)
	}

	fmt.Println("当前并行度coreThread:", coreThread)
	for start := 0; start < len(peersIp); start += coreThread {
		end := start + coreThread
		if end > len(peersIp) {
			end = len(peersIp)
		}
		restartGroup(peersIp[start:end], timeOut)
		fmt.Println("当前组任务完成")
	}
	fmt.Println("服务流程执行完毕")
}

// restartGroup restarts all peers of one group concurrently and blocks until
// every Restart call has returned.
func restartGroup(group []string, timeOut int) {
	var wg sync.WaitGroup
	for _, peer := range group {
		fmt.Println("重启当前组服务" + peer)
		wg.Add(1)
		go func(host string) {
			// Done must run inside the goroutine, after Restart finishes;
			// calling it from the parent makes Wait return immediately.
			defer wg.Done()
			Restart(host, timeOut)
		}(peerHost(peer))
	}
	wg.Wait()
}

// peerHost returns the part of addr before the first ':' (the whole string
// when there is none).
func peerHost(addr string) string {
	if i := strings.IndexByte(addr, ':'); i >= 0 {
		return addr[:i]
	}
	return addr
}