sunty
2020-04-09 88956cd178f98ab35e2267e864a141b5420b7c35
add master process
3个文件已修改
118 ■■■■ 已修改文件
config/config.go 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
controllers/swfsControllers.go 102 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
router/router.go 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config/config.go
@@ -8,6 +8,8 @@
type server struct {
    EsServerIp   string `mapstructure: "esServerIp"`
    EsServerPort string `mapstructure: "esServerPort"`
    CoreBaseUnit string `mapstructure: "coreBaseUnit"`
    TimeOut      string `mapstructure: "timeOut"`
}
type elastic struct {
controllers/swfsControllers.go
@@ -3,9 +3,12 @@
import (
    "fmt"
    "github.com/gin-gonic/gin"
    "net/http"
    "strconv"
    "strings"
    "test/config"
    "test/util"
    "swfs/config"
    "swfs/util"
    "sync"
    "time"
)
@@ -26,47 +29,94 @@
    var body SWFSInfo
    c.BindJSON(&body)
    role := body.Role
    nowPeers := GetNowPeersList()
    if role == "master" {
        AsMaster()
        AsMaster(nowPeers)
    } else if role == "volume" {
        AsVolume()
        AsVolume(nowPeers)
    } else {
        return
    }
}
func (sc *SeaweedfsController) RestartMaster(c *gin.Context) {
func ReplaceLineContentBySearch(replaceContent string) {
    resContent := GetNowLineContent("/opt/vasystem/script/seaweedfs_start.sh", "#start_master_server")
    replaceStr := "sed -ie 's/" + resContent + "/" + replaceContent + "/g' /opt/vasystem/seaweedfs_start.sh"
    util.RunScript(replaceStr)
}
func (sc *SeaweedfsController) RestartMasterController(c *gin.Context) {
    end := "sh /opt/vasystem/script/seaweedfs_stop.sh"
    start := "sh /opt/vasystem/script/seaweedfs_start.sh"
    util.RunScript(end)
    util.RunScript(start)
}
func RequestMasterNodesOperation() {
    nowPeers := GetNowPeersList()
    coreThread := len(nowPeers)/100 + 1
func UpdateAllNodesScriptArgument(nowPeers []interface{}) {
    for _, val := range nowPeers {
        ip := val.(string)
        url := "http://" + ip + ":7020/node/api-v/swfs/updateSWFSService"
        http.Get(url)
    }
}
func RequestMasterNodesOperation(nowPeers []interface{}) {
    //fmt.Println("config.Server.CoreBaseUnit", config.Server.CoreBaseUnit)
    coreBaseUnit, _ := strconv.Atoi(config.Server.CoreBaseUnit)
    //fmt.Println("nowPeers: ", nowPeers)
    //fmt.Println("coreBaseUnit: ", coreBaseUnit)
    UpdateAllNodesScriptArgument(nowPeers)
    RestartAllMaster(nowPeers, coreBaseUnit)
}
func RestartServer(ip string, timeOut int) {
    url := "http://" + ip + ":7020/node/api-v/swfs/restartMaster"
    http.Get(url)
    tick := time.Tick(1 * time.Second)
    for countdown := timeOut; countdown > 0; countdown-- {
        verificationMasterUrl := "http://" + ip + ":6333"
        verificationVolume1Url := "http://" + ip + ":6700"
        verificationVolume2Url := "http://" + ip + ":6701"
        _, masterErr := http.Get(verificationMasterUrl)
        _, volume1Err := http.Get(verificationVolume1Url)
        _, volume2Err := http.Get(verificationVolume2Url)
        if masterErr == nil && volume1Err == nil && volume2Err == nil {
            break
        }
        <-tick
    }
}
func RestartAllMaster(nowPeers []interface{}, coreBaseUnit int) {
    coreThread := len(nowPeers)/coreBaseUnit + 1
    masterIp := make([]string, 0)
    timeOut, _ := strconv.Atoi(config.Server.TimeOut)
    var waitGroup sync.WaitGroup
    for i, val := range nowPeers {
        ip := val.(string)
        if (i+1)%coreThread == 0 {
            masterIp = append(masterIp, strings.Split(ip, ":")[0])
            for _, val := range masterIp {
                RestartOrtherMaster(val)
                masterIp = append(masterIp[:0], masterIp[1:]...)
            for i := 0; i < len(masterIp); i++ {
                go RestartServer(masterIp[i], timeOut)
                waitGroup.Add(1) //每创建一个goroutine,就把任务队列中任务的数量+1
            }
            time.Sleep(time.Second * 2)
            waitGroup.Wait() //.Wait()这里会发生阻塞,直到队列中所有的任务结束就会解除阻塞
            masterIp = make([]string, 0)
        } else {
            masterIp = append(masterIp, strings.Split(ip, ":")[0])
            if len(nowPeers) == i+1 {
                for _, val := range masterIp {
                    RestartOrtherMaster(val)
                var waitGroup sync.WaitGroup
                for i := 0; i < len(masterIp); i++ {
                    go RestartServer(masterIp[i], timeOut)
                    waitGroup.Add(1) //每创建一个goroutine,就把任务队列中任务的数量+1
                }
                waitGroup.Wait() //.Wait()这里会发生阻塞,直到队列中所有的任务结束就会解除阻塞
                masterIp = make([]string, 0)
                break
            }
            continue
        }
        continue
    }
}
@@ -76,16 +126,24 @@
    return peers
}
func AsVolume() {
    nowPeers := GetNowPeersList()
func GetNowLineContent(filePath string, searchContent string) string {
    scriptStr := "cat" + filePath + "| grep " + searchContent
    content := strings.Split(util.RunScript(scriptStr), "\n")[0]
    return content
}
func AsVolume(nowPeers []interface{}) {
    ReplaceLineContentBySearch("start_master_server")
    fmt.Println(nowPeers)
}
func AsMaster() {
    nowPeers := GetNowPeersList()
    coreThread := len(nowPeers) / 100
    fmt.Println(coreThread)
func AsMaster(nowPeers []interface{}) {
    AddNewMasterToPeers()
    RequestMasterNodesOperation(nowPeers)
}
func (sc *SeaweedfsController) RoleOfVolumeToMasterController(c *gin.Context) {
    AsMaster(GetNowPeersList())
}
func GetNowPeersList() []interface{} {
router/router.go
@@ -2,18 +2,20 @@
import (
    "github.com/gin-gonic/gin"
    "test/controllers"
    "swfs/controllers"
)
func NewRouter() *gin.Context {
func NewRouter() *gin.Engine {
    r := gin.Default()
    swfsController := new(controllers.SeaweedfsController)
    urlPrefix := "/node/api-v"
    swfsApi := r.Group(urlPrefix + "swfs")
    swfsApi := r.Group(urlPrefix + "/swfs")
    {
        swfsApi.POST("/addNode", swfsController.AddSWFSNodeController)
        swfsApi.POST("/updateSWFSService", swfsController.UpdateSWFSServiceController)
        swfsApi.GET("/", swfsController.RestartMaster)
        swfsApi.GET("/addNode", swfsController.AddSWFSNodeController)
        swfsApi.GET("/updateSWFSService", swfsController.UpdateSWFSServiceController)
        swfsApi.GET("/restartMaster", swfsController.RestartMasterController)
        swfsApi.GET("roleOfVolumeToMaster", swfsController.RoleOfVolumeToMasterController)
    }
    return r
}