package controllers
|
|
import (
|
"encoding/json"
|
"fmt"
|
"github.com/gin-gonic/gin"
|
"io/ioutil"
|
"net/http"
|
"strconv"
|
"strings"
|
"swfs/code"
|
"swfs/config"
|
"swfs/util"
|
"sync"
|
"time"
|
)
|
|
const (
|
StartServerScript = "seaweedfs_start.sh"
|
StopServerScript = "seaweedfs_stop.sh"
|
//作为 volume 启动
|
StartScriptAsVolume = "1"
|
//作为 master 启动
|
StartScriptAsMaster = "2"
|
//作为 master + volume 启动
|
StartScriptAsMaVo = "3"
|
//启动项参数头
|
Option = "option="
|
//master列表项参数头
|
Peer = "peers="
|
//StartScriptAsFiler = "option=4"
|
)
|
|
type SeaweedfsController struct{}
|
|
type SWFSInfo struct {
|
Role string `json:"role"`
|
}
|
|
func (sc *SeaweedfsController) UpdateSWFSServiceController(c *gin.Context) {
|
ReplaceLineContentBySearch(GetNewPeers(), Peer, config.Server.ScriptPath, StartServerScript)
|
util.ResponseFormat(c, code.Success, config.Server.EsServerIp+"更新成功")
|
}
|
|
// @Security ApiKeyAuth
|
// @Summary 新节点加入
|
// @Description 新节点加入
|
// @Accept json
|
// @Produce json
|
// @Tags swfs
|
// @Param obj body SWFSInfo true "加入角色参数"
|
// @Success 200 {string} json "{"code":200, msg:"", success:true}"
|
// @Failure 500 {string} json "{"code":500, msg:"", success:false}"
|
// @Router /node/api-v/swfs/addSWFSNode [POST]
|
func (sc *SeaweedfsController) AddSWFSNodeController(c *gin.Context) {
|
var body SWFSInfo
|
c.BindJSON(&body)
|
role := body.Role
|
if role == "master" {
|
AsMaster(role)
|
util.ResponseFormat(c, code.AddSuccess, "加入节点成功")
|
return
|
} else if role == "volume" {
|
status := AsVolume()
|
if status == true {
|
util.ResponseFormat(c, code.AddSuccess, "加入节点成功")
|
return
|
} else {
|
util.ResponseFormat(c, code.AddClusterInfoErr, "当前还没有主节点")
|
return
|
}
|
} else if role == "master+volume" {
|
AsMaVo(role)
|
util.ResponseFormat(c, code.AddSuccess, "加入节点成功")
|
} else {
|
util.ResponseFormat(c, code.RequestParamError, "选择节点类型错误")
|
return
|
}
|
|
}
|
|
func AsMaVo(role string) {
|
AsMaster(role)
|
ReplaceLineContentBySearch(Option+StartScriptAsMaVo, Option, config.Server.ScriptPath, StartServerScript)
|
}
|
|
func (sc *SeaweedfsController) RoleOfVolumeToMasterController(c *gin.Context) {
|
AsMaster("master")
|
ReplaceLineContentBySearch(Option+StartScriptAsMaVo, Option, config.Server.ScriptPath, StartServerScript)
|
}
|
|
func (sc *SeaweedfsController) RestartServerController(c *gin.Context) {
|
StopServer(config.Server.ScriptPath)
|
time.Sleep(time.Second * 1)
|
StartServer(config.Server.ScriptPath)
|
//fmt.Println("GetLocalStartupItem: ", GetLocalStartupItem(config.Server.ScriptPath, StartServerScript))
|
result := strings.Split(GetLocalStartupItem(config.Server.ScriptPath, StartServerScript), "=")[1]
|
fmt.Println("result: ", result)
|
util.ResponseFormat(c, code.Success, result)
|
}
|
|
//启动服务
|
func StartServer(scriptPath string) {
|
util.RunScript("sh " + scriptPath + StartServerScript)
|
}
|
|
//停止服务
|
func StopServer(scriptPath string) {
|
util.RunScript("sh " + scriptPath + StopServerScript)
|
}
|
|
//根据搜索内容替换整行内容
|
func ReplaceLineContentBySearch(replaceContent string, searchContent string, scriptPath string, scriptFile string) {
|
resSearchContent := GetNowLineContent(scriptPath+"/"+scriptFile, searchContent)
|
replaceStr := "sed -ie 's/" + resSearchContent + "/" + replaceContent + "/g' " + scriptPath + "/" + scriptFile
|
util.RunScript(replaceStr)
|
}
|
|
//更新所有节点的脚本参数
|
func UpdateAllNodesScriptArgument(peersIp []string) {
|
fmt.Println("开始更新本地配置文件")
|
for _, ip := range peersIp {
|
fmt.Println("ip: ", ip)
|
url := "http://" + ip + ":7020/node/api-v/swfs/updateSWFSService"
|
fmt.Println("url", url)
|
resp, _ := http.Get(url)
|
fmt.Println("更新返回状态:", resp.StatusCode)
|
if resp.StatusCode == 200 {
|
fmt.Println("请求完毕", resp.StatusCode)
|
}
|
}
|
}
|
|
//请求作为当前角色节点操作流程
|
func RequestNodesOperation(nowPeers []interface{}, role string) {
|
peersIp := make([]string, 0)
|
for _, val := range nowPeers {
|
peersIp = append(peersIp, strings.Split(val.(string), ":")[0])
|
}
|
coreBaseUnit, _ := strconv.Atoi(config.Server.CoreBaseUnit)
|
UpdateAllNodesScriptArgument(peersIp)
|
replaceContent := ""
|
if role == "master" {
|
replaceContent = Option + StartScriptAsMaster
|
} else if role == "master+volume" {
|
replaceContent = Option + StartScriptAsMaVo
|
}
|
ReplaceLineContentBySearch(replaceContent, Option, config.Server.ScriptPath, StartServerScript)
|
RestartAllServer(peersIp, coreBaseUnit)
|
}
|
|
//重启所有节点服务并验证
|
func Restart(ip string, timeOut int) {
|
url := "http://" + ip + ":7020/node/api-v/swfs/restartServer"
|
var info interface{}
|
httpRes, _ := http.Get(url)
|
defer httpRes.Body.Close()
|
body, _ := ioutil.ReadAll(httpRes.Body)
|
json.Unmarshal(body, &info)
|
res, ok := info.(map[string]interface{})
|
if !ok {
|
fmt.Println("http response interface can not change map[string]interface{}")
|
}
|
startupItem := res["data"].(string)
|
if httpRes.StatusCode == 200 {
|
startupItem = string(body)
|
}
|
tick := time.Tick(1 * time.Second)
|
fmt.Println("准备开始验证节点服务")
|
for countdown := timeOut; countdown > 0; countdown-- {
|
fmt.Println("第", countdown, "次验证")
|
result := Verification(startupItem, ip)
|
fmt.Println("第一次验证result结果:", result)
|
if result == true {
|
break
|
}
|
<-tick
|
}
|
fmt.Println("验证完毕")
|
}
|
|
//验证服务状态
|
func Verification(startupItem string, ip string) bool {
|
resStatu := false
|
switch startupItem {
|
case StartScriptAsVolume:
|
verificationVolumeUrl := "http://" + ip + ":6700"
|
_, volume1Err := http.Get(verificationVolumeUrl)
|
if volume1Err == nil {
|
resStatu = true
|
}
|
case StartScriptAsMaster:
|
verificationMasterUrl := "http://" + ip + ":6333"
|
_, masterErr := http.Get(verificationMasterUrl)
|
if masterErr == nil {
|
resStatu = true
|
}
|
case StartScriptAsMaVo:
|
verificationMasterUrl := "http://" + ip + ":6333"
|
verificationVolumeUrl := "http://" + ip + ":6700"
|
_, masterErr := http.Get(verificationMasterUrl)
|
_, volume1Err := http.Get(verificationVolumeUrl)
|
if masterErr == nil && volume1Err == nil {
|
resStatu = true
|
}
|
}
|
return resStatu
|
}
|
|
//获取本地启动项
|
func GetLocalStartupItem(scriptPath string, scriptFile string) string {
|
startupItem := GetNowLineContent(scriptPath+"/"+scriptFile, Option)
|
fmt.Println("startupItem: ", startupItem)
|
return startupItem
|
}
|
|
//构建重启流程
|
func RestartAllServer(peersIp []string, coreBaseUnit int) {
|
fmt.Println("开始构建重启流程")
|
coreThread := len(peersIp)/coreBaseUnit + 1
|
masterIp := make([]string, 0)
|
timeOut, _ := strconv.Atoi(config.Server.TimeOut)
|
var waitGroup sync.WaitGroup
|
fmt.Println("当前并行度coreThread:", coreThread)
|
for i, ip := range peersIp {
|
fmt.Println("重启第一组服务" + ip)
|
if (i+1)%coreThread == 0 {
|
masterIp = append(masterIp, strings.Split(ip, ":")[0])
|
fmt.Println("加入第一组并开始验证第一组 masterIp: ", masterIp)
|
for i := 0; i < len(masterIp); i++ {
|
fmt.Println("len masterIp: ", len(masterIp))
|
fmt.Println("第" + strconv.Itoa(i) + "个线程")
|
go Restart(masterIp[i], timeOut)
|
waitGroup.Add(1) //每创建一个goroutine,就把任务队列中任务的数量+1
|
}
|
waitGroup.Wait() //.Wait()这里会发生阻塞,直到队列中所有的任务结束就会解除阻塞
|
masterIp = make([]string, 0)
|
} else {
|
masterIp = append(masterIp, strings.Split(ip, ":")[0])
|
if len(peersIp) == i+1 {
|
var waitGroup sync.WaitGroup
|
for i := 0; i < len(masterIp); i++ {
|
go Restart(masterIp[i], timeOut)
|
waitGroup.Add(1) //每创建一个goroutine,就把任务队列中任务的数量+1
|
}
|
waitGroup.Wait() //.Wait()这里会发生阻塞,直到队列中所有的任务结束就会解除阻塞
|
masterIp = make([]string, 0)
|
break
|
}
|
}
|
}
|
fmt.Println("服务流程执行完毕")
|
|
}
|
|
//获取查找内容当前行内容
|
func GetNowLineContent(filePath string, searchContent string) string {
|
scriptStr := "cat " + filePath + "| grep " + searchContent
|
fmt.Println("scriptStr: ", scriptStr)
|
return strings.Split(util.RunScript(scriptStr), "\n")[0]
|
}
|
|
//作为数据节点加入
|
func AsVolume() bool {
|
nowPeers := GetNowPeersList()
|
if nowPeers == nil || len(nowPeers) == 0 {
|
return false
|
}
|
ReplaceLineContentBySearch(Option+StartScriptAsVolume, Option, config.Server.ScriptPath, StartServerScript)
|
ReplaceLineContentBySearch(GetNewPeers(), Peer, config.Server.ScriptPath, StartServerScript)
|
StartServer(config.Server.ScriptPath)
|
return true
|
}
|
|
//作为主节点加入(默认包含数据节点)
|
func AsMaster(role string) bool {
|
AddNewMasterToPeers()
|
nowPeers := GetNowPeersList()
|
fmt.Println("nowPeers: ", nowPeers)
|
RequestNodesOperation(nowPeers, role)
|
return true
|
}
|
|
//获取当前集群列表
|
func GetNowPeersList() []interface{} {
|
getUrl := "http://" + config.Server.EsServerIp + ":" + config.Server.EsServerPort + "/" + config.BasicFS.IndexName + "/_search"
|
getJson := `{
|
"query": {
|
"bool": {
|
"filter": [
|
{
|
"term": {
|
"application":"nodeOperation"
|
}
|
}
|
]
|
}
|
},
|
"size": 1
|
}`
|
|
buf, _ := util.EsReq("POST", getUrl, []byte(getJson))
|
source, _ := util.Sourcelist(buf)
|
peers := source[0]["peers"].([]interface{})
|
return peers
|
}
|
|
//获取当前集群列表格式化信息
|
func GetNewPeers() string {
|
peers := GetNowPeersList()
|
p := Peer + strings.Replace(strings.Trim(fmt.Sprint(peers), "[]"), " ", ",", -1)
|
return p
|
}
|
|
//向集群加入新的master
|
func AddNewMasterToPeers() (result bool) {
|
peer := config.Server.EsServerIp + ":6333"
|
addUrl := "http://" + config.Server.EsServerIp + ":" + config.Server.EsServerPort + "/" + config.BasicFS.IndexName + "/_update_by_query?refresh=true"
|
addJson := `{
|
"script": {
|
"lang": "painless",
|
"inline": "ctx._source.peers.add(params.newpeer)",
|
"params": {
|
"newpeer": "` + peer + `"
|
}
|
},
|
"query": {
|
"bool": {
|
"filter": [
|
{
|
"term": {
|
"application": "nodeOperation"
|
}
|
}
|
]
|
}
|
}
|
}`
|
buf, _ := util.EsReq("POST", addUrl, []byte(addJson))
|
updateRes, _ := util.SourceUpdated(buf)
|
if updateRes == -1 {
|
result = false
|
} else {
|
result = true
|
}
|
return result
|
}
|