sunty
2020-04-16 e2ca6ba654f1371f3b375392e29e3fa070ad96ce
controllers/swfsControllers.go
@@ -1,12 +1,34 @@
package controllers
import (
   "encoding/json"
   "fmt"
   "github.com/gin-gonic/gin"
   "io/ioutil"
   "net/http"
   "strconv"
   "strings"
   "test/config"
   "test/util"
   "swfs/code"
   "swfs/config"
   "swfs/util"
   "sync"
   "time"
)
const (
   StartServerScript = "seaweedfs_start.sh"
   StopServerScript  = "seaweedfs_stop.sh"
   //作为 volume 启动
   StartScriptAsVolume = "1"
   //作为 master 启动
   StartScriptAsMaster = "2"
   //作为 master + volume 启动
   StartScriptAsMaVo = "3"
   //启动项参数头
   Option = "option="
   //master列表项参数头
   Peer = "peers="
   //StartScriptAsFiler   = "option=4"
)
type SeaweedfsController struct{}
@@ -15,81 +37,266 @@
   Role string `json:"role"`
}
//修改
func (sc *SeaweedfsController) UpdateSWFSServiceController(c *gin.Context) {
   oldPeers := GetOldPeers()
   newPeers := GetNewPeers()
   UpdatePeers(oldPeers, newPeers)
   ReplaceLineContentBySearch(GetNewPeers(), Peer, config.Server.ScriptPath, StartServerScript)
   util.ResponseFormat(c, code.Success, config.Server.EsServerIp+"更新成功")
}
// @Security ApiKeyAuth
// @Summary 新节点加入
// @Description  新节点加入
// @Accept  json
// @Produce json
// @Tags swfs
// @Param obj body SWFSInfo true "加入角色参数"
// @Success 200 {string} json "{"code":200, msg:"", success:true}"
// @Failure 500 {string} json "{"code":500, msg:"", success:false}"
// @Router /node/api-v/swfs/addSWFSNode [POST]
func (sc *SeaweedfsController) AddSWFSNodeController(c *gin.Context) {
   var body SWFSInfo
   c.BindJSON(&body)
   role := body.Role
   if role == "master" {
      AsMaster()
      AsMaster(role)
      util.ResponseFormat(c, code.AddSuccess, "加入节点成功")
      return
   } else if role == "volume" {
      AsVolume()
      status := AsVolume()
      if status == true {
         util.ResponseFormat(c, code.AddSuccess, "加入节点成功")
         return
      } else {
         util.ResponseFormat(c, code.AddClusterInfoErr, "当前还没有主节点")
         return
      }
   } else if role == "master+volume" {
      AsMaVo(role)
      util.ResponseFormat(c, code.AddSuccess, "加入节点成功")
   } else {
      util.ResponseFormat(c, code.RequestParamError, "选择节点类型错误")
      return
   }
}
func (sc *SeaweedfsController) RestartMaster(c *gin.Context) {
   end := "sh /opt/vasystem/script/seaweedfs_stop.sh"
   start := "sh /opt/vasystem/script/seaweedfs_start.sh"
   util.RunScript(end)
   util.RunScript(start)
func AsMaVo(role string) {
   AsMaster(role)
   ReplaceLineContentBySearch(Option+StartScriptAsMaVo, Option, config.Server.ScriptPath, StartServerScript)
}
func RequestMasterNodesOperation() {
   nowPeers := GetNowPeersList()
   coreThread := len(nowPeers)/100 + 1
   masterIp := make([]string, 0)
   for i, val := range nowPeers {
      ip := val.(string)
      if (i+1)%coreThread == 0 {
         masterIp = append(masterIp, strings.Split(ip, ":")[0])
         for _, val := range masterIp {
            RestartOrtherMaster(val)
            masterIp = append(masterIp[:0], masterIp[1:]...)
         }
         time.Sleep(time.Second * 2)
      } else {
         masterIp = append(masterIp, strings.Split(ip, ":")[0])
         if len(nowPeers) == i+1 {
            for _, val := range masterIp {
               RestartOrtherMaster(val)
            }
            break
         }
         continue
func (sc *SeaweedfsController) RoleOfVolumeToMasterController(c *gin.Context) {
   AsMaster("master")
   ReplaceLineContentBySearch(Option+StartScriptAsMaVo, Option, config.Server.ScriptPath, StartServerScript)
}
func (sc *SeaweedfsController) RestartServerController(c *gin.Context) {
   StopServer(config.Server.ScriptPath)
   StartServer(config.Server.ScriptPath)
   time.Sleep(time.Second * 1)
   //fmt.Println("GetLocalStartupItem: ", GetLocalStartupItem(config.Server.ScriptPath, StartServerScript))
   result := strings.Split(GetLocalStartupItem(config.Server.ScriptPath, StartServerScript), "=")[1]
   fmt.Println("result: ", result)
   util.ResponseFormat(c, code.Success, result)
}
//启动服务
func StartServer(scriptPath string) {
   //fmt.Println("sh " + scriptPath + "/" + StartServerScript)
   util.RunScript("sh " + scriptPath + "/" + StartServerScript)
}
//停止服务
func StopServer(scriptPath string) {
   //fmt.Println("sh " + scriptPath + "/" + StopServerScript)
   util.RunScript("sh " + scriptPath + "/" + StopServerScript)
}
//根据搜索内容替换整行内容
func ReplaceLineContentBySearch(replaceContent string, searchContent string, scriptPath string, scriptFile string) {
   resSearchContent := GetNowLineContent(scriptPath+"/"+scriptFile, searchContent)
   replaceStr := "sed -ie 's/" + resSearchContent + "/" + replaceContent + "/g' " + scriptPath + "/" + scriptFile
   util.RunScript(replaceStr)
}
//更新所有节点的脚本参数
func UpdateAllNodesScriptArgument(peersIp []string) {
   fmt.Println("开始更新本地配置文件")
   for _, ip := range peersIp {
      fmt.Println("ip: ", ip)
      url := "http://" + ip + ":7020/node/api-v/swfs/updateSWFSService"
      fmt.Println("url", url)
      resp, _ := http.Get(url)
      fmt.Println("更新返回状态:", resp.StatusCode)
      if resp.StatusCode == 200 {
         fmt.Println("请求完毕", resp.StatusCode)
      }
   }
}
func GetOldPeers() string {
   str := "cat /opt/vasystem/seaweedfs_start.sh | grep peers="
   peers := strings.Split(util.RunScript(str), "\n")[0]
   return peers
//请求作为当前角色节点操作流程
func RequestNodesOperation(nowPeers []interface{}, role string) {
   peersIp := make([]string, 0)
   for _, val := range nowPeers {
      peersIp = append(peersIp, strings.Split(val.(string), ":")[0])
   }
   coreBaseUnit, _ := strconv.Atoi(config.Server.CoreBaseUnit)
   UpdateAllNodesScriptArgument(peersIp)
   replaceContent := ""
   if role == "master" {
      replaceContent = Option + StartScriptAsMaster
   } else if role == "master+volume" {
      replaceContent = Option + StartScriptAsMaVo
   }
   ReplaceLineContentBySearch(replaceContent, Option, config.Server.ScriptPath, StartServerScript)
   RestartAllServer(peersIp, coreBaseUnit)
}
func AsVolume() {
//重启所有节点服务并验证
func Restart(ip string, timeOut int) {
   url := "http://" + ip + ":7020/node/api-v/swfs/restartServer"
   var info interface{}
   httpRes, _ := http.Get(url)
   body, _ := ioutil.ReadAll(httpRes.Body)
   json.Unmarshal(body, &info)
   res, ok := info.(map[string]interface{})
   if !ok {
      fmt.Println("http response interface can not change map[string]interface{}")
   }
   fmt.Println("res: ", res)
   startupItem := res["data"].(string)
   if httpRes.StatusCode == 200 {
      startupItem = string(body)
   }
   fmt.Println("Restart startupItem: ", startupItem)
   tick := time.Tick(1 * time.Second)
   fmt.Println("准备开始验证节点服务")
   for countdown := timeOut; countdown > 0; countdown-- {
      fmt.Println("第", countdown, "次验证")
      result := Verification(startupItem, ip)
      fmt.Println("第一次验证result结果:", result)
      if result == true {
         break
      }
      <-tick
   }
   fmt.Println("验证完毕")
}
//验证服务状态
func Verification(startupItem string, ip string) bool {
   resStatu := false
   fmt.Println("Verification startupItem: ", startupItem)
   switch startupItem {
   case StartScriptAsVolume:
      verificationVolumeUrl := "http://" + ip + ":6700/ui/index.html"
      _, volume1Err := http.Get(verificationVolumeUrl)
      if volume1Err == nil {
         resStatu = true
      }
   case StartScriptAsMaster:
      verificationMasterUrl := "http://" + ip + ":6333"
      _, masterErr := http.Get(verificationMasterUrl)
      if masterErr == nil {
         resStatu = true
      }
   case StartScriptAsMaVo:
      verificationMasterUrl := "http://" + ip + ":6333"
      verificationVolumeUrl := "http://" + ip + ":6700/ui/index.html"
      _, masterErr := http.Get(verificationMasterUrl)
      fmt.Println("masterErr", masterErr)
      _, volume1Err := http.Get(verificationVolumeUrl)
      fmt.Println("volume1Err", volume1Err)
      if masterErr == nil && volume1Err == nil {
         resStatu = true
      }
   }
   return resStatu
}
//获取本地启动项
func GetLocalStartupItem(scriptPath string, scriptFile string) string {
   startupItem := GetNowLineContent(scriptPath+"/"+scriptFile, Option)
   fmt.Println("startupItem: ", startupItem)
   return startupItem
}
//构建重启流程
func RestartAllServer(peersIp []string, coreBaseUnit int) {
   fmt.Println("开始构建重启流程")
   coreThread := len(peersIp)/coreBaseUnit + 1
   masterIp := make([]string, 0)
   timeOut, _ := strconv.Atoi(config.Server.TimeOut)
   var waitGroup sync.WaitGroup
   fmt.Println("当前并行度coreThread:", coreThread)
   for i, ip := range peersIp {
      fmt.Println("重启当前组服务" + ip)
      if (i+1)%coreThread == 0 {
         masterIp = append(masterIp, strings.Split(ip, ":")[0])
         fmt.Println("加入第一组并开始验证第一组 masterIp: ", masterIp)
         for i := 0; i < len(masterIp); i++ {
            fmt.Println("len masterIp: ", len(masterIp))
            fmt.Println("第" + strconv.Itoa(i) + "个线程")
            fmt.Println("当前goroutinebe")
            waitGroup.Add(1) //每创建一个goroutine,就把任务队列中任务的数量+1
            fmt.Println("当前goroutineaf")
            go Restart(masterIp[i], timeOut)
            waitGroup.Done()
         }
         fmt.Println("这里为阻塞!!!!!")
         waitGroup.Wait() //.Wait()这里会发生阻塞,直到队列中所有的任务结束就会解除阻塞
         fmt.Println("当前组任务完成")
         masterIp = make([]string, 0)
         fmt.Println("清空当前组成员:", masterIp)
      } else {
         masterIp = append(masterIp, strings.Split(ip, ":")[0])
         if len(peersIp) == i+1 {
            var waitGroup sync.WaitGroup
            for i := 0; i < len(masterIp); i++ {
               waitGroup.Add(1) //每创建一个goroutine,就把任务队列中任务的数量+1
               go Restart(masterIp[i], timeOut)
               waitGroup.Done()
            }
            waitGroup.Wait() //.Wait()这里会发生阻塞,直到队列中所有的任务结束就会解除阻塞
            masterIp = make([]string, 0)
            break
         }
      }
   }
   fmt.Println("服务流程执行完毕")
}
//获取查找内容当前行内容
func GetNowLineContent(filePath string, searchContent string) string {
   scriptStr := "cat " + filePath + "| grep " + searchContent
   fmt.Println("scriptStr: ", scriptStr)
   return strings.Split(util.RunScript(scriptStr), "\n")[0]
}
//作为数据节点加入
func AsVolume() bool {
   nowPeers := GetNowPeersList()
   fmt.Println(nowPeers)
   if nowPeers == nil || len(nowPeers) == 0 {
      return false
   }
   ReplaceLineContentBySearch(Option+StartScriptAsVolume, Option, config.Server.ScriptPath, StartServerScript)
   ReplaceLineContentBySearch(GetNewPeers(), Peer, config.Server.ScriptPath, StartServerScript)
   StartServer(config.Server.ScriptPath)
   return true
}
func AsMaster() {
//作为主节点加入(默认包含数据节点)
func AsMaster(role string) bool {
   AddNewMasterToPeers()
   nowPeers := GetNowPeersList()
   coreThread := len(nowPeers) / 100
   fmt.Println(coreThread)
   fmt.Println("nowPeers: ", nowPeers)
   RequestNodesOperation(nowPeers, role)
   return true
}
//获取当前集群列表
func GetNowPeersList() []interface{} {
   getUrl := "http://" + config.Server.EsServerIp + ":" + config.Server.EsServerPort + "/" + config.BasicFS.IndexName + "/_search"
   getJson := `{
    "query": {
@@ -108,46 +315,21 @@
   buf, _ := util.EsReq("POST", getUrl, []byte(getJson))
   source, _ := util.Sourcelist(buf)
   //fmt.Println(source)
   peers := source[0]["peers"].([]interface{})
   return peers
}
//获取当前集群列表格式化信息
func GetNewPeers() string {
   getUrl := "http://" + config.Server.EsServerIp + ":" + config.Server.EsServerPort + "/" + config.BasicFS.IndexName + "/_search"
   getJson := `{
    "query": {
        "bool": {
            "filter": [
                {
                    "term": {
                  "application":"nodeOperation"
               }
                }
            ]
        }
    },
    "size": 1
}`
   buf, _ := util.EsReq("POST", getUrl, []byte(getJson))
   source, _ := util.Sourcelist(buf)
   //fmt.Println(source)
   peers := source[0]["peers"].([]interface{})
   fmt.Println(peers)
   p := "peers=" + strings.Replace(strings.Trim(fmt.Sprint(peers), "[]"), " ", ",", -1)
   peers := GetNowPeersList()
   p := Peer + strings.Replace(strings.Trim(fmt.Sprint(peers), "[]"), " ", ",", -1)
   return p
}
func UpdatePeers(oldPeers string, newPeers string) {
   str := "sed -ie 's/" + oldPeers + "/" + newPeers + "/g' /opt/vasystem/seaweedfs_start.sh"
   util.RunScript(str)
}
//向集群加入新的master
func AddNewMasterToPeers() (result bool) {
   peer := config.Server.EsServerIp + ":6333"
   addUrl := "http://" + config.Server.EsServerIp + ":" + config.Server.EsServerPort + "/" + config.BasicFS.IndexName + "/_update_by_query"
   addUrl := "http://" + config.Server.EsServerIp + ":" + config.Server.EsServerPort + "/" + config.BasicFS.IndexName + "/_update_by_query?refresh=true"
   addJson := `{
    "script": {
        "lang": "painless",