New file |
| | |
| | | package config |
| | | |
| | | import ( |
| | | "github.com/spf13/viper" |
| | | "log" |
| | | ) |
| | | |
| | | type server struct { |
| | | EsServerIp string `mapstructure: "esServerIp"` |
| | | EsServerPort string `mapstructure: "esServerPort"` |
| | | } |
| | | |
| | | type elastic struct { |
| | | IndexName string `mapstructure: "indexName"` |
| | | IndexType string `mapstructure: "indexType"` |
| | | } |
| | | |
| | | var Server = &server{} |
| | | var BasicFS = &elastic{} |
| | | |
| | | func Init(env string) { |
| | | var err error |
| | | viper.SetConfigType("yaml") |
| | | viper.SetConfigName(env) |
| | | viper.AddConfigPath("/opt/vasystem/config") |
| | | err = viper.ReadInConfig() |
| | | if err != nil { |
| | | log.Fatal("error on parsing configuration file", err) |
| | | } |
| | | viper.UnmarshalKey("server", Server) |
| | | viper.UnmarshalKey("elastic.basicFS", BasicFS) |
| | | //fmt.Println(AiOcean) |
| | | //fmt.Println(BasicFS) |
| | | } |
New file |
| | |
| | | package controllers |
| | | |
| | | import ( |
| | | "fmt" |
| | | "github.com/gin-gonic/gin" |
| | | "strings" |
| | | "test/config" |
| | | "test/util" |
| | | "time" |
| | | ) |
| | | |
| | | type SeaweedfsController struct{} |
| | | |
| | | type SWFSInfo struct { |
| | | Role string `json:"role"` |
| | | } |
| | | |
| | | //修改 |
| | | func (sc *SeaweedfsController) UpdateSWFSServiceController(c *gin.Context) { |
| | | oldPeers := GetOldPeers() |
| | | newPeers := GetNewPeers() |
| | | UpdatePeers(oldPeers, newPeers) |
| | | } |
| | | |
| | | func (sc *SeaweedfsController) AddSWFSNodeController(c *gin.Context) { |
| | | var body SWFSInfo |
| | | c.BindJSON(&body) |
| | | role := body.Role |
| | | if role == "master" { |
| | | AsMaster() |
| | | } else if role == "volume" { |
| | | AsVolume() |
| | | } else { |
| | | |
| | | return |
| | | } |
| | | |
| | | } |
| | | |
| | | func (sc *SeaweedfsController) RestartMaster(c *gin.Context) { |
| | | end := "sh /opt/vasystem/script/seaweedfs_stop.sh" |
| | | start := "sh /opt/vasystem/script/seaweedfs_start.sh" |
| | | util.RunScript(end) |
| | | util.RunScript(start) |
| | | } |
| | | |
| | | func RequestMasterNodesOperation() { |
| | | nowPeers := GetNowPeersList() |
| | | coreThread := len(nowPeers)/100 + 1 |
| | | masterIp := make([]string, 0) |
| | | for i, val := range nowPeers { |
| | | ip := val.(string) |
| | | if (i+1)%coreThread == 0 { |
| | | masterIp = append(masterIp, strings.Split(ip, ":")[0]) |
| | | for _, val := range masterIp { |
| | | RestartOrtherMaster(val) |
| | | masterIp = append(masterIp[:0], masterIp[1:]...) |
| | | } |
| | | time.Sleep(time.Second * 2) |
| | | } else { |
| | | masterIp = append(masterIp, strings.Split(ip, ":")[0]) |
| | | if len(nowPeers) == i+1 { |
| | | for _, val := range masterIp { |
| | | RestartOrtherMaster(val) |
| | | } |
| | | break |
| | | } |
| | | continue |
| | | } |
| | | } |
| | | } |
| | | |
| | | func GetOldPeers() string { |
| | | str := "cat /opt/vasystem/seaweedfs_start.sh | grep peers=" |
| | | peers := strings.Split(util.RunScript(str), "\n")[0] |
| | | return peers |
| | | } |
| | | |
| | | func AsVolume() { |
| | | nowPeers := GetNowPeersList() |
| | | fmt.Println(nowPeers) |
| | | } |
| | | |
| | | func AsMaster() { |
| | | nowPeers := GetNowPeersList() |
| | | coreThread := len(nowPeers) / 100 |
| | | fmt.Println(coreThread) |
| | | |
| | | } |
| | | |
| | | func GetNowPeersList() []interface{} { |
| | | |
| | | getUrl := "http://" + config.Server.EsServerIp + ":" + config.Server.EsServerPort + "/" + config.BasicFS.IndexName + "/_search" |
| | | getJson := `{ |
| | | "query": { |
| | | "bool": { |
| | | "filter": [ |
| | | { |
| | | "term": { |
| | | "application":"nodeOperation" |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | }, |
| | | "size": 1 |
| | | }` |
| | | |
| | | buf, _ := util.EsReq("POST", getUrl, []byte(getJson)) |
| | | source, _ := util.Sourcelist(buf) |
| | | //fmt.Println(source) |
| | | peers := source[0]["peers"].([]interface{}) |
| | | return peers |
| | | } |
| | | |
| | | func GetNewPeers() string { |
| | | |
| | | getUrl := "http://" + config.Server.EsServerIp + ":" + config.Server.EsServerPort + "/" + config.BasicFS.IndexName + "/_search" |
| | | getJson := `{ |
| | | "query": { |
| | | "bool": { |
| | | "filter": [ |
| | | { |
| | | "term": { |
| | | "application":"nodeOperation" |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | }, |
| | | "size": 1 |
| | | }` |
| | | |
| | | buf, _ := util.EsReq("POST", getUrl, []byte(getJson)) |
| | | source, _ := util.Sourcelist(buf) |
| | | //fmt.Println(source) |
| | | peers := source[0]["peers"].([]interface{}) |
| | | fmt.Println(peers) |
| | | p := "peers=" + strings.Replace(strings.Trim(fmt.Sprint(peers), "[]"), " ", ",", -1) |
| | | return p |
| | | } |
| | | |
| | | func UpdatePeers(oldPeers string, newPeers string) { |
| | | str := "sed -ie 's/" + oldPeers + "/" + newPeers + "/g' /opt/vasystem/seaweedfs_start.sh" |
| | | util.RunScript(str) |
| | | } |
| | | |
| | | func AddNewMasterToPeers() (result bool) { |
| | | peer := config.Server.EsServerIp + ":6333" |
| | | addUrl := "http://" + config.Server.EsServerIp + ":" + config.Server.EsServerPort + "/" + config.BasicFS.IndexName + "/_update_by_query" |
| | | addJson := `{ |
| | | "script": { |
| | | "lang": "painless", |
| | | "inline": "ctx._source.peers.add(params.newpeer)", |
| | | "params": { |
| | | "newpeer": "` + peer + `" |
| | | } |
| | | }, |
| | | "query": { |
| | | "bool": { |
| | | "filter": [ |
| | | { |
| | | "term": { |
| | | "application": "nodeOperation" |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | } |
| | | }` |
| | | buf, _ := util.EsReq("POST", addUrl, []byte(addJson)) |
| | | updateRes, _ := util.SourceUpdated(buf) |
| | | if updateRes == -1 { |
| | | result = false |
| | | } else { |
| | | result = true |
| | | } |
| | | return result |
| | | } |
| | |
| | | module test |
| | | |
| | | go 1.12 |
| | | |
| | | require ( |
| | | github.com/gin-gonic/gin v1.6.1 |
| | | github.com/spf13/viper v1.6.2 |
| | | ) |
| | |
| | | package main |
| | | |
| | | import ( |
| | | "bytes" |
| | | "encoding/json" |
| | | "errors" |
| | | "fmt" |
| | | "io/ioutil" |
| | | "net/http" |
| | | "os/exec" |
| | | "strings" |
| | | "time" |
| | | "flag" |
| | | "test/config" |
| | | ) |
| | | |
| | | var env = flag.String("pro", "pro", "read storage info") |
| | | |
| | | func init() { |
| | | config.Init(*env) |
| | | } |
| | | |
| | | func main() { |
| | | |
| | | oldPeers := GetOldPeers() |
| | | fmt.Println("oldPeers: ", oldPeers) |
| | | //AddNewMasterToPeers() |
| | | newPeers := GetNewPeers() |
| | | fmt.Println("newPeers: ", newPeers) |
| | | UpdatePeers(oldPeers, newPeers) |
| | | time.Sleep(time.Second * 3) |
| | | nowPeers := GetOldPeers() |
| | | fmt.Println("nowPeers: ", nowPeers) |
| | | } |
| | | |
| | | func GetOldPeers() string { |
| | | str := "cat /opt/vasystem/seaweedfs_start.sh | grep peers=" |
| | | peers := strings.Split(RunScript(str), "\n")[0] |
| | | return peers |
| | | } |
| | | |
| | | func GetNewPeers() string { |
| | | getUrl := "http://192.168.20.10:9200/basicfs/_search" |
| | | getJson := `{ |
| | | "query": { |
| | | "bool": { |
| | | "filter": [ |
| | | { |
| | | "term": { |
| | | "application":"nodeOperation" |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | }, |
| | | "size": 1 |
| | | }` |
| | | |
| | | buf, _ := EsReq("POST", getUrl, []byte(getJson)) |
| | | source, _ := Sourcelist(buf) |
| | | //fmt.Println(source) |
| | | peers := source[0]["peers"].([]interface{}) |
| | | fmt.Println(peers) |
| | | p := "peers=" + strings.Replace(strings.Trim(fmt.Sprint(peers), "[]"), " ", ",", -1) |
| | | return p |
| | | } |
| | | |
| | | func UpdatePeers(oldPeers string, newPeers string) { |
| | | str := "sed -ie 's/" + oldPeers + "/" + newPeers + "/g' /opt/vasystem/seaweedfs_start.sh" |
| | | fmt.Println(str) |
| | | RunScript(str) |
| | | } |
| | | |
| | | //脚本封装 |
| | | func RunScript(str string) string { |
| | | |
| | | cmd := exec.Command("sh", "-c", str) |
| | | var out bytes.Buffer |
| | | cmd.Stdout = &out |
| | | err := cmd.Run() |
| | | if err != nil { |
| | | return "运行失败" |
| | | } |
| | | return out.String() |
| | | } |
| | | |
| | | //解析http |
| | | func EsReq(method string, url string, parama []byte) (buf []byte, err error) { |
| | | timeout := time.Duration(10 * time.Second) |
| | | client := http.Client{ |
| | | Timeout: timeout, |
| | | } |
| | | request, err := http.NewRequest(method, url, bytes.NewBuffer(parama)) |
| | | request.Header.Set("Content-type", "application/json") |
| | | |
| | | if err != nil { |
| | | fmt.Println("build request fail !") |
| | | return nil, err |
| | | } |
| | | |
| | | resp, err := client.Do(request) |
| | | if err != nil { |
| | | fmt.Println("request error: ", err) |
| | | return nil, err |
| | | } |
| | | |
| | | defer resp.Body.Close() |
| | | body, err := ioutil.ReadAll(resp.Body) |
| | | if err != nil { |
| | | fmt.Println(err) |
| | | return nil, err |
| | | } |
| | | return body, nil |
| | | } |
| | | |
| | | func Sourcelist(buf []byte) (sources []map[string]interface{}, err error) { |
| | | var info interface{} |
| | | json.Unmarshal(buf, &info) |
| | | out, ok := info.(map[string]interface{}) |
| | | if !ok { |
| | | return nil, errors.New("http response interface can not change map[string]interface{}") |
| | | } |
| | | |
| | | middle, ok := out["hits"].(map[string]interface{}) |
| | | if !ok { |
| | | return nil, errors.New("first hits change error!") |
| | | } |
| | | for _, in := range middle["hits"].([]interface{}) { |
| | | tmpbuf, ok := in.(map[string]interface{}) |
| | | if !ok { |
| | | fmt.Println("change to source error!") |
| | | continue |
| | | } |
| | | source, ok := tmpbuf["_source"].(map[string]interface{}) |
| | | if !ok { |
| | | fmt.Println("change _source error!") |
| | | continue |
| | | } |
| | | sources = append(sources, source) |
| | | } |
| | | return sources, nil |
| | | } |
| | | |
| | | func AddNewMasterToPeers() (result bool) { |
| | | |
| | | peer := "192.168.5.22:6333" |
| | | addUrl := "http://192.168.20.10:9200/basicfs/_update_by_query" |
| | | addJson := `{ |
| | | "script": { |
| | | "lang": "painless", |
| | | "inline": "ctx._source.peers.add(params.newpeer)", |
| | | "params": { |
| | | "newpeer": "` + peer + `" |
| | | } |
| | | }, |
| | | "query": { |
| | | "bool": { |
| | | "filter": [ |
| | | { |
| | | "term": { |
| | | "application": "nodeOperation" |
| | | } |
| | | } |
| | | ] |
| | | } |
| | | } |
| | | }` |
| | | buf, _ := EsReq("POST", addUrl, []byte(addJson)) |
| | | updateRes, _ := SourceUpdated(buf) |
| | | if updateRes == -1 { |
| | | result = false |
| | | } else { |
| | | result = true |
| | | } |
| | | return result |
| | | } |
| | | |
| | | func SourceUpdated(buf []byte) (total int, err error) { |
| | | var info interface{} |
| | | json.Unmarshal(buf, &info) |
| | | out, ok := info.(map[string]interface{}) |
| | | if !ok { |
| | | return -1, errors.New("http response interface can not change map[string]interface{}") |
| | | } |
| | | |
| | | middle, ok := out["updated"].(float64) |
| | | if !ok { |
| | | return -1, errors.New("first total change error!") |
| | | } |
| | | total = int(middle) |
| | | return total, nil |
| | | } |
New file |
| | |
| | | package router |
| | | |
| | | import ( |
| | | "github.com/gin-gonic/gin" |
| | | "test/controllers" |
| | | ) |
| | | |
| | | func NewRouter() *gin.Context { |
| | | r := gin.Default() |
| | | swfsController := new(controllers.SeaweedfsController) |
| | | |
| | | urlPrefix := "/node/api-v" |
| | | swfsApi := r.Group(urlPrefix + "swfs") |
| | | { |
| | | swfsApi.POST("/addNode", swfsController.AddSWFSNodeController) |
| | | swfsApi.POST("/updateSWFSService", swfsController.UpdateSWFSServiceController) |
| | | swfsApi.GET("/", swfsController.RestartMaster) |
| | | } |
| | | } |
New file |
| | |
| | | package util |
| | | |
| | | import ( |
| | | "bytes" |
| | | "encoding/json" |
| | | "errors" |
| | | "fmt" |
| | | "io/ioutil" |
| | | "net/http" |
| | | "os/exec" |
| | | "time" |
| | | ) |
| | | |
| | | //脚本封装 |
| | | func RunScript(str string) string { |
| | | |
| | | cmd := exec.Command("sh", "-c", str) |
| | | var out bytes.Buffer |
| | | cmd.Stdout = &out |
| | | err := cmd.Run() |
| | | if err != nil { |
| | | return "运行失败" |
| | | } |
| | | return out.String() |
| | | } |
| | | |
| | | //解析http |
| | | func EsReq(method string, url string, parama []byte) (buf []byte, err error) { |
| | | timeout := time.Duration(10 * time.Second) |
| | | client := http.Client{ |
| | | Timeout: timeout, |
| | | } |
| | | request, err := http.NewRequest(method, url, bytes.NewBuffer(parama)) |
| | | request.Header.Set("Content-type", "application/json") |
| | | |
| | | if err != nil { |
| | | fmt.Println("build request fail !") |
| | | return nil, err |
| | | } |
| | | |
| | | resp, err := client.Do(request) |
| | | if err != nil { |
| | | fmt.Println("request error: ", err) |
| | | return nil, err |
| | | } |
| | | |
| | | defer resp.Body.Close() |
| | | body, err := ioutil.ReadAll(resp.Body) |
| | | if err != nil { |
| | | fmt.Println(err) |
| | | return nil, err |
| | | } |
| | | return body, nil |
| | | } |
| | | |
| | | //解析json |
| | | func Sourcelist(buf []byte) (sources []map[string]interface{}, err error) { |
| | | var info interface{} |
| | | json.Unmarshal(buf, &info) |
| | | out, ok := info.(map[string]interface{}) |
| | | if !ok { |
| | | return nil, errors.New("http response interface can not change map[string]interface{}") |
| | | } |
| | | |
| | | middle, ok := out["hits"].(map[string]interface{}) |
| | | if !ok { |
| | | return nil, errors.New("first hits change error!") |
| | | } |
| | | for _, in := range middle["hits"].([]interface{}) { |
| | | tmpbuf, ok := in.(map[string]interface{}) |
| | | if !ok { |
| | | fmt.Println("change to source error!") |
| | | continue |
| | | } |
| | | source, ok := tmpbuf["_source"].(map[string]interface{}) |
| | | if !ok { |
| | | fmt.Println("change _source error!") |
| | | continue |
| | | } |
| | | sources = append(sources, source) |
| | | } |
| | | return sources, nil |
| | | } |
| | | |
| | | //解析更新 |
| | | func SourceUpdated(buf []byte) (total int, err error) { |
| | | var info interface{} |
| | | json.Unmarshal(buf, &info) |
| | | out, ok := info.(map[string]interface{}) |
| | | if !ok { |
| | | return -1, errors.New("http response interface can not change map[string]interface{}") |
| | | } |
| | | |
| | | middle, ok := out["updated"].(float64) |
| | | if !ok { |
| | | return -1, errors.New("first total change error!") |
| | | } |
| | | total = int(middle) |
| | | return total, nil |
| | | } |