package controllers
|
|
import (
|
"fmt"
|
"github.com/gin-gonic/gin"
|
"net/http"
|
"strconv"
|
"strings"
|
"swfs/config"
|
"swfs/util"
|
"sync"
|
"time"
|
)
|
|
type SeaweedfsController struct{}
|
|
type SWFSInfo struct {
|
Role string `json:"role"`
|
}
|
|
//修改
|
func (sc *SeaweedfsController) UpdateSWFSServiceController(c *gin.Context) {
|
oldPeers := GetOldPeers()
|
newPeers := GetNewPeers()
|
UpdatePeers(oldPeers, newPeers)
|
}
|
|
func (sc *SeaweedfsController) AddSWFSNodeController(c *gin.Context) {
|
var body SWFSInfo
|
c.BindJSON(&body)
|
role := body.Role
|
nowPeers := GetNowPeersList()
|
if role == "master" {
|
AsMaster(nowPeers)
|
} else if role == "volume" {
|
AsVolume(nowPeers)
|
} else {
|
return
|
}
|
|
}
|
|
func ReplaceLineContentBySearch(replaceContent string) {
|
resContent := GetNowLineContent("/opt/vasystem/script/seaweedfs_start.sh", "#start_master_server")
|
replaceStr := "sed -ie 's/" + resContent + "/" + replaceContent + "/g' /opt/vasystem/seaweedfs_start.sh"
|
util.RunScript(replaceStr)
|
}
|
|
func (sc *SeaweedfsController) RestartMasterController(c *gin.Context) {
|
end := "sh /opt/vasystem/script/seaweedfs_stop.sh"
|
start := "sh /opt/vasystem/script/seaweedfs_start.sh"
|
util.RunScript(end)
|
util.RunScript(start)
|
}
|
|
func UpdateAllNodesScriptArgument(nowPeers []interface{}) {
|
for _, val := range nowPeers {
|
ip := val.(string)
|
url := "http://" + ip + ":7020/node/api-v/swfs/updateSWFSService"
|
http.Get(url)
|
}
|
}
|
|
func RequestMasterNodesOperation(nowPeers []interface{}) {
|
//fmt.Println("config.Server.CoreBaseUnit", config.Server.CoreBaseUnit)
|
coreBaseUnit, _ := strconv.Atoi(config.Server.CoreBaseUnit)
|
//fmt.Println("nowPeers: ", nowPeers)
|
//fmt.Println("coreBaseUnit: ", coreBaseUnit)
|
UpdateAllNodesScriptArgument(nowPeers)
|
RestartAllMaster(nowPeers, coreBaseUnit)
|
}
|
|
func RestartServer(ip string, timeOut int) {
|
url := "http://" + ip + ":7020/node/api-v/swfs/restartMaster"
|
http.Get(url)
|
tick := time.Tick(1 * time.Second)
|
for countdown := timeOut; countdown > 0; countdown-- {
|
verificationMasterUrl := "http://" + ip + ":6333"
|
verificationVolume1Url := "http://" + ip + ":6700"
|
verificationVolume2Url := "http://" + ip + ":6701"
|
_, masterErr := http.Get(verificationMasterUrl)
|
_, volume1Err := http.Get(verificationVolume1Url)
|
_, volume2Err := http.Get(verificationVolume2Url)
|
if masterErr == nil && volume1Err == nil && volume2Err == nil {
|
break
|
}
|
<-tick
|
}
|
}
|
|
func RestartAllMaster(nowPeers []interface{}, coreBaseUnit int) {
|
coreThread := len(nowPeers)/coreBaseUnit + 1
|
masterIp := make([]string, 0)
|
timeOut, _ := strconv.Atoi(config.Server.TimeOut)
|
var waitGroup sync.WaitGroup
|
for i, val := range nowPeers {
|
ip := val.(string)
|
if (i+1)%coreThread == 0 {
|
masterIp = append(masterIp, strings.Split(ip, ":")[0])
|
for i := 0; i < len(masterIp); i++ {
|
go RestartServer(masterIp[i], timeOut)
|
waitGroup.Add(1) //每创建一个goroutine,就把任务队列中任务的数量+1
|
}
|
waitGroup.Wait() //.Wait()这里会发生阻塞,直到队列中所有的任务结束就会解除阻塞
|
masterIp = make([]string, 0)
|
} else {
|
masterIp = append(masterIp, strings.Split(ip, ":")[0])
|
if len(nowPeers) == i+1 {
|
var waitGroup sync.WaitGroup
|
for i := 0; i < len(masterIp); i++ {
|
go RestartServer(masterIp[i], timeOut)
|
waitGroup.Add(1) //每创建一个goroutine,就把任务队列中任务的数量+1
|
}
|
waitGroup.Wait() //.Wait()这里会发生阻塞,直到队列中所有的任务结束就会解除阻塞
|
masterIp = make([]string, 0)
|
break
|
}
|
}
|
continue
|
}
|
}
|
|
func GetOldPeers() string {
|
str := "cat /opt/vasystem/seaweedfs_start.sh | grep peers="
|
peers := strings.Split(util.RunScript(str), "\n")[0]
|
return peers
|
}
|
|
func GetNowLineContent(filePath string, searchContent string) string {
|
scriptStr := "cat" + filePath + "| grep " + searchContent
|
content := strings.Split(util.RunScript(scriptStr), "\n")[0]
|
return content
|
}
|
|
func AsVolume(nowPeers []interface{}) {
|
ReplaceLineContentBySearch("start_master_server")
|
fmt.Println(nowPeers)
|
}
|
|
func AsMaster(nowPeers []interface{}) {
|
AddNewMasterToPeers()
|
RequestMasterNodesOperation(nowPeers)
|
}
|
|
func (sc *SeaweedfsController) RoleOfVolumeToMasterController(c *gin.Context) {
|
AsMaster(GetNowPeersList())
|
}
|
|
func GetNowPeersList() []interface{} {
|
|
getUrl := "http://" + config.Server.EsServerIp + ":" + config.Server.EsServerPort + "/" + config.BasicFS.IndexName + "/_search"
|
getJson := `{
|
"query": {
|
"bool": {
|
"filter": [
|
{
|
"term": {
|
"application":"nodeOperation"
|
}
|
}
|
]
|
}
|
},
|
"size": 1
|
}`
|
|
buf, _ := util.EsReq("POST", getUrl, []byte(getJson))
|
source, _ := util.Sourcelist(buf)
|
//fmt.Println(source)
|
peers := source[0]["peers"].([]interface{})
|
return peers
|
}
|
|
func GetNewPeers() string {
|
|
getUrl := "http://" + config.Server.EsServerIp + ":" + config.Server.EsServerPort + "/" + config.BasicFS.IndexName + "/_search"
|
getJson := `{
|
"query": {
|
"bool": {
|
"filter": [
|
{
|
"term": {
|
"application":"nodeOperation"
|
}
|
}
|
]
|
}
|
},
|
"size": 1
|
}`
|
|
buf, _ := util.EsReq("POST", getUrl, []byte(getJson))
|
source, _ := util.Sourcelist(buf)
|
//fmt.Println(source)
|
peers := source[0]["peers"].([]interface{})
|
fmt.Println(peers)
|
p := "peers=" + strings.Replace(strings.Trim(fmt.Sprint(peers), "[]"), " ", ",", -1)
|
return p
|
}
|
|
func UpdatePeers(oldPeers string, newPeers string) {
|
str := "sed -ie 's/" + oldPeers + "/" + newPeers + "/g' /opt/vasystem/seaweedfs_start.sh"
|
util.RunScript(str)
|
}
|
|
func AddNewMasterToPeers() (result bool) {
|
peer := config.Server.EsServerIp + ":6333"
|
addUrl := "http://" + config.Server.EsServerIp + ":" + config.Server.EsServerPort + "/" + config.BasicFS.IndexName + "/_update_by_query"
|
addJson := `{
|
"script": {
|
"lang": "painless",
|
"inline": "ctx._source.peers.add(params.newpeer)",
|
"params": {
|
"newpeer": "` + peer + `"
|
}
|
},
|
"query": {
|
"bool": {
|
"filter": [
|
{
|
"term": {
|
"application": "nodeOperation"
|
}
|
}
|
]
|
}
|
}
|
}`
|
buf, _ := util.EsReq("POST", addUrl, []byte(addJson))
|
updateRes, _ := util.SourceUpdated(buf)
|
if updateRes == -1 {
|
result = false
|
} else {
|
result = true
|
}
|
return result
|
}
|