From e12bca22ef82aa5a2df0448a4715b1a948604156 Mon Sep 17 00:00:00 2001 From: sunty <1172534965@qq.com> Date: 星期二, 07 四月 2020 14:57:38 +0800 Subject: [PATCH] first commit --- config/config.go | 34 +++ service/service.go | 1 go.mod | 5 main.go | 184 ------------------- controllers/swfsControllers.go | 179 +++++++++++++++++++ util/util.go | 100 +++++++++++ router/router.go | 19 ++ 7 files changed, 346 insertions(+), 176 deletions(-) diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..6fdffad --- /dev/null +++ b/config/config.go @@ -0,0 +1,34 @@ +package config + +import ( + "github.com/spf13/viper" + "log" +) + +type server struct { + EsServerIp string `mapstructure: "esServerIp"` + EsServerPort string `mapstructure: "esServerPort"` +} + +type elastic struct { + IndexName string `mapstructure: "indexName"` + IndexType string `mapstructure: "indexType"` +} + +var Server = &server{} +var BasicFS = &elastic{} + +func Init(env string) { + var err error + viper.SetConfigType("yaml") + viper.SetConfigName(env) + viper.AddConfigPath("/opt/vasystem/config") + err = viper.ReadInConfig() + if err != nil { + log.Fatal("error on parsing configuration file", err) + } + viper.UnmarshalKey("server", Server) + viper.UnmarshalKey("elastic.basicFS", BasicFS) + //fmt.Println(AiOcean) + //fmt.Println(BasicFS) +} diff --git a/controllers/swfsControllers.go b/controllers/swfsControllers.go new file mode 100644 index 0000000..17d0b7b --- /dev/null +++ b/controllers/swfsControllers.go @@ -0,0 +1,179 @@ +package controllers + +import ( + "fmt" + "github.com/gin-gonic/gin" + "strings" + "test/config" + "test/util" + "time" +) + +type SeaweedfsController struct{} + +type SWFSInfo struct { + Role string `json:"role"` +} + +//淇敼 +func (sc *SeaweedfsController) UpdateSWFSServiceController(c *gin.Context) { + oldPeers := GetOldPeers() + newPeers := GetNewPeers() + UpdatePeers(oldPeers, newPeers) +} + +func (sc *SeaweedfsController) AddSWFSNodeController(c *gin.Context) { + var body SWFSInfo + c.BindJSON(&body) + role := body.Role + if role == "master" { + AsMaster() + } else if role == "volume" { + AsVolume() + } else { + + return + } + +} + +func (sc *SeaweedfsController) RestartMaster(c *gin.Context) { + end := "sh /opt/vasystem/script/seaweedfs_stop.sh" + start := "sh /opt/vasystem/script/seaweedfs_start.sh" + util.RunScript(end) + util.RunScript(start) +} + +func RequestMasterNodesOperation() { + nowPeers := GetNowPeersList() + coreThread := len(nowPeers)/100 + 1 + masterIp := make([]string, 0) + for i, val := range nowPeers { + ip := val.(string) + if (i+1)%coreThread == 0 { + masterIp = append(masterIp, strings.Split(ip, ":")[0]) + for _, val := range masterIp { + RestartOrtherMaster(val) + masterIp = append(masterIp[:0], masterIp[1:]...) + } + time.Sleep(time.Second * 2) + } else { + masterIp = append(masterIp, strings.Split(ip, ":")[0]) + if len(nowPeers) == i+1 { + for _, val := range masterIp { + RestartOrtherMaster(val) + } + break + } + continue + } + } +} + +func GetOldPeers() string { + str := "cat /opt/vasystem/seaweedfs_start.sh | grep peers=" + peers := strings.Split(util.RunScript(str), "\n")[0] + return peers +} + +func AsVolume() { + nowPeers := GetNowPeersList() + fmt.Println(nowPeers) +} + +func AsMaster() { + nowPeers := GetNowPeersList() + coreThread := len(nowPeers) / 100 + fmt.Println(coreThread) + +} + +func GetNowPeersList() []interface{} { + + getUrl := "http://" + config.Server.EsServerIp + ":" + config.Server.EsServerPort + "/" + config.BasicFS.IndexName + "/_search" + getJson := `{ + "query": { + "bool": { + "filter": [ + { + "term": { + "application":"nodeOperation" + } + } + ] + } + }, + "size": 1 +}` + + buf, _ := util.EsReq("POST", getUrl, []byte(getJson)) + source, _ := util.Sourcelist(buf) + //fmt.Println(source) + peers := source[0]["peers"].([]interface{}) + return peers +} + +func GetNewPeers() string { + + getUrl := "http://" + config.Server.EsServerIp + ":" + config.Server.EsServerPort + "/" + config.BasicFS.IndexName + "/_search" + getJson := `{ + "query": { + "bool": { + "filter": [ + { + "term": { + "application":"nodeOperation" + } + } + ] + } + }, + "size": 1 +}` + + buf, _ := util.EsReq("POST", getUrl, []byte(getJson)) + source, _ := util.Sourcelist(buf) + //fmt.Println(source) + peers := source[0]["peers"].([]interface{}) + fmt.Println(peers) + p := "peers=" + strings.Replace(strings.Trim(fmt.Sprint(peers), "[]"), " ", ",", -1) + return p +} + +func UpdatePeers(oldPeers string, newPeers string) { + str := "sed -ie 's/" + oldPeers + "/" + newPeers + "/g' /opt/vasystem/seaweedfs_start.sh" + util.RunScript(str) +} + +func AddNewMasterToPeers() (result bool) { + peer := config.Server.EsServerIp + ":6333" + addUrl := "http://" + config.Server.EsServerIp + ":" + config.Server.EsServerPort + "/" + config.BasicFS.IndexName + "/_update_by_query" + addJson := `{ + "script": { + "lang": "painless", + "inline": "ctx._source.peers.add(params.newpeer)", + "params": { + "newpeer": "` + peer + `" + } + }, + "query": { + "bool": { + "filter": [ + { + "term": { + "application": "nodeOperation" + } + } + ] + } + } +}` + buf, _ := util.EsReq("POST", addUrl, []byte(addJson)) + updateRes, _ := util.SourceUpdated(buf) + if updateRes == -1 { + result = false + } else { + result = true + } + return result +} diff --git a/go.mod b/go.mod index e27256d..e899b4a 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,8 @@ module test go 1.12 + +require ( + github.com/gin-gonic/gin v1.6.1 + github.com/spf13/viper v1.6.2 +) diff --git a/main.go b/main.go index 7499b8e..1ca5668 100644 --- a/main.go +++ b/main.go @@ -1,184 +1,16 @@ package main import ( - "bytes" - "encoding/json" - "errors" - "fmt" - "io/ioutil" - "net/http" - "os/exec" - "strings" - "time" + "flag" + "test/config" ) + +var env = flag.String("pro", "pro", "read storage info") + +func init() { + config.Init(*env) +} func main() { - oldPeers := GetOldPeers() - fmt.Println("oldPeers: ", oldPeers) - //AddNewMasterToPeers() - newPeers := GetNewPeers() - fmt.Println("newPeers: ", newPeers) - UpdatePeers(oldPeers, newPeers) - time.Sleep(time.Second * 3) - nowPeers := GetOldPeers() - fmt.Println("nowPeers: ", nowPeers) -} - -func GetOldPeers() string { - str := "cat /opt/vasystem/seaweedfs_start.sh | grep peers=" - peers := strings.Split(RunScript(str), "\n")[0] - return peers -} - -func GetNewPeers() string { - getUrl := "http://192.168.20.10:9200/basicfs/_search" - getJson := `{ - "query": { - "bool": { - "filter": [ - { - "term": { - "application":"nodeOperation" - } - } - ] - } - }, - "size": 1 -}` - - buf, _ := EsReq("POST", getUrl, []byte(getJson)) - source, _ := Sourcelist(buf) - //fmt.Println(source) - peers := source[0]["peers"].([]interface{}) - fmt.Println(peers) - p := "peers=" + strings.Replace(strings.Trim(fmt.Sprint(peers), "[]"), " ", ",", -1) - return p -} - -func UpdatePeers(oldPeers string, newPeers string) { - str := "sed -ie 's/" + oldPeers + "/" + newPeers + "/g' /opt/vasystem/seaweedfs_start.sh" - fmt.Println(str) - RunScript(str) -} - -//鑴氭湰灏佽 -func RunScript(str string) string { - - cmd := exec.Command("sh", "-c", str) - var out bytes.Buffer - cmd.Stdout = &out - err := cmd.Run() - if err != nil { - return "杩愯澶辫触" - } - return out.String() -} - -//瑙f瀽http -func EsReq(method string, url string, parama []byte) (buf []byte, err error) { - timeout := time.Duration(10 * time.Second) - client := http.Client{ - Timeout: timeout, - } - request, err := http.NewRequest(method, url, bytes.NewBuffer(parama)) - request.Header.Set("Content-type", "application/json") - - if err != nil { - fmt.Println("build request fail !") - return nil, err - } - - resp, err := client.Do(request) - if err != nil { - fmt.Println("request error: ", err) - return nil, err - } - - defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - fmt.Println(err) - return nil, err - } - return body, nil -} - -func Sourcelist(buf []byte) (sources []map[string]interface{}, err error) { - var info interface{} - json.Unmarshal(buf, &info) - out, ok := info.(map[string]interface{}) - if !ok { - return nil, errors.New("http response interface can not change map[string]interface{}") - } - - middle, ok := out["hits"].(map[string]interface{}) - if !ok { - return nil, errors.New("first hits change error!") - } - for _, in := range middle["hits"].([]interface{}) { - tmpbuf, ok := in.(map[string]interface{}) - if !ok { - fmt.Println("change to source error!") - continue - } - source, ok := tmpbuf["_source"].(map[string]interface{}) - if !ok { - fmt.Println("change _source error!") - continue - } - sources = append(sources, source) - } - return sources, nil -} - -func AddNewMasterToPeers() (result bool) { - - peer := "192.168.5.22:6333" - addUrl := "http://192.168.20.10:9200/basicfs/_update_by_query" - addJson := `{ - "script": { - "lang": "painless", - "inline": "ctx._source.peers.add(params.newpeer)", - "params": { - "newpeer": "` + peer + `" - } - }, - "query": { - "bool": { - "filter": [ - { - "term": { - "application": "nodeOperation" - } - } - ] - } - } -}` - buf, _ := EsReq("POST", addUrl, []byte(addJson)) - updateRes, _ := SourceUpdated(buf) - if updateRes == -1 { - result = false - } else { - result = true - } - return result -} - -func SourceUpdated(buf []byte) (total int, err error) { - var info interface{} - json.Unmarshal(buf, &info) - out, ok := info.(map[string]interface{}) - if !ok { - return -1, errors.New("http response interface can not change map[string]interface{}") - } - - middle, ok := out["updated"].(float64) - if !ok { - return -1, errors.New("first total change error!") - } - total = int(middle) - return total, nil } diff --git a/router/router.go b/router/router.go new file mode 100644 index 0000000..b434b92 --- /dev/null +++ b/router/router.go @@ -0,0 +1,19 @@ +package router + +import ( + "github.com/gin-gonic/gin" + "test/controllers" +) + +func NewRouter() *gin.Context { + r := gin.Default() + swfsController := new(controllers.SeaweedfsController) + + urlPrefix := "/node/api-v" + swfsApi := r.Group(urlPrefix + "swfs") + { + swfsApi.POST("/addNode", swfsController.AddSWFSNodeController) + swfsApi.POST("/updateSWFSService", swfsController.UpdateSWFSServiceController) + swfsApi.GET("/", swfsController.RestartMaster) + } +} diff --git a/service/service.go b/service/service.go new file mode 100644 index 0000000..6d43c33 --- /dev/null +++ b/service/service.go @@ -0,0 +1 @@ +package service diff --git a/util/util.go b/util/util.go new file mode 100644 index 0000000..bf2337a --- /dev/null +++ b/util/util.go @@ -0,0 +1,100 @@ +package util + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "net/http" + "os/exec" + "time" +) + +//鑴氭湰灏佽 +func RunScript(str string) string { + + cmd := exec.Command("sh", "-c", str) + var out bytes.Buffer + cmd.Stdout = &out + err := cmd.Run() + if err != nil { + return "杩愯澶辫触" + } + return out.String() +} + +//瑙f瀽http +func EsReq(method string, url string, parama []byte) (buf []byte, err error) { + timeout := time.Duration(10 * time.Second) + client := http.Client{ + Timeout: timeout, + } + request, err := http.NewRequest(method, url, bytes.NewBuffer(parama)) + request.Header.Set("Content-type", "application/json") + + if err != nil { + fmt.Println("build request fail !") + return nil, err + } + + resp, err := client.Do(request) + if err != nil { + fmt.Println("request error: ", err) + return nil, err + } + + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + fmt.Println(err) + return nil, err + } + return body, nil +} + +//瑙f瀽json +func Sourcelist(buf []byte) (sources []map[string]interface{}, err error) { + var info interface{} + json.Unmarshal(buf, &info) + out, ok := info.(map[string]interface{}) + if !ok { + return nil, errors.New("http response interface can not change map[string]interface{}") + } + + middle, ok := out["hits"].(map[string]interface{}) + if !ok { + return nil, errors.New("first hits change error!") + } + for _, in := range middle["hits"].([]interface{}) { + tmpbuf, ok := in.(map[string]interface{}) + if !ok { + fmt.Println("change to source error!") + continue + } + source, ok := tmpbuf["_source"].(map[string]interface{}) + if !ok { + fmt.Println("change _source error!") + continue + } + sources = append(sources, source) + } + return sources, nil +} + +//瑙f瀽鏇存柊 +func SourceUpdated(buf []byte) (total int, err error) { + var info interface{} + json.Unmarshal(buf, &info) + out, ok := info.(map[string]interface{}) + if !ok { + return -1, errors.New("http response interface can not change map[string]interface{}") + } + + middle, ok := out["updated"].(float64) + if !ok { + return -1, errors.New("first total change error!") + } + total = int(middle) + return total, nil +} -- Gitblit v1.8.0