package middleware
|
|
import (
|
"encoding/json"
|
"fmt"
|
"io/ioutil"
|
"net/http"
|
"strconv"
|
"strings"
|
"swfs/config"
|
"swfs/tools/es"
|
"swfs/tools/script"
|
"sync"
|
"time"
|
)
|
|
//作为主节点加入(默认包含数据节点)
|
func AsMaster(role string) bool {
|
es.AddNewMasterToPeers()
|
nowPeers := es.GetNowPeersList()
|
fmt.Println("nowPeers: ", nowPeers)
|
RequestNodesOperation(nowPeers, role)
|
return true
|
}
|
|
//作为数据节点加入
|
func AsVolume() bool {
|
nowPeers := es.GetNowPeersList()
|
if nowPeers == nil || len(nowPeers) == 0 {
|
return false
|
}
|
script.ReplaceLineContentBySearch(config.Option+config.StartScriptAsVolume, config.Option, config.Server.ScriptPath, config.StartServerScript)
|
script.ReplaceLineContentBySearch(es.GetNewPeers(), config.Peer, config.Server.ScriptPath, config.StartServerScript)
|
script.StartServer(config.Server.ScriptPath)
|
return true
|
}
|
|
//作为 主+数据 节点加入
|
func AsMaVo(role string) {
|
AsMaster(role)
|
script.ReplaceLineContentBySearch(config.Option+config.StartScriptAsMaVo, config.Option, config.Server.ScriptPath, config.StartServerScript)
|
}
|
|
// UpdateAllNodesScriptArgument asks every peer to refresh its script
// arguments by issuing
// GET http://<ip>:7020/node/api-v/swfs/updateSWFSService.
//
// peersIp holds the bare host/IP of each peer. Peers are contacted one at a
// time, in slice order. A peer whose request fails is logged and skipped so
// the remaining peers are still contacted. The function has no return value,
// so failures are visible only in the log output.
func UpdateAllNodesScriptArgument(peersIp []string) {
	fmt.Println("开始更新本地配置文件")

	for _, ip := range peersIp {
		fmt.Println("ip: ", ip)
		url := "http://" + ip + ":7020/node/api-v/swfs/updateSWFSService"
		fmt.Println("url", url)

		resp, err := http.Get(url)
		if err != nil {
			// resp is nil when err is non-nil; touching it would panic.
			fmt.Println("update request failed:", url, err)
			continue
		}
		status := resp.StatusCode
		// Only the status is needed. Close immediately rather than with
		// defer so connections are not held open for the whole loop.
		resp.Body.Close()

		fmt.Println("更新返回状态:", status)
		if status == http.StatusOK {
			fmt.Println("请求完毕", status)
		}
	}
}
|
|
//请求作为当前角色节点操作流程
|
func RequestNodesOperation(nowPeers []interface{}, role string) {
|
peersIp := make([]string, 0)
|
for _, val := range nowPeers {
|
peersIp = append(peersIp, strings.Split(val.(string), ":")[0])
|
}
|
coreBaseUnit, _ := strconv.Atoi(config.Server.CoreBaseUnit)
|
UpdateAllNodesScriptArgument(peersIp)
|
replaceContent := ""
|
if role == "master" {
|
replaceContent = config.Option + config.StartScriptAsMaster
|
} else if role == "master+volume" {
|
replaceContent = config.Option + config.StartScriptAsMaVo
|
}
|
script.ReplaceLineContentBySearch(replaceContent, config.Option, config.Server.ScriptPath, config.StartServerScript)
|
RestartAllServer(peersIp, coreBaseUnit)
|
}
|
|
//重启所有节点服务并验证
|
func Restart(ip string, timeOut int) {
|
url := "http://" + ip + ":7020/node/api-v/swfs/restartServer"
|
var info interface{}
|
httpRes, _ := http.Get(url)
|
body, _ := ioutil.ReadAll(httpRes.Body)
|
json.Unmarshal(body, &info)
|
res, ok := info.(map[string]interface{})
|
if !ok {
|
fmt.Println("http response interface can not change map[string]interface{}")
|
}
|
fmt.Println("res: ", res)
|
startupItem := res["data"].(string)
|
if httpRes.StatusCode != 200 {
|
return
|
}
|
fmt.Println("Restart startupItem: ", startupItem)
|
tick := time.Tick(1 * time.Second)
|
fmt.Println("准备开始验证节点服务")
|
for countdown := timeOut; countdown > 0; countdown-- {
|
fmt.Println("第", countdown, "次验证")
|
result := Verification(startupItem, ip)
|
fmt.Println("第一次验证result结果:", result)
|
if result == true {
|
break
|
}
|
<-tick
|
}
|
fmt.Println("验证完毕")
|
}
|
|
//验证服务状态
|
func Verification(startupItem string, ip string) bool {
|
resStatu := false
|
fmt.Println("Verification startupItem: ", startupItem)
|
switch startupItem {
|
case config.StartScriptAsVolume:
|
verificationVolumeUrl := "http://" + ip + ":6700/ui/index.html"
|
_, volume1Err := http.Get(verificationVolumeUrl)
|
fmt.Println("volume1Err", volume1Err)
|
if volume1Err == nil {
|
resStatu = true
|
}
|
case config.StartScriptAsMaster:
|
verificationMasterUrl := "http://" + ip + ":6333"
|
fmt.Println("verificationMasterUrl: ", verificationMasterUrl)
|
_, masterErr := http.Get(verificationMasterUrl)
|
fmt.Println("masterErr", masterErr)
|
if masterErr == nil {
|
resStatu = true
|
}
|
case config.StartScriptAsMaVo:
|
verificationMasterUrl := "http://" + ip + ":6333"
|
verificationVolumeUrl := "http://" + ip + ":6700/ui/index.html"
|
_, masterErr := http.Get(verificationMasterUrl)
|
fmt.Println("masterErr", masterErr)
|
_, volume1Err := http.Get(verificationVolumeUrl)
|
fmt.Println("volume1Err", volume1Err)
|
if masterErr == nil && volume1Err == nil {
|
resStatu = true
|
}
|
}
|
return resStatu
|
}
|
|
//获取本地启动项
|
func GetLocalStartupItem(scriptPath string, scriptFile string) string {
|
startupItem := script.GetNowLineContent(scriptPath+"/"+scriptFile, config.Option)
|
fmt.Println("startupItem: ", startupItem)
|
return startupItem
|
}
|
|
//构建重启流程
|
func RestartAllServer(peersIp []string, coreBaseUnit int) {
|
fmt.Println("开始构建重启流程")
|
coreThread := len(peersIp)/coreBaseUnit + 1
|
masterIp := make([]string, 0)
|
timeOut, _ := strconv.Atoi(config.Server.TimeOut)
|
var waitGroup sync.WaitGroup
|
fmt.Println("当前并行度coreThread:", coreThread)
|
for i, ip := range peersIp {
|
fmt.Println("重启当前组服务" + ip)
|
if (i+1)%coreThread == 0 {
|
masterIp = append(masterIp, strings.Split(ip, ":")[0])
|
fmt.Println("加入第一组并开始验证第一组 masterIp: ", masterIp)
|
for i := 0; i < len(masterIp); i++ {
|
fmt.Println("len masterIp: ", len(masterIp))
|
fmt.Println("第" + strconv.Itoa(i) + "个线程")
|
fmt.Println("当前goroutinebe")
|
waitGroup.Add(1) //每创建一个goroutine,就把任务队列中任务的数量+1
|
fmt.Println("当前goroutineaf")
|
go Restart(masterIp[i], timeOut)
|
waitGroup.Done()
|
}
|
fmt.Println("这里为阻塞!!!!!")
|
waitGroup.Wait() //.Wait()这里会发生阻塞,直到队列中所有的任务结束就会解除阻塞
|
fmt.Println("当前组任务完成")
|
masterIp = make([]string, 0)
|
fmt.Println("清空当前组成员:", masterIp)
|
} else {
|
masterIp = append(masterIp, strings.Split(ip, ":")[0])
|
if len(peersIp) == i+1 {
|
var waitGroup sync.WaitGroup
|
for i := 0; i < len(masterIp); i++ {
|
waitGroup.Add(1) //每创建一个goroutine,就把任务队列中任务的数量+1
|
go Restart(masterIp[i], timeOut)
|
waitGroup.Done()
|
}
|
waitGroup.Wait() //.Wait()这里会发生阻塞,直到队列中所有的任务结束就会解除阻塞
|
masterIp = make([]string, 0)
|
break
|
}
|
}
|
|
}
|
fmt.Println("服务流程执行完毕")
|
|
}
|