package controllers
|
|
import (
|
"fmt"
|
"github.com/gin-gonic/gin"
|
"io/ioutil"
|
"net/http"
|
"strconv"
|
"strings"
|
"swfs/code"
|
"swfs/config"
|
"swfs/util"
|
"sync"
|
"time"
|
)
|
|
const (
|
StartServerScript = "seaweedfs_start.sh"
|
StopServerScript = "seaweedfs_stop.sh"
|
//作为 volume 启动
|
StartScriptAsVolume = "1"
|
//作为 master 启动
|
StartScriptAsMaster = "2"
|
//作为 master + volume 启动
|
StartScriptAsMaVo = "3"
|
//启动项参数头
|
Option = "option="
|
//master列表项参数头
|
Peer = "peers="
|
//StartScriptAsFiler = "option=4"
|
)
|
|
type SeaweedfsController struct{}
|
|
type SWFSInfo struct {
|
Role string `json:"role"`
|
}
|
|
func (sc *SeaweedfsController) UpdateSWFSServiceController(c *gin.Context) {
|
ReplaceLineContentBySearch(GetNewPeers(), Peer, config.Server.ScriptPath, StartServerScript)
|
util.ResponseFormat(c, code.Success, config.Server.EsServerIp+"更新成功")
|
}
|
|
// @Security ApiKeyAuth
|
// @Summary 新节点加入
|
// @Description 新节点加入
|
// @Accept json
|
// @Produce json
|
// @Tags swfs
|
// @Param obj body SWFSInfo true "加入角色参数"
|
// @Success 200 {string} json "{"code":200, msg:"", success:true}"
|
// @Failure 500 {string} json "{"code":500, msg:"", success:false}"
|
// @Router /node/api-v/swfs/addSWFSNode [POST]
|
func (sc *SeaweedfsController) AddSWFSNodeController(c *gin.Context) {
|
var body SWFSInfo
|
c.BindJSON(&body)
|
role := body.Role
|
if role == "master" {
|
AsMaster()
|
util.ResponseFormat(c, code.AddSuccess, "加入节点成功")
|
return
|
} else if role == "volume" {
|
status := AsVolume()
|
if status == true {
|
util.ResponseFormat(c, code.AddSuccess, "加入节点成功")
|
return
|
} else {
|
util.ResponseFormat(c, code.AddClusterInfoErr, "当前还没有主节点")
|
return
|
}
|
} else if role == "master+volume" {
|
AsMaVo()
|
util.ResponseFormat(c, code.AddSuccess, "加入节点成功")
|
} else {
|
util.ResponseFormat(c, code.RequestParamError, "选择节点类型错误")
|
return
|
}
|
|
}
|
|
func AsMaVo() {
|
AsMaster()
|
ReplaceLineContentBySearch(StartScriptAsMaVo, Option, config.Server.ScriptPath, StartServerScript)
|
}
|
|
func (sc *SeaweedfsController) RoleOfVolumeToMasterController(c *gin.Context) {
|
AsMaster()
|
ReplaceLineContentBySearch(StartScriptAsMaVo, Option, config.Server.ScriptPath, StartServerScript)
|
}
|
|
func (sc *SeaweedfsController) RestartServerController(c *gin.Context) {
|
StopServer(config.Server.ScriptPath)
|
time.Sleep(time.Second * 1)
|
StartServer(config.Server.ScriptPath)
|
result := strings.Split("=", GetLocalStartupItem(config.Server.ScriptPath, StartServerScript))[1]
|
util.ResponseFormat(c, code.Success, result)
|
}
|
|
//启动服务
|
func StartServer(scriptPath string) {
|
util.RunScript("sh " + scriptPath + StartServerScript)
|
}
|
|
//停止服务
|
func StopServer(scriptPath string) {
|
util.RunScript("sh " + scriptPath + StopServerScript)
|
}
|
|
//根据搜索内容替换整行内容
|
func ReplaceLineContentBySearch(replaceContent string, searchContent string, scriptPath string, scriptFile string) {
|
resSearchContent := GetNowLineContent(scriptPath+"/"+scriptFile, searchContent)
|
replaceStr := "sed -ie 's/" + resSearchContent + "/" + replaceContent + "/g' " + scriptPath + "/" + scriptFile
|
util.RunScript(replaceStr)
|
}
|
|
//更新所有节点的脚本参数
|
func UpdateAllNodesScriptArgument(nowPeers []interface{}) {
|
for _, val := range nowPeers {
|
ip := val.(string)
|
url := "http://" + ip + ":7020/node/api-v/swfs/updateSWFSService"
|
http.Get(url)
|
}
|
}
|
|
//请求作为主节点操作流程
|
func RequestMasterNodesOperation(nowPeers []interface{}) {
|
coreBaseUnit, _ := strconv.Atoi(config.Server.CoreBaseUnit)
|
UpdateAllNodesScriptArgument(nowPeers)
|
RestartAllServer(nowPeers, coreBaseUnit)
|
}
|
|
//重启所有节点服务并验证
|
func Restart(ip string, timeOut int) {
|
url := "http://" + ip + ":7020/node/api-v/swfs/restartServer"
|
httpRes, _ := http.Get(url)
|
defer httpRes.Body.Close()
|
body, _ := ioutil.ReadAll(httpRes.Body)
|
startupItem := ""
|
if httpRes.StatusCode == 200 {
|
startupItem = string(body)
|
}
|
tick := time.Tick(1 * time.Second)
|
for countdown := timeOut; countdown > 0; countdown-- {
|
Verification(startupItem, ip)
|
<-tick
|
}
|
}
|
|
func Verification(startupItem string, ip string) {
|
switch startupItem {
|
case StartScriptAsVolume:
|
verificationVolumeUrl := "http://" + ip + ":6700"
|
_, volume1Err := http.Get(verificationVolumeUrl)
|
if volume1Err == nil {
|
break
|
}
|
case StartScriptAsMaster:
|
verificationMasterUrl := "http://" + ip + ":6333"
|
_, masterErr := http.Get(verificationMasterUrl)
|
if masterErr == nil {
|
break
|
}
|
case StartScriptAsMaVo:
|
verificationMasterUrl := "http://" + ip + ":6333"
|
verificationVolumeUrl := "http://" + ip + ":6700"
|
_, masterErr := http.Get(verificationMasterUrl)
|
_, volume1Err := http.Get(verificationVolumeUrl)
|
if masterErr == nil && volume1Err == nil {
|
break
|
}
|
}
|
}
|
|
//获取本地启动项
|
func GetLocalStartupItem(scriptPath string, scriptFile string) string {
|
startupItem := GetNowLineContent(scriptPath+"/"+scriptFile, Option)
|
fmt.Println(startupItem)
|
return startupItem
|
}
|
|
//构建重启流程
|
func RestartAllServer(nowPeers []interface{}, coreBaseUnit int) {
|
coreThread := len(nowPeers)/coreBaseUnit + 1
|
masterIp := make([]string, 0)
|
timeOut, _ := strconv.Atoi(config.Server.TimeOut)
|
var waitGroup sync.WaitGroup
|
for i, val := range nowPeers {
|
ip := val.(string)
|
if (i+1)%coreThread == 0 {
|
masterIp = append(masterIp, strings.Split(ip, ":")[0])
|
for i := 0; i < len(masterIp); i++ {
|
go Restart(masterIp[i], timeOut)
|
waitGroup.Add(1) //每创建一个goroutine,就把任务队列中任务的数量+1
|
}
|
waitGroup.Wait() //.Wait()这里会发生阻塞,直到队列中所有的任务结束就会解除阻塞
|
masterIp = make([]string, 0)
|
} else {
|
masterIp = append(masterIp, strings.Split(ip, ":")[0])
|
if len(nowPeers) == i+1 {
|
var waitGroup sync.WaitGroup
|
for i := 0; i < len(masterIp); i++ {
|
go Restart(masterIp[i], timeOut)
|
waitGroup.Add(1) //每创建一个goroutine,就把任务队列中任务的数量+1
|
}
|
waitGroup.Wait() //.Wait()这里会发生阻塞,直到队列中所有的任务结束就会解除阻塞
|
masterIp = make([]string, 0)
|
break
|
}
|
}
|
continue
|
}
|
}
|
|
//获取查找内容当前行内容
|
func GetNowLineContent(filePath string, searchContent string) string {
|
scriptStr := "cat" + filePath + "| grep " + searchContent
|
return strings.Split(util.RunScript(scriptStr), "\n")[0]
|
}
|
|
//作为数据节点加入
|
func AsVolume() bool {
|
nowPeers := GetNowPeersList()
|
if nowPeers == nil || len(nowPeers) == 0 {
|
return false
|
}
|
ReplaceLineContentBySearch(StartScriptAsVolume, Option, config.Server.ScriptPath, StartServerScript)
|
ReplaceLineContentBySearch(GetNewPeers(), Peer, config.Server.ScriptPath, StartServerScript)
|
StartServer(config.Server.ScriptPath)
|
return true
|
}
|
|
//作为主节点加入(默认包含数据节点)
|
func AsMaster() bool {
|
AddNewMasterToPeers()
|
nowPeers := GetNowPeersList()
|
RequestMasterNodesOperation(nowPeers)
|
return true
|
}
|
|
//获取当前集群列表
|
func GetNowPeersList() []interface{} {
|
getUrl := "http://" + config.Server.EsServerIp + ":" + config.Server.EsServerPort + "/" + config.BasicFS.IndexName + "/_search"
|
getJson := `{
|
"query": {
|
"bool": {
|
"filter": [
|
{
|
"term": {
|
"application":"nodeOperation"
|
}
|
}
|
]
|
}
|
},
|
"size": 1
|
}`
|
|
buf, _ := util.EsReq("POST", getUrl, []byte(getJson))
|
source, _ := util.Sourcelist(buf)
|
peers := source[0]["peers"].([]interface{})
|
return peers
|
}
|
|
//获取当前集群列表格式化信息
|
func GetNewPeers() string {
|
peers := GetNowPeersList()
|
p := Peer + strings.Replace(strings.Trim(fmt.Sprint(peers), "[]"), " ", ",", -1)
|
return p
|
}
|
|
//向集群加入新的master
|
func AddNewMasterToPeers() (result bool) {
|
peer := config.Server.EsServerIp + ":6333"
|
addUrl := "http://" + config.Server.EsServerIp + ":" + config.Server.EsServerPort + "/" + config.BasicFS.IndexName + "/_update_by_query"
|
addJson := `{
|
"script": {
|
"lang": "painless",
|
"inline": "ctx._source.peers.add(params.newpeer)",
|
"params": {
|
"newpeer": "` + peer + `"
|
}
|
},
|
"query": {
|
"bool": {
|
"filter": [
|
{
|
"term": {
|
"application": "nodeOperation"
|
}
|
}
|
]
|
}
|
}
|
}`
|
buf, _ := util.EsReq("POST", addUrl, []byte(addJson))
|
updateRes, _ := util.SourceUpdated(buf)
|
if updateRes == -1 {
|
result = false
|
} else {
|
result = true
|
}
|
return result
|
}
|