From 88956cd178f98ab35e2267e864a141b5420b7c35 Mon Sep 17 00:00:00 2001 From: sunty <1172534965@qq.com> Date: 星期四, 09 四月 2020 19:48:10 +0800 Subject: [PATCH] add master process --- controllers/swfsControllers.go | 102 ++++++++++++++++++++++++++++++++++++++++----------- 1 files changed, 80 insertions(+), 22 deletions(-) diff --git a/controllers/swfsControllers.go b/controllers/swfsControllers.go index 17d0b7b..c4c8c38 100644 --- a/controllers/swfsControllers.go +++ b/controllers/swfsControllers.go @@ -3,9 +3,12 @@ import ( "fmt" "github.com/gin-gonic/gin" + "net/http" + "strconv" "strings" - "test/config" - "test/util" + "swfs/config" + "swfs/util" + "sync" "time" ) @@ -26,47 +29,94 @@ var body SWFSInfo c.BindJSON(&body) role := body.Role + nowPeers := GetNowPeersList() if role == "master" { - AsMaster() + AsMaster(nowPeers) } else if role == "volume" { - AsVolume() + AsVolume(nowPeers) } else { - return } } -func (sc *SeaweedfsController) RestartMaster(c *gin.Context) { +func ReplaceLineContentBySearch(replaceContent string) { + resContent := GetNowLineContent("/opt/vasystem/script/seaweedfs_start.sh", "#start_master_server") + replaceStr := "sed -ie 's/" + resContent + "/" + replaceContent + "/g' /opt/vasystem/seaweedfs_start.sh" + util.RunScript(replaceStr) +} + +func (sc *SeaweedfsController) RestartMasterController(c *gin.Context) { end := "sh /opt/vasystem/script/seaweedfs_stop.sh" start := "sh /opt/vasystem/script/seaweedfs_start.sh" util.RunScript(end) util.RunScript(start) } -func RequestMasterNodesOperation() { - nowPeers := GetNowPeersList() - coreThread := len(nowPeers)/100 + 1 +func UpdateAllNodesScriptArgument(nowPeers []interface{}) { + for _, val := range nowPeers { + ip := val.(string) + url := "http://" + ip + ":7020/node/api-v/swfs/updateSWFSService" + http.Get(url) + } +} + +func RequestMasterNodesOperation(nowPeers []interface{}) { + //fmt.Println("config.Server.CoreBaseUnit", config.Server.CoreBaseUnit) + coreBaseUnit, _ := strconv.Atoi(config.Server.CoreBaseUnit) + //fmt.Println("nowPeers: ", nowPeers) + //fmt.Println("coreBaseUnit: ", coreBaseUnit) + UpdateAllNodesScriptArgument(nowPeers) + RestartAllMaster(nowPeers, coreBaseUnit) +} + +func RestartServer(ip string, timeOut int) { + url := "http://" + ip + ":7020/node/api-v/swfs/restartMaster" + http.Get(url) + tick := time.Tick(1 * time.Second) + for countdown := timeOut; countdown > 0; countdown-- { + verificationMasterUrl := "http://" + ip + ":6333" + verificationVolume1Url := "http://" + ip + ":6700" + verificationVolume2Url := "http://" + ip + ":6701" + _, masterErr := http.Get(verificationMasterUrl) + _, volume1Err := http.Get(verificationVolume1Url) + _, volume2Err := http.Get(verificationVolume2Url) + if masterErr == nil && volume1Err == nil && volume2Err == nil { + break + } + <-tick + } +} + +func RestartAllMaster(nowPeers []interface{}, coreBaseUnit int) { + coreThread := len(nowPeers)/coreBaseUnit + 1 masterIp := make([]string, 0) + timeOut, _ := strconv.Atoi(config.Server.TimeOut) + var waitGroup sync.WaitGroup for i, val := range nowPeers { ip := val.(string) if (i+1)%coreThread == 0 { masterIp = append(masterIp, strings.Split(ip, ":")[0]) - for _, val := range masterIp { - RestartOrtherMaster(val) - masterIp = append(masterIp[:0], masterIp[1:]...) + for i := 0; i < len(masterIp); i++ { + go RestartServer(masterIp[i], timeOut) + waitGroup.Add(1) //姣忓垱寤轰竴涓猤oroutine锛屽氨鎶婁换鍔¢槦鍒椾腑浠诲姟鐨勬暟閲�+1 } - time.Sleep(time.Second * 2) + waitGroup.Wait() //.Wait()杩欓噷浼氬彂鐢熼樆濉烇紝鐩村埌闃熷垪涓墍鏈夌殑浠诲姟缁撴潫灏变細瑙i櫎闃诲 + masterIp = make([]string, 0) } else { masterIp = append(masterIp, strings.Split(ip, ":")[0]) if len(nowPeers) == i+1 { - for _, val := range masterIp { - RestartOrtherMaster(val) + var waitGroup sync.WaitGroup + for i := 0; i < len(masterIp); i++ { + go RestartServer(masterIp[i], timeOut) + waitGroup.Add(1) //姣忓垱寤轰竴涓猤oroutine锛屽氨鎶婁换鍔¢槦鍒椾腑浠诲姟鐨勬暟閲�+1 } + waitGroup.Wait() //.Wait()杩欓噷浼氬彂鐢熼樆濉烇紝鐩村埌闃熷垪涓墍鏈夌殑浠诲姟缁撴潫灏变細瑙i櫎闃诲 + masterIp = make([]string, 0) break } - continue } + continue } } @@ -76,16 +126,24 @@ return peers } -func AsVolume() { - nowPeers := GetNowPeersList() +func GetNowLineContent(filePath string, searchContent string) string { + scriptStr := "cat" + filePath + "| grep " + searchContent + content := strings.Split(util.RunScript(scriptStr), "\n")[0] + return content +} + +func AsVolume(nowPeers []interface{}) { + ReplaceLineContentBySearch("start_master_server") fmt.Println(nowPeers) } -func AsMaster() { - nowPeers := GetNowPeersList() - coreThread := len(nowPeers) / 100 - fmt.Println(coreThread) +func AsMaster(nowPeers []interface{}) { + AddNewMasterToPeers() + RequestMasterNodesOperation(nowPeers) +} +func (sc *SeaweedfsController) RoleOfVolumeToMasterController(c *gin.Context) { + AsMaster(GetNowPeersList()) } func GetNowPeersList() []interface{} { -- Gitblit v1.8.0