sunty
2020-04-07 e12bca22ef82aa5a2df0448a4715b1a948604156
first commit
5个文件已添加
2个文件已修改
522 ■■■■■ 已修改文件
config/config.go 34 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
controllers/swfsControllers.go 179 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.mod 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go 184 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
router/router.go 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service/service.go 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
util/util.go 100 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config/config.go
New file
@@ -0,0 +1,34 @@
package config
import (
    "github.com/spf13/viper"
    "log"
)
type server struct {
    EsServerIp   string `mapstructure: "esServerIp"`
    EsServerPort string `mapstructure: "esServerPort"`
}
type elastic struct {
    IndexName string `mapstructure: "indexName"`
    IndexType string `mapstructure: "indexType"`
}
var Server = &server{}
var BasicFS = &elastic{}
func Init(env string) {
    var err error
    viper.SetConfigType("yaml")
    viper.SetConfigName(env)
    viper.AddConfigPath("/opt/vasystem/config")
    err = viper.ReadInConfig()
    if err != nil {
        log.Fatal("error on parsing configuration file", err)
    }
    viper.UnmarshalKey("server", Server)
    viper.UnmarshalKey("elastic.basicFS", BasicFS)
    //fmt.Println(AiOcean)
    //fmt.Println(BasicFS)
}
controllers/swfsControllers.go
New file
@@ -0,0 +1,179 @@
package controllers
import (
    "fmt"
    "github.com/gin-gonic/gin"
    "strings"
    "test/config"
    "test/util"
    "time"
)
type SeaweedfsController struct{}
type SWFSInfo struct {
    Role string `json:"role"`
}
//修改
func (sc *SeaweedfsController) UpdateSWFSServiceController(c *gin.Context) {
    oldPeers := GetOldPeers()
    newPeers := GetNewPeers()
    UpdatePeers(oldPeers, newPeers)
}
func (sc *SeaweedfsController) AddSWFSNodeController(c *gin.Context) {
    var body SWFSInfo
    c.BindJSON(&body)
    role := body.Role
    if role == "master" {
        AsMaster()
    } else if role == "volume" {
        AsVolume()
    } else {
        return
    }
}
func (sc *SeaweedfsController) RestartMaster(c *gin.Context) {
    end := "sh /opt/vasystem/script/seaweedfs_stop.sh"
    start := "sh /opt/vasystem/script/seaweedfs_start.sh"
    util.RunScript(end)
    util.RunScript(start)
}
func RequestMasterNodesOperation() {
    nowPeers := GetNowPeersList()
    coreThread := len(nowPeers)/100 + 1
    masterIp := make([]string, 0)
    for i, val := range nowPeers {
        ip := val.(string)
        if (i+1)%coreThread == 0 {
            masterIp = append(masterIp, strings.Split(ip, ":")[0])
            for _, val := range masterIp {
                RestartOrtherMaster(val)
                masterIp = append(masterIp[:0], masterIp[1:]...)
            }
            time.Sleep(time.Second * 2)
        } else {
            masterIp = append(masterIp, strings.Split(ip, ":")[0])
            if len(nowPeers) == i+1 {
                for _, val := range masterIp {
                    RestartOrtherMaster(val)
                }
                break
            }
            continue
        }
    }
}
func GetOldPeers() string {
    str := "cat /opt/vasystem/seaweedfs_start.sh | grep peers="
    peers := strings.Split(util.RunScript(str), "\n")[0]
    return peers
}
func AsVolume() {
    nowPeers := GetNowPeersList()
    fmt.Println(nowPeers)
}
func AsMaster() {
    nowPeers := GetNowPeersList()
    coreThread := len(nowPeers) / 100
    fmt.Println(coreThread)
}
func GetNowPeersList() []interface{} {
    getUrl := "http://" + config.Server.EsServerIp + ":" + config.Server.EsServerPort + "/" + config.BasicFS.IndexName + "/_search"
    getJson := `{
    "query": {
        "bool": {
            "filter": [
                {
                    "term": {
                        "application":"nodeOperation"
                    }
                }
            ]
        }
    },
    "size": 1
}`
    buf, _ := util.EsReq("POST", getUrl, []byte(getJson))
    source, _ := util.Sourcelist(buf)
    //fmt.Println(source)
    peers := source[0]["peers"].([]interface{})
    return peers
}
func GetNewPeers() string {
    getUrl := "http://" + config.Server.EsServerIp + ":" + config.Server.EsServerPort + "/" + config.BasicFS.IndexName + "/_search"
    getJson := `{
    "query": {
        "bool": {
            "filter": [
                {
                    "term": {
                        "application":"nodeOperation"
                    }
                }
            ]
        }
    },
    "size": 1
}`
    buf, _ := util.EsReq("POST", getUrl, []byte(getJson))
    source, _ := util.Sourcelist(buf)
    //fmt.Println(source)
    peers := source[0]["peers"].([]interface{})
    fmt.Println(peers)
    p := "peers=" + strings.Replace(strings.Trim(fmt.Sprint(peers), "[]"), " ", ",", -1)
    return p
}
func UpdatePeers(oldPeers string, newPeers string) {
    str := "sed -ie 's/" + oldPeers + "/" + newPeers + "/g' /opt/vasystem/seaweedfs_start.sh"
    util.RunScript(str)
}
func AddNewMasterToPeers() (result bool) {
    peer := config.Server.EsServerIp + ":6333"
    addUrl := "http://" + config.Server.EsServerIp + ":" + config.Server.EsServerPort + "/" + config.BasicFS.IndexName + "/_update_by_query"
    addJson := `{
    "script": {
        "lang": "painless",
        "inline": "ctx._source.peers.add(params.newpeer)",
        "params": {
            "newpeer": "` + peer + `"
        }
    },
    "query": {
        "bool": {
            "filter": [
                {
                    "term": {
                        "application": "nodeOperation"
                    }
                }
            ]
        }
    }
}`
    buf, _ := util.EsReq("POST", addUrl, []byte(addJson))
    updateRes, _ := util.SourceUpdated(buf)
    if updateRes == -1 {
        result = false
    } else {
        result = true
    }
    return result
}
go.mod
@@ -1,3 +1,8 @@
module test
go 1.12
require (
    github.com/gin-gonic/gin v1.6.1
    github.com/spf13/viper v1.6.2
)
main.go
@@ -1,184 +1,16 @@
package main
import (
    "bytes"
    "encoding/json"
    "errors"
    "fmt"
    "io/ioutil"
    "net/http"
    "os/exec"
    "strings"
    "time"
    "flag"
    "test/config"
)
var env = flag.String("pro", "pro", "read storage info")
func init() {
    config.Init(*env)
}
func main() {
    oldPeers := GetOldPeers()
    fmt.Println("oldPeers: ", oldPeers)
    //AddNewMasterToPeers()
    newPeers := GetNewPeers()
    fmt.Println("newPeers: ", newPeers)
    UpdatePeers(oldPeers, newPeers)
    time.Sleep(time.Second * 3)
    nowPeers := GetOldPeers()
    fmt.Println("nowPeers: ", nowPeers)
}
func GetOldPeers() string {
    str := "cat /opt/vasystem/seaweedfs_start.sh | grep peers="
    peers := strings.Split(RunScript(str), "\n")[0]
    return peers
}
func GetNewPeers() string {
    getUrl := "http://192.168.20.10:9200/basicfs/_search"
    getJson := `{
    "query": {
        "bool": {
            "filter": [
                {
                    "term": {
                        "application":"nodeOperation"
                    }
                }
            ]
        }
    },
    "size": 1
}`
    buf, _ := EsReq("POST", getUrl, []byte(getJson))
    source, _ := Sourcelist(buf)
    //fmt.Println(source)
    peers := source[0]["peers"].([]interface{})
    fmt.Println(peers)
    p := "peers=" + strings.Replace(strings.Trim(fmt.Sprint(peers), "[]"), " ", ",", -1)
    return p
}
func UpdatePeers(oldPeers string, newPeers string) {
    str := "sed -ie 's/" + oldPeers + "/" + newPeers + "/g' /opt/vasystem/seaweedfs_start.sh"
    fmt.Println(str)
    RunScript(str)
}
//脚本封装
func RunScript(str string) string {
    cmd := exec.Command("sh", "-c", str)
    var out bytes.Buffer
    cmd.Stdout = &out
    err := cmd.Run()
    if err != nil {
        return "运行失败"
    }
    return out.String()
}
//解析http
func EsReq(method string, url string, parama []byte) (buf []byte, err error) {
    timeout := time.Duration(10 * time.Second)
    client := http.Client{
        Timeout: timeout,
    }
    request, err := http.NewRequest(method, url, bytes.NewBuffer(parama))
    request.Header.Set("Content-type", "application/json")
    if err != nil {
        fmt.Println("build request fail !")
        return nil, err
    }
    resp, err := client.Do(request)
    if err != nil {
        fmt.Println("request error: ", err)
        return nil, err
    }
    defer resp.Body.Close()
    body, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        fmt.Println(err)
        return nil, err
    }
    return body, nil
}
func Sourcelist(buf []byte) (sources []map[string]interface{}, err error) {
    var info interface{}
    json.Unmarshal(buf, &info)
    out, ok := info.(map[string]interface{})
    if !ok {
        return nil, errors.New("http response interface can not change map[string]interface{}")
    }
    middle, ok := out["hits"].(map[string]interface{})
    if !ok {
        return nil, errors.New("first hits change error!")
    }
    for _, in := range middle["hits"].([]interface{}) {
        tmpbuf, ok := in.(map[string]interface{})
        if !ok {
            fmt.Println("change to source error!")
            continue
        }
        source, ok := tmpbuf["_source"].(map[string]interface{})
        if !ok {
            fmt.Println("change _source error!")
            continue
        }
        sources = append(sources, source)
    }
    return sources, nil
}
func AddNewMasterToPeers() (result bool) {
    peer := "192.168.5.22:6333"
    addUrl := "http://192.168.20.10:9200/basicfs/_update_by_query"
    addJson := `{
    "script": {
        "lang": "painless",
        "inline": "ctx._source.peers.add(params.newpeer)",
        "params": {
            "newpeer": "` + peer + `"
        }
    },
    "query": {
        "bool": {
            "filter": [
                {
                    "term": {
                        "application": "nodeOperation"
                    }
                }
            ]
        }
    }
}`
    buf, _ := EsReq("POST", addUrl, []byte(addJson))
    updateRes, _ := SourceUpdated(buf)
    if updateRes == -1 {
        result = false
    } else {
        result = true
    }
    return result
}
func SourceUpdated(buf []byte) (total int, err error) {
    var info interface{}
    json.Unmarshal(buf, &info)
    out, ok := info.(map[string]interface{})
    if !ok {
        return -1, errors.New("http response interface can not change map[string]interface{}")
    }
    middle, ok := out["updated"].(float64)
    if !ok {
        return -1, errors.New("first total change error!")
    }
    total = int(middle)
    return total, nil
}
router/router.go
New file
@@ -0,0 +1,19 @@
package router
import (
    "github.com/gin-gonic/gin"
    "test/controllers"
)
func NewRouter() *gin.Context {
    r := gin.Default()
    swfsController := new(controllers.SeaweedfsController)
    urlPrefix := "/node/api-v"
    swfsApi := r.Group(urlPrefix + "swfs")
    {
        swfsApi.POST("/addNode", swfsController.AddSWFSNodeController)
        swfsApi.POST("/updateSWFSService", swfsController.UpdateSWFSServiceController)
        swfsApi.GET("/", swfsController.RestartMaster)
    }
}
service/service.go
New file
@@ -0,0 +1 @@
package service
util/util.go
New file
@@ -0,0 +1,100 @@
package util
import (
    "bytes"
    "encoding/json"
    "errors"
    "fmt"
    "io/ioutil"
    "net/http"
    "os/exec"
    "time"
)
//脚本封装
func RunScript(str string) string {
    cmd := exec.Command("sh", "-c", str)
    var out bytes.Buffer
    cmd.Stdout = &out
    err := cmd.Run()
    if err != nil {
        return "运行失败"
    }
    return out.String()
}
//解析http
func EsReq(method string, url string, parama []byte) (buf []byte, err error) {
    timeout := time.Duration(10 * time.Second)
    client := http.Client{
        Timeout: timeout,
    }
    request, err := http.NewRequest(method, url, bytes.NewBuffer(parama))
    request.Header.Set("Content-type", "application/json")
    if err != nil {
        fmt.Println("build request fail !")
        return nil, err
    }
    resp, err := client.Do(request)
    if err != nil {
        fmt.Println("request error: ", err)
        return nil, err
    }
    defer resp.Body.Close()
    body, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        fmt.Println(err)
        return nil, err
    }
    return body, nil
}
//解析json
func Sourcelist(buf []byte) (sources []map[string]interface{}, err error) {
    var info interface{}
    json.Unmarshal(buf, &info)
    out, ok := info.(map[string]interface{})
    if !ok {
        return nil, errors.New("http response interface can not change map[string]interface{}")
    }
    middle, ok := out["hits"].(map[string]interface{})
    if !ok {
        return nil, errors.New("first hits change error!")
    }
    for _, in := range middle["hits"].([]interface{}) {
        tmpbuf, ok := in.(map[string]interface{})
        if !ok {
            fmt.Println("change to source error!")
            continue
        }
        source, ok := tmpbuf["_source"].(map[string]interface{})
        if !ok {
            fmt.Println("change _source error!")
            continue
        }
        sources = append(sources, source)
    }
    return sources, nil
}
//解析更新
func SourceUpdated(buf []byte) (total int, err error) {
    var info interface{}
    json.Unmarshal(buf, &info)
    out, ok := info.(map[string]interface{})
    if !ok {
        return -1, errors.New("http response interface can not change map[string]interface{}")
    }
    middle, ok := out["updated"].(float64)
    if !ok {
        return -1, errors.New("first total change error!")
    }
    total = int(middle)
    return total, nil
}