sunty
2020-04-16 d2d968c2edf5518dda717f1602dd6204e0256e35
Organize the structure
3个文件已添加
2个文件已修改
622 ■■■■ 已修改文件
config/config.go 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
controllers/swfsControllers.go 308 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
tools/es/es.go 73 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
tools/middleware/middleware.go 191 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
tools/script/script.go 34 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config/config.go
@@ -5,6 +5,22 @@
    "log"
)
const (
    StartServerScript = "seaweedfs_start.sh"
    StopServerScript  = "seaweedfs_stop.sh"
    //作为 volume 启动
    StartScriptAsVolume = "1"
    //作为 master 启动
    StartScriptAsMaster = "2"
    //作为 master + volume 启动
    StartScriptAsMaVo = "3"
    //启动项参数头
    Option = "option="
    //master列表项参数头
    Peer = "peers="
    //StartScriptAsFiler    = "option=4"
)
type server struct {
    EsServerIp   string `mapstructure: "esServerIp"`
    EsServerPort string `mapstructure: "esServerPort"`
controllers/swfsControllers.go
@@ -1,34 +1,16 @@
package controllers
import (
    "encoding/json"
    "fmt"
    "github.com/gin-gonic/gin"
    "io/ioutil"
    "net/http"
    "strconv"
    "strings"
    "swfs/code"
    "swfs/config"
    "swfs/tools/es"
    "swfs/tools/middleware"
    "swfs/tools/script"
    "swfs/util"
    "sync"
    "time"
)
const (
    StartServerScript = "seaweedfs_start.sh"
    StopServerScript  = "seaweedfs_stop.sh"
    //作为 volume 启动
    StartScriptAsVolume = "1"
    //作为 master 启动
    StartScriptAsMaster = "2"
    //作为 master + volume 启动
    StartScriptAsMaVo = "3"
    //启动项参数头
    Option = "option="
    //master列表项参数头
    Peer = "peers="
    //StartScriptAsFiler    = "option=4"
)
type SeaweedfsController struct{}
@@ -38,7 +20,7 @@
}
func (sc *SeaweedfsController) UpdateSWFSServiceController(c *gin.Context) {
    ReplaceLineContentBySearch(GetNewPeers(), Peer, config.Server.ScriptPath, StartServerScript)
    script.ReplaceLineContentBySearch(es.GetNewPeers(), config.Peer, config.Server.ScriptPath, config.StartServerScript)
    util.ResponseFormat(c, code.Success, config.Server.EsServerIp+"更新成功")
}
@@ -57,11 +39,11 @@
    c.BindJSON(&body)
    role := body.Role
    if role == "master" {
        AsMaster(role)
        middleware.AsMaster(role)
        util.ResponseFormat(c, code.AddSuccess, "加入节点成功")
        return
    } else if role == "volume" {
        status := AsVolume()
        status := middleware.AsVolume()
        if status == true {
            util.ResponseFormat(c, code.AddSuccess, "加入节点成功")
            return
@@ -70,7 +52,7 @@
            return
        }
    } else if role == "master+volume" {
        AsMaVo(role)
        middleware.AsMaVo(role)
        util.ResponseFormat(c, code.AddSuccess, "加入节点成功")
    } else {
        util.ResponseFormat(c, code.RequestParamError, "选择节点类型错误")
@@ -79,283 +61,17 @@
}
func AsMaVo(role string) {
    AsMaster(role)
    ReplaceLineContentBySearch(Option+StartScriptAsMaVo, Option, config.Server.ScriptPath, StartServerScript)
}
func (sc *SeaweedfsController) RoleOfVolumeToMasterController(c *gin.Context) {
    AsMaster("master")
    ReplaceLineContentBySearch(Option+StartScriptAsMaVo, Option, config.Server.ScriptPath, StartServerScript)
    middleware.AsMaster("master")
    script.ReplaceLineContentBySearch(config.Option+config.StartScriptAsMaVo, config.Option, config.Server.ScriptPath, config.StartServerScript)
}
func (sc *SeaweedfsController) RestartServerController(c *gin.Context) {
    StopServer(config.Server.ScriptPath)
    StartServer(config.Server.ScriptPath)
    script.StopServer(config.Server.ScriptPath)
    script.StartServer(config.Server.ScriptPath)
    time.Sleep(time.Second * 1)
    //fmt.Println("GetLocalStartupItem: ", GetLocalStartupItem(config.Server.ScriptPath, StartServerScript))
    result := strings.Split(GetLocalStartupItem(config.Server.ScriptPath, StartServerScript), "=")[1]
    result := strings.Split(middleware.GetLocalStartupItem(config.Server.ScriptPath, config.StartServerScript), "=")[1]
    fmt.Println("result: ", result)
    util.ResponseFormat(c, code.Success, result)
}
//启动服务
func StartServer(scriptPath string) {
    //fmt.Println("sh " + scriptPath + "/" + StartServerScript)
    util.RunScript("sh " + scriptPath + "/" + StartServerScript)
}
//停止服务
func StopServer(scriptPath string) {
    //fmt.Println("sh " + scriptPath + "/" + StopServerScript)
    util.RunScript("sh " + scriptPath + "/" + StopServerScript)
}
//根据搜索内容替换整行内容
func ReplaceLineContentBySearch(replaceContent string, searchContent string, scriptPath string, scriptFile string) {
    resSearchContent := GetNowLineContent(scriptPath+"/"+scriptFile, searchContent)
    replaceStr := "sed -ie 's/" + resSearchContent + "/" + replaceContent + "/g' " + scriptPath + "/" + scriptFile
    util.RunScript(replaceStr)
}
//更新所有节点的脚本参数
func UpdateAllNodesScriptArgument(peersIp []string) {
    fmt.Println("开始更新本地配置文件")
    for _, ip := range peersIp {
        fmt.Println("ip: ", ip)
        url := "http://" + ip + ":7020/node/api-v/swfs/updateSWFSService"
        fmt.Println("url", url)
        resp, _ := http.Get(url)
        fmt.Println("更新返回状态:", resp.StatusCode)
        if resp.StatusCode == 200 {
            fmt.Println("请求完毕", resp.StatusCode)
        }
    }
}
//请求作为当前角色节点操作流程
func RequestNodesOperation(nowPeers []interface{}, role string) {
    peersIp := make([]string, 0)
    for _, val := range nowPeers {
        peersIp = append(peersIp, strings.Split(val.(string), ":")[0])
    }
    coreBaseUnit, _ := strconv.Atoi(config.Server.CoreBaseUnit)
    UpdateAllNodesScriptArgument(peersIp)
    replaceContent := ""
    if role == "master" {
        replaceContent = Option + StartScriptAsMaster
    } else if role == "master+volume" {
        replaceContent = Option + StartScriptAsMaVo
    }
    ReplaceLineContentBySearch(replaceContent, Option, config.Server.ScriptPath, StartServerScript)
    RestartAllServer(peersIp, coreBaseUnit)
}
//重启所有节点服务并验证
func Restart(ip string, timeOut int) {
    url := "http://" + ip + ":7020/node/api-v/swfs/restartServer"
    var info interface{}
    httpRes, _ := http.Get(url)
    body, _ := ioutil.ReadAll(httpRes.Body)
    json.Unmarshal(body, &info)
    res, ok := info.(map[string]interface{})
    if !ok {
        fmt.Println("http response interface can not change map[string]interface{}")
    }
    fmt.Println("res: ", res)
    startupItem := res["data"].(string)
    if httpRes.StatusCode != 200 {
        return
    }
    fmt.Println("Restart startupItem: ", startupItem)
    tick := time.Tick(1 * time.Second)
    fmt.Println("准备开始验证节点服务")
    for countdown := timeOut; countdown > 0; countdown-- {
        fmt.Println("第", countdown, "次验证")
        result := Verification(startupItem, ip)
        fmt.Println("第一次验证result结果:", result)
        if result == true {
            break
        }
        <-tick
    }
    fmt.Println("验证完毕")
}
//验证服务状态
func Verification(startupItem string, ip string) bool {
    resStatu := false
    fmt.Println("Verification startupItem: ", startupItem)
    switch startupItem {
    case StartScriptAsVolume:
        verificationVolumeUrl := "http://" + ip + ":6700/ui/index.html"
        _, volume1Err := http.Get(verificationVolumeUrl)
        if volume1Err == nil {
            resStatu = true
        }
    case StartScriptAsMaster:
        verificationMasterUrl := "http://" + ip + ":6333"
        _, masterErr := http.Get(verificationMasterUrl)
        if masterErr == nil {
            resStatu = true
        }
    case StartScriptAsMaVo:
        verificationMasterUrl := "http://" + ip + ":6333"
        verificationVolumeUrl := "http://" + ip + ":6700/ui/index.html"
        _, masterErr := http.Get(verificationMasterUrl)
        fmt.Println("masterErr", masterErr)
        _, volume1Err := http.Get(verificationVolumeUrl)
        fmt.Println("volume1Err", volume1Err)
        if masterErr == nil && volume1Err == nil {
            resStatu = true
        }
    }
    return resStatu
}
//获取本地启动项
func GetLocalStartupItem(scriptPath string, scriptFile string) string {
    startupItem := GetNowLineContent(scriptPath+"/"+scriptFile, Option)
    fmt.Println("startupItem: ", startupItem)
    return startupItem
}
//构建重启流程
func RestartAllServer(peersIp []string, coreBaseUnit int) {
    fmt.Println("开始构建重启流程")
    coreThread := len(peersIp)/coreBaseUnit + 1
    masterIp := make([]string, 0)
    timeOut, _ := strconv.Atoi(config.Server.TimeOut)
    var waitGroup sync.WaitGroup
    fmt.Println("当前并行度coreThread:", coreThread)
    for i, ip := range peersIp {
        fmt.Println("重启当前组服务" + ip)
        if (i+1)%coreThread == 0 {
            masterIp = append(masterIp, strings.Split(ip, ":")[0])
            fmt.Println("加入第一组并开始验证第一组 masterIp: ", masterIp)
            for i := 0; i < len(masterIp); i++ {
                fmt.Println("len masterIp: ", len(masterIp))
                fmt.Println("第" + strconv.Itoa(i) + "个线程")
                fmt.Println("当前goroutinebe")
                waitGroup.Add(1) //每创建一个goroutine,就把任务队列中任务的数量+1
                fmt.Println("当前goroutineaf")
                go Restart(masterIp[i], timeOut)
                waitGroup.Done()
            }
            fmt.Println("这里为阻塞!!!!!")
            waitGroup.Wait() //.Wait()这里会发生阻塞,直到队列中所有的任务结束就会解除阻塞
            fmt.Println("当前组任务完成")
            masterIp = make([]string, 0)
            fmt.Println("清空当前组成员:", masterIp)
        } else {
            masterIp = append(masterIp, strings.Split(ip, ":")[0])
            if len(peersIp) == i+1 {
                var waitGroup sync.WaitGroup
                for i := 0; i < len(masterIp); i++ {
                    waitGroup.Add(1) //每创建一个goroutine,就把任务队列中任务的数量+1
                    go Restart(masterIp[i], timeOut)
                    waitGroup.Done()
                }
                waitGroup.Wait() //.Wait()这里会发生阻塞,直到队列中所有的任务结束就会解除阻塞
                masterIp = make([]string, 0)
                break
            }
        }
    }
    fmt.Println("服务流程执行完毕")
}
//获取查找内容当前行内容
func GetNowLineContent(filePath string, searchContent string) string {
    scriptStr := "cat " + filePath + "| grep " + searchContent
    fmt.Println("scriptStr: ", scriptStr)
    return strings.Split(util.RunScript(scriptStr), "\n")[0]
}
//作为数据节点加入
func AsVolume() bool {
    nowPeers := GetNowPeersList()
    if nowPeers == nil || len(nowPeers) == 0 {
        return false
    }
    ReplaceLineContentBySearch(Option+StartScriptAsVolume, Option, config.Server.ScriptPath, StartServerScript)
    ReplaceLineContentBySearch(GetNewPeers(), Peer, config.Server.ScriptPath, StartServerScript)
    StartServer(config.Server.ScriptPath)
    return true
}
//作为主节点加入(默认包含数据节点)
func AsMaster(role string) bool {
    AddNewMasterToPeers()
    nowPeers := GetNowPeersList()
    fmt.Println("nowPeers: ", nowPeers)
    RequestNodesOperation(nowPeers, role)
    return true
}
//获取当前集群列表
func GetNowPeersList() []interface{} {
    getUrl := "http://" + config.Server.EsServerIp + ":" + config.Server.EsServerPort + "/" + config.BasicFS.IndexName + "/_search"
    getJson := `{
    "query": {
        "bool": {
            "filter": [
                {
                    "term": {
                        "application":"nodeOperation"
                    }
                }
            ]
        }
    },
    "size": 1
}`
    buf, _ := util.EsReq("POST", getUrl, []byte(getJson))
    source, _ := util.Sourcelist(buf)
    peers := source[0]["peers"].([]interface{})
    return peers
}
//获取当前集群列表格式化信息
func GetNewPeers() string {
    peers := GetNowPeersList()
    p := Peer + strings.Replace(strings.Trim(fmt.Sprint(peers), "[]"), " ", ",", -1)
    return p
}
//向集群加入新的master
func AddNewMasterToPeers() (result bool) {
    peer := config.Server.EsServerIp + ":6333"
    addUrl := "http://" + config.Server.EsServerIp + ":" + config.Server.EsServerPort + "/" + config.BasicFS.IndexName + "/_update_by_query?refresh=true"
    addJson := `{
    "script": {
        "lang": "painless",
        "inline": "ctx._source.peers.add(params.newpeer)",
        "params": {
            "newpeer": "` + peer + `"
        }
    },
    "query": {
        "bool": {
            "filter": [
                {
                    "term": {
                        "application": "nodeOperation"
                    }
                }
            ]
        }
    }
}`
    buf, _ := util.EsReq("POST", addUrl, []byte(addJson))
    updateRes, _ := util.SourceUpdated(buf)
    if updateRes == -1 {
        result = false
    } else {
        result = true
    }
    return result
}
tools/es/es.go
New file
@@ -0,0 +1,73 @@
package es
import (
    "fmt"
    "strings"
    "swfs/config"
    "swfs/util"
)
//向集群加入新的master
func AddNewMasterToPeers() (result bool) {
    peer := config.Server.EsServerIp + ":6333"
    addUrl := "http://" + config.Server.EsServerIp + ":" + config.Server.EsServerPort + "/" + config.BasicFS.IndexName + "/_update_by_query?refresh=true"
    addJson := `{
    "script": {
        "lang": "painless",
        "inline": "ctx._source.peers.add(params.newpeer)",
        "params": {
            "newpeer": "` + peer + `"
        }
    },
    "query": {
        "bool": {
            "filter": [
                {
                    "term": {
                        "application": "nodeOperation"
                    }
                }
            ]
        }
    }
}`
    buf, _ := util.EsReq("POST", addUrl, []byte(addJson))
    updateRes, _ := util.SourceUpdated(buf)
    if updateRes == -1 {
        result = false
    } else {
        result = true
    }
    return result
}
//获取当前集群列表
func GetNowPeersList() []interface{} {
    getUrl := "http://" + config.Server.EsServerIp + ":" + config.Server.EsServerPort + "/" + config.BasicFS.IndexName + "/_search"
    getJson := `{
    "query": {
        "bool": {
            "filter": [
                {
                    "term": {
                        "application":"nodeOperation"
                    }
                }
            ]
        }
    },
    "size": 1
}`
    buf, _ := util.EsReq("POST", getUrl, []byte(getJson))
    source, _ := util.Sourcelist(buf)
    peers := source[0]["peers"].([]interface{})
    return peers
}
//获取当前集群列表格式化信息
func GetNewPeers() string {
    peers := GetNowPeersList()
    p := config.Peer + strings.Replace(strings.Trim(fmt.Sprint(peers), "[]"), " ", ",", -1)
    return p
}
tools/middleware/middleware.go
New file
@@ -0,0 +1,191 @@
package middleware
import (
    "encoding/json"
    "fmt"
    "io/ioutil"
    "net/http"
    "strconv"
    "strings"
    "swfs/config"
    "swfs/tools/es"
    "swfs/tools/script"
    "sync"
    "time"
)
//作为主节点加入(默认包含数据节点)
func AsMaster(role string) bool {
    es.AddNewMasterToPeers()
    nowPeers := es.GetNowPeersList()
    fmt.Println("nowPeers: ", nowPeers)
    RequestNodesOperation(nowPeers, role)
    return true
}
//作为数据节点加入
func AsVolume() bool {
    nowPeers := es.GetNowPeersList()
    if nowPeers == nil || len(nowPeers) == 0 {
        return false
    }
    script.ReplaceLineContentBySearch(config.Option+config.StartScriptAsVolume, config.Option, config.Server.ScriptPath, config.StartServerScript)
    script.ReplaceLineContentBySearch(es.GetNewPeers(), config.Peer, config.Server.ScriptPath, config.StartServerScript)
    script.StartServer(config.Server.ScriptPath)
    return true
}
//作为 主+数据 节点加入
func AsMaVo(role string) {
    AsMaster(role)
    script.ReplaceLineContentBySearch(config.Option+config.StartScriptAsMaVo, config.Option, config.Server.ScriptPath, config.StartServerScript)
}
//更新所有节点的脚本参数
func UpdateAllNodesScriptArgument(peersIp []string) {
    fmt.Println("开始更新本地配置文件")
    for _, ip := range peersIp {
        fmt.Println("ip: ", ip)
        url := "http://" + ip + ":7020/node/api-v/swfs/updateSWFSService"
        fmt.Println("url", url)
        resp, _ := http.Get(url)
        fmt.Println("更新返回状态:", resp.StatusCode)
        if resp.StatusCode == 200 {
            fmt.Println("请求完毕", resp.StatusCode)
        }
    }
}
//请求作为当前角色节点操作流程
func RequestNodesOperation(nowPeers []interface{}, role string) {
    peersIp := make([]string, 0)
    for _, val := range nowPeers {
        peersIp = append(peersIp, strings.Split(val.(string), ":")[0])
    }
    coreBaseUnit, _ := strconv.Atoi(config.Server.CoreBaseUnit)
    UpdateAllNodesScriptArgument(peersIp)
    replaceContent := ""
    if role == "master" {
        replaceContent = config.Option + config.StartScriptAsMaster
    } else if role == "master+volume" {
        replaceContent = config.Option + config.StartScriptAsMaVo
    }
    script.ReplaceLineContentBySearch(replaceContent, config.Option, config.Server.ScriptPath, config.StartServerScript)
    RestartAllServer(peersIp, coreBaseUnit)
}
//重启所有节点服务并验证
func Restart(ip string, timeOut int) {
    url := "http://" + ip + ":7020/node/api-v/swfs/restartServer"
    var info interface{}
    httpRes, _ := http.Get(url)
    body, _ := ioutil.ReadAll(httpRes.Body)
    json.Unmarshal(body, &info)
    res, ok := info.(map[string]interface{})
    if !ok {
        fmt.Println("http response interface can not change map[string]interface{}")
    }
    fmt.Println("res: ", res)
    startupItem := res["data"].(string)
    if httpRes.StatusCode != 200 {
        return
    }
    fmt.Println("Restart startupItem: ", startupItem)
    tick := time.Tick(1 * time.Second)
    fmt.Println("准备开始验证节点服务")
    for countdown := timeOut; countdown > 0; countdown-- {
        fmt.Println("第", countdown, "次验证")
        result := Verification(startupItem, ip)
        fmt.Println("第一次验证result结果:", result)
        if result == true {
            break
        }
        <-tick
    }
    fmt.Println("验证完毕")
}
//验证服务状态
func Verification(startupItem string, ip string) bool {
    resStatu := false
    fmt.Println("Verification startupItem: ", startupItem)
    switch startupItem {
    case config.StartScriptAsVolume:
        verificationVolumeUrl := "http://" + ip + ":6700/ui/index.html"
        _, volume1Err := http.Get(verificationVolumeUrl)
        if volume1Err == nil {
            resStatu = true
        }
    case config.StartScriptAsMaster:
        verificationMasterUrl := "http://" + ip + ":6333"
        _, masterErr := http.Get(verificationMasterUrl)
        if masterErr == nil {
            resStatu = true
        }
    case config.StartScriptAsMaVo:
        verificationMasterUrl := "http://" + ip + ":6333"
        verificationVolumeUrl := "http://" + ip + ":6700/ui/index.html"
        _, masterErr := http.Get(verificationMasterUrl)
        fmt.Println("masterErr", masterErr)
        _, volume1Err := http.Get(verificationVolumeUrl)
        fmt.Println("volume1Err", volume1Err)
        if masterErr == nil && volume1Err == nil {
            resStatu = true
        }
    }
    return resStatu
}
//获取本地启动项
func GetLocalStartupItem(scriptPath string, scriptFile string) string {
    startupItem := script.GetNowLineContent(scriptPath+"/"+scriptFile, config.Option)
    fmt.Println("startupItem: ", startupItem)
    return startupItem
}
//构建重启流程
func RestartAllServer(peersIp []string, coreBaseUnit int) {
    fmt.Println("开始构建重启流程")
    coreThread := len(peersIp)/coreBaseUnit + 1
    masterIp := make([]string, 0)
    timeOut, _ := strconv.Atoi(config.Server.TimeOut)
    var waitGroup sync.WaitGroup
    fmt.Println("当前并行度coreThread:", coreThread)
    for i, ip := range peersIp {
        fmt.Println("重启当前组服务" + ip)
        if (i+1)%coreThread == 0 {
            masterIp = append(masterIp, strings.Split(ip, ":")[0])
            fmt.Println("加入第一组并开始验证第一组 masterIp: ", masterIp)
            for i := 0; i < len(masterIp); i++ {
                fmt.Println("len masterIp: ", len(masterIp))
                fmt.Println("第" + strconv.Itoa(i) + "个线程")
                fmt.Println("当前goroutinebe")
                waitGroup.Add(1) //每创建一个goroutine,就把任务队列中任务的数量+1
                fmt.Println("当前goroutineaf")
                go Restart(masterIp[i], timeOut)
                waitGroup.Done()
            }
            fmt.Println("这里为阻塞!!!!!")
            waitGroup.Wait() //.Wait()这里会发生阻塞,直到队列中所有的任务结束就会解除阻塞
            fmt.Println("当前组任务完成")
            masterIp = make([]string, 0)
            fmt.Println("清空当前组成员:", masterIp)
        } else {
            masterIp = append(masterIp, strings.Split(ip, ":")[0])
            if len(peersIp) == i+1 {
                var waitGroup sync.WaitGroup
                for i := 0; i < len(masterIp); i++ {
                    waitGroup.Add(1) //每创建一个goroutine,就把任务队列中任务的数量+1
                    go Restart(masterIp[i], timeOut)
                    waitGroup.Done()
                }
                waitGroup.Wait() //.Wait()这里会发生阻塞,直到队列中所有的任务结束就会解除阻塞
                masterIp = make([]string, 0)
                break
            }
        }
    }
    fmt.Println("服务流程执行完毕")
}
tools/script/script.go
New file
@@ -0,0 +1,34 @@
package script
import (
    "fmt"
    "strings"
    "swfs/config"
    "swfs/util"
)
//根据搜索内容替换整行内容
func ReplaceLineContentBySearch(replaceContent string, searchContent string, scriptPath string, scriptFile string) {
    resSearchContent := GetNowLineContent(scriptPath+"/"+scriptFile, searchContent)
    replaceStr := "sed -ie 's/" + resSearchContent + "/" + replaceContent + "/g' " + scriptPath + "/" + scriptFile
    util.RunScript(replaceStr)
}
//获取查找内容当前行内容
func GetNowLineContent(filePath string, searchContent string) string {
    scriptStr := "cat " + filePath + "| grep " + searchContent
    fmt.Println("scriptStr: ", scriptStr)
    return strings.Split(util.RunScript(scriptStr), "\n")[0]
}
//启动服务
func StartServer(scriptPath string) {
    //fmt.Println("sh " + scriptPath + "/" + StartServerScript)
    util.RunScript("sh " + scriptPath + "/" + config.StartServerScript)
}
//停止服务
func StopServer(scriptPath string) {
    //fmt.Println("sh " + scriptPath + "/" + StopServerScript)
    util.RunScript("sh " + scriptPath + "/" + config.StopServerScript)
}