package controllers

import (
	"fmt"
	"net/http"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/gin-gonic/gin"

	"swfs/config"
	"swfs/util"
)

// swfsNodeOperationQuery is the Elasticsearch search body used to load the
// single document whose "application" field is "nodeOperation".
const swfsNodeOperationQuery = `{ "query": { "bool": { "filter": [ { "term": { "application":"nodeOperation" } } ] } }, "size": 1 }`

// swfsProbeTimeout bounds each readiness probe issued by RestartServer so that
// one unresponsive port cannot stall the countdown indefinitely.
const swfsProbeTimeout = 5 * time.Second

// swfsProbeClient is the shared HTTP client used for readiness probes.
var swfsProbeClient = &http.Client{Timeout: swfsProbeTimeout}

// SeaweedfsController groups the gin handlers that manage the local SeaweedFS
// start script and the restart of master nodes.
type SeaweedfsController struct{}

// SWFSInfo is the JSON request body accepted by AddSWFSNodeController.
type SWFSInfo struct {
	Role string `json:"role"`
}

// UpdateSWFSServiceController rewrites the peers entry of the local start
// script so that it matches the peer list stored in Elasticsearch.
// It does not write a response body.
func (sc *SeaweedfsController) UpdateSWFSServiceController(c *gin.Context) {
	oldPeers := GetOldPeers()
	newPeers := GetNewPeers()
	if newPeers == "" {
		// GetNewPeers already reported the failure; leave the script untouched
		// rather than blanking out its peers entry.
		return
	}
	UpdatePeers(oldPeers, newPeers)
}

// AddSWFSNodeController reads the requested role from the JSON body and
// configures this node as a "master" or a "volume". Any other role is ignored.
func (sc *SeaweedfsController) AddSWFSNodeController(c *gin.Context) {
	var body SWFSInfo
	if err := c.BindJSON(&body); err != nil {
		// gin's BindJSON aborts the request with a 400 on failure, so there is
		// nothing more to send here.
		return
	}

	switch body.Role {
	case "master":
		AsMaster(GetNowPeersList())
	case "volume":
		AsVolume(GetNowPeersList())
	}
}

// ReplaceLineContentBySearch replaces the line containing
// "#start_master_server" with replaceContent using sed.
//
// NOTE(review): the line is looked up in /opt/vasystem/script/seaweedfs_start.sh
// but sed edits /opt/vasystem/seaweedfs_start.sh — confirm which path is
// intended. Both are kept exactly as they were.
// NOTE(review): "sed -ie" is parsed by GNU sed as "-i" with backup suffix "e";
// "-i -e" may have been intended — confirm before changing.
func ReplaceLineContentBySearch(replaceContent string) {
	resContent := GetNowLineContent("/opt/vasystem/script/seaweedfs_start.sh", "#start_master_server")
	if resContent == "" {
		// An empty pattern would produce the invalid expression "s//.../g".
		fmt.Println("ReplaceLineContentBySearch: no line matching #start_master_server found")
		return
	}
	replaceStr := "sed -ie 's/" + resContent + "/" + replaceContent + "/g' /opt/vasystem/seaweedfs_start.sh"
	util.RunScript(replaceStr)
}

// RestartMasterController runs the local stop script followed by the local
// start script. It does not write a response body.
func (sc *SeaweedfsController) RestartMasterController(c *gin.Context) {
	end := "sh /opt/vasystem/script/seaweedfs_stop.sh"
	start := "sh /opt/vasystem/script/seaweedfs_start.sh"
	util.RunScript(end)
	util.RunScript(start)
}

// UpdateAllNodesScriptArgument asks every peer, on port 7020, to refresh the
// peers entry of its start script. Failures are logged and do not stop the
// loop.
func UpdateAllNodesScriptArgument(nowPeers []interface{}) {
	for _, val := range nowPeers {
		host, ok := swfsPeerHost(val)
		if !ok {
			fmt.Println("UpdateAllNodesScriptArgument: skipping non-string peer:", val)
			continue
		}
		url := "http://" + host + ":7020/node/api-v/swfs/updateSWFSService"
		if err := swfsGetAndClose(http.DefaultClient, url); err != nil {
			fmt.Println("UpdateAllNodesScriptArgument:", err)
		}
	}
}

// RequestMasterNodesOperation first tells every peer to update its start
// script and then restarts the peers in batches sized from
// config.Server.CoreBaseUnit.
func RequestMasterNodesOperation(nowPeers []interface{}) {
	coreBaseUnit, err := strconv.Atoi(config.Server.CoreBaseUnit)
	if err != nil {
		// coreBaseUnit is 0 here; RestartAllMaster falls back to restarting
		// one node at a time.
		fmt.Println("RequestMasterNodesOperation: invalid config.Server.CoreBaseUnit:", err)
	}
	UpdateAllNodesScriptArgument(nowPeers)
	RestartAllMaster(nowPeers, coreBaseUnit)
}

// RestartServer triggers a restart on the node at ip and then polls its master
// port (6333) and two volume ports (6700, 6701) once per second until all
// three accept an HTTP request or timeOut polls have been made.
func RestartServer(ip string, timeOut int) {
	// The trigger deliberately uses the default client (no timeout) so polling
	// only begins once the remote handler has returned or the request failed.
	restartURL := "http://" + ip + ":7020/node/api-v/swfs/restartMaster"
	if err := swfsGetAndClose(http.DefaultClient, restartURL); err != nil {
		fmt.Println("RestartServer:", err)
	}

	probeURLs := []string{
		"http://" + ip + ":6333", // master
		"http://" + ip + ":6700", // volume 1
		"http://" + ip + ":6701", // volume 2
	}

	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()
	for countdown := timeOut; countdown > 0; countdown-- {
		if swfsAllReachable(probeURLs) {
			return
		}
		<-ticker.C
	}
}

// RestartAllMaster restarts the peers in consecutive batches of
// len(nowPeers)/coreBaseUnit + 1 nodes. The nodes of one batch are restarted
// concurrently, and the next batch only starts once every node of the current
// batch has finished RestartServer. A coreBaseUnit that is zero or negative
// falls back to a batch size of one.
func RestartAllMaster(nowPeers []interface{}, coreBaseUnit int) {
	if len(nowPeers) == 0 {
		return
	}

	batchSize := 1
	if coreBaseUnit > 0 {
		batchSize = len(nowPeers)/coreBaseUnit + 1
	}

	timeOut, err := strconv.Atoi(config.Server.TimeOut)
	if err != nil {
		// timeOut is 0 here, so RestartServer triggers the restart without polling.
		fmt.Println("RestartAllMaster: invalid config.Server.TimeOut:", err)
	}

	hosts := make([]string, 0, len(nowPeers))
	for _, val := range nowPeers {
		host, ok := swfsPeerHost(val)
		if !ok {
			fmt.Println("RestartAllMaster: skipping non-string peer:", val)
			continue
		}
		hosts = append(hosts, host)
	}

	for start := 0; start < len(hosts); start += batchSize {
		end := start + batchSize
		if end > len(hosts) {
			end = len(hosts)
		}

		var wg sync.WaitGroup
		for _, host := range hosts[start:end] {
			wg.Add(1)
			go func(host string) {
				// Done must run for every Add, otherwise Wait never returns.
				defer wg.Done()
				RestartServer(host, timeOut)
			}(host)
		}
		wg.Wait()
	}
}

// GetOldPeers returns the first line of /opt/vasystem/seaweedfs_start.sh that
// contains "peers=", or an empty string when the command prints nothing.
func GetOldPeers() string {
	str := "cat /opt/vasystem/seaweedfs_start.sh | grep peers="
	peers := strings.Split(util.RunScript(str), "\n")[0]
	return peers
}

// GetNowLineContent returns the first line of filePath that matches the grep
// pattern searchContent, or an empty string when the command prints nothing.
// Both arguments are single-quoted so that characters such as '#' are not
// interpreted by the shell.
func GetNowLineContent(filePath string, searchContent string) string {
	scriptStr := "cat " + swfsShellQuote(filePath) + " | grep -e " + swfsShellQuote(searchContent)
	content := strings.Split(util.RunScript(scriptStr), "\n")[0]
	return content
}

// AsVolume rewrites the "#start_master_server" line of the start script to
// "start_master_server" and prints the current peer list.
func AsVolume(nowPeers []interface{}) {
	ReplaceLineContentBySearch("start_master_server")
	fmt.Println(nowPeers)
}

// AsMaster registers a new master peer in Elasticsearch and then updates and
// restarts the peers listed in nowPeers.
//
// NOTE(review): nowPeers is supplied by the caller, so it does not include the
// peer added by AddNewMasterToPeers — confirm that this is intended.
func AsMaster(nowPeers []interface{}) {
	if !AddNewMasterToPeers() {
		fmt.Println("AsMaster: adding the new master to the peer list did not succeed")
	}
	RequestMasterNodesOperation(nowPeers)
}

// RoleOfVolumeToMasterController switches this node to the master role using
// the peer list currently stored in Elasticsearch.
func (sc *SeaweedfsController) RoleOfVolumeToMasterController(c *gin.Context) {
	AsMaster(GetNowPeersList())
}

// GetNowPeersList returns the "peers" array of the nodeOperation document
// stored in Elasticsearch. It returns nil when the document cannot be loaded.
func GetNowPeersList() []interface{} {
	peers, err := swfsFetchPeers()
	if err != nil {
		fmt.Println("GetNowPeersList:", err)
		return nil
	}
	return peers
}

// GetNewPeers returns the Elasticsearch peer list formatted as
// "peers=<p1>,<p2>,...". It returns an empty string when the list cannot be
// loaded.
func GetNewPeers() string {
	peers, err := swfsFetchPeers()
	if err != nil {
		fmt.Println("GetNewPeers:", err)
		return ""
	}
	fmt.Println(peers)

	parts := make([]string, 0, len(peers))
	for _, peer := range peers {
		parts = append(parts, fmt.Sprint(peer))
	}
	return "peers=" + strings.Join(parts, ",")
}

// UpdatePeers replaces every occurrence of oldPeers with newPeers in
// /opt/vasystem/seaweedfs_start.sh using sed. Nothing is done when oldPeers is
// empty, because an empty search pattern is not a valid sed expression here.
func UpdatePeers(oldPeers string, newPeers string) {
	if oldPeers == "" {
		fmt.Println("UpdatePeers: no existing peers entry to replace")
		return
	}
	str := "sed -ie 's/" + oldPeers + "/" + newPeers + "/g' /opt/vasystem/seaweedfs_start.sh"
	util.RunScript(str)
}

// AddNewMasterToPeers appends "<config.Server.EsServerIp>:6333" to the "peers"
// array of the nodeOperation document through an update-by-query request.
// It reports whether the update succeeded.
func AddNewMasterToPeers() (result bool) {
	peer := config.Server.EsServerIp + ":6333"
	addUrl := "http://" + config.Server.EsServerIp + ":" + config.Server.EsServerPort + "/" + config.BasicFS.IndexName + "/_update_by_query"
	addJson := `{ "script": { "lang": "painless", "inline": "ctx._source.peers.add(params.newpeer)", "params": { "newpeer": "` + peer + `" } }, "query": { "bool": { "filter": [ { "term": { "application": "nodeOperation" } } ] } } }`

	buf, err := util.EsReq("POST", addUrl, []byte(addJson))
	if err != nil {
		fmt.Println("AddNewMasterToPeers: update request failed:", err)
		return false
	}
	updateRes, err := util.SourceUpdated(buf)
	if err != nil {
		fmt.Println("AddNewMasterToPeers: reading update result failed:", err)
		return false
	}
	return updateRes != -1
}

// swfsFetchPeers loads the nodeOperation document from Elasticsearch and
// returns its "peers" array.
func swfsFetchPeers() ([]interface{}, error) {
	getUrl := "http://" + config.Server.EsServerIp + ":" + config.Server.EsServerPort + "/" + config.BasicFS.IndexName + "/_search"

	buf, err := util.EsReq("POST", getUrl, []byte(swfsNodeOperationQuery))
	if err != nil {
		return nil, fmt.Errorf("querying %s: %w", getUrl, err)
	}
	source, err := util.Sourcelist(buf)
	if err != nil {
		return nil, fmt.Errorf("decoding search response: %w", err)
	}
	if len(source) == 0 {
		return nil, fmt.Errorf("no nodeOperation document found")
	}
	peers, ok := source[0]["peers"].([]interface{})
	if !ok {
		return nil, fmt.Errorf("nodeOperation document has no usable peers array")
	}
	return peers, nil
}

// swfsPeerHost extracts the host part of a "host:port" peer entry. The second
// result is false when val is not a string.
func swfsPeerHost(val interface{}) (string, bool) {
	peer, ok := val.(string)
	if !ok {
		return "", false
	}
	return strings.Split(peer, ":")[0], true
}

// swfsGetAndClose issues a GET request and closes the response body. Only
// transport-level failures are reported; the status code is not inspected.
func swfsGetAndClose(client *http.Client, url string) error {
	resp, err := client.Get(url)
	if err != nil {
		return err
	}
	resp.Body.Close()
	return nil
}

// swfsAllReachable reports whether every URL answers an HTTP GET without a
// transport error.
func swfsAllReachable(urls []string) bool {
	for _, url := range urls {
		if err := swfsGetAndClose(swfsProbeClient, url); err != nil {
			return false
		}
	}
	return true
}

// swfsShellQuote wraps s in single quotes for use as one shell word, escaping
// any single quote it contains.
func swfsShellQuote(s string) string {
	return "'" + strings.Replace(s, "'", `'\''`, -1) + "'"
}