From e12bca22ef82aa5a2df0448a4715b1a948604156 Mon Sep 17 00:00:00 2001
From: sunty <1172534965@qq.com>
Date: 星期二, 07 四月 2020 14:57:38 +0800
Subject: [PATCH] first commit
---
config/config.go | 34 +++
service/service.go | 1
go.mod | 5
main.go | 184 -------------------
controllers/swfsControllers.go | 179 +++++++++++++++++++
util/util.go | 100 +++++++++++
router/router.go | 19 ++
7 files changed, 346 insertions(+), 176 deletions(-)
diff --git a/config/config.go b/config/config.go
new file mode 100644
index 0000000..6fdffad
--- /dev/null
+++ b/config/config.go
@@ -0,0 +1,34 @@
+package config
+
+import (
+ "github.com/spf13/viper"
+ "log"
+)
+
+type server struct {
+ EsServerIp string `mapstructure: "esServerIp"`
+ EsServerPort string `mapstructure: "esServerPort"`
+}
+
+type elastic struct {
+ IndexName string `mapstructure: "indexName"`
+ IndexType string `mapstructure: "indexType"`
+}
+
+var Server = &server{}
+var BasicFS = &elastic{}
+
+func Init(env string) {
+ var err error
+ viper.SetConfigType("yaml")
+ viper.SetConfigName(env)
+ viper.AddConfigPath("/opt/vasystem/config")
+ err = viper.ReadInConfig()
+ if err != nil {
+ log.Fatal("error on parsing configuration file", err)
+ }
+ viper.UnmarshalKey("server", Server)
+ viper.UnmarshalKey("elastic.basicFS", BasicFS)
+ //fmt.Println(AiOcean)
+ //fmt.Println(BasicFS)
+}
diff --git a/controllers/swfsControllers.go b/controllers/swfsControllers.go
new file mode 100644
index 0000000..17d0b7b
--- /dev/null
+++ b/controllers/swfsControllers.go
@@ -0,0 +1,179 @@
+package controllers
+
+import (
+ "fmt"
+ "github.com/gin-gonic/gin"
+ "strings"
+ "test/config"
+ "test/util"
+ "time"
+)
+
+type SeaweedfsController struct{}
+
+type SWFSInfo struct {
+ Role string `json:"role"`
+}
+
+//淇敼
+func (sc *SeaweedfsController) UpdateSWFSServiceController(c *gin.Context) {
+ oldPeers := GetOldPeers()
+ newPeers := GetNewPeers()
+ UpdatePeers(oldPeers, newPeers)
+}
+
+func (sc *SeaweedfsController) AddSWFSNodeController(c *gin.Context) {
+ var body SWFSInfo
+ c.BindJSON(&body)
+ role := body.Role
+ if role == "master" {
+ AsMaster()
+ } else if role == "volume" {
+ AsVolume()
+ } else {
+
+ return
+ }
+
+}
+
+func (sc *SeaweedfsController) RestartMaster(c *gin.Context) {
+ end := "sh /opt/vasystem/script/seaweedfs_stop.sh"
+ start := "sh /opt/vasystem/script/seaweedfs_start.sh"
+ util.RunScript(end)
+ util.RunScript(start)
+}
+
+func RequestMasterNodesOperation() {
+ nowPeers := GetNowPeersList()
+ coreThread := len(nowPeers)/100 + 1
+ masterIp := make([]string, 0)
+ for i, val := range nowPeers {
+ ip := val.(string)
+ if (i+1)%coreThread == 0 {
+ masterIp = append(masterIp, strings.Split(ip, ":")[0])
+ for _, val := range masterIp {
+ RestartOrtherMaster(val)
+ masterIp = append(masterIp[:0], masterIp[1:]...)
+ }
+ time.Sleep(time.Second * 2)
+ } else {
+ masterIp = append(masterIp, strings.Split(ip, ":")[0])
+ if len(nowPeers) == i+1 {
+ for _, val := range masterIp {
+ RestartOrtherMaster(val)
+ }
+ break
+ }
+ continue
+ }
+ }
+}
+
+func GetOldPeers() string {
+ str := "cat /opt/vasystem/seaweedfs_start.sh | grep peers="
+ peers := strings.Split(util.RunScript(str), "\n")[0]
+ return peers
+}
+
+func AsVolume() {
+ nowPeers := GetNowPeersList()
+ fmt.Println(nowPeers)
+}
+
+func AsMaster() {
+ nowPeers := GetNowPeersList()
+ coreThread := len(nowPeers) / 100
+ fmt.Println(coreThread)
+
+}
+
+func GetNowPeersList() []interface{} {
+
+ getUrl := "http://" + config.Server.EsServerIp + ":" + config.Server.EsServerPort + "/" + config.BasicFS.IndexName + "/_search"
+ getJson := `{
+ "query": {
+ "bool": {
+ "filter": [
+ {
+ "term": {
+ "application":"nodeOperation"
+ }
+ }
+ ]
+ }
+ },
+ "size": 1
+}`
+
+ buf, _ := util.EsReq("POST", getUrl, []byte(getJson))
+ source, _ := util.Sourcelist(buf)
+ //fmt.Println(source)
+ peers := source[0]["peers"].([]interface{})
+ return peers
+}
+
+func GetNewPeers() string {
+
+ getUrl := "http://" + config.Server.EsServerIp + ":" + config.Server.EsServerPort + "/" + config.BasicFS.IndexName + "/_search"
+ getJson := `{
+ "query": {
+ "bool": {
+ "filter": [
+ {
+ "term": {
+ "application":"nodeOperation"
+ }
+ }
+ ]
+ }
+ },
+ "size": 1
+}`
+
+ buf, _ := util.EsReq("POST", getUrl, []byte(getJson))
+ source, _ := util.Sourcelist(buf)
+ //fmt.Println(source)
+ peers := source[0]["peers"].([]interface{})
+ fmt.Println(peers)
+ p := "peers=" + strings.Replace(strings.Trim(fmt.Sprint(peers), "[]"), " ", ",", -1)
+ return p
+}
+
+func UpdatePeers(oldPeers string, newPeers string) {
+ str := "sed -ie 's/" + oldPeers + "/" + newPeers + "/g' /opt/vasystem/seaweedfs_start.sh"
+ util.RunScript(str)
+}
+
+func AddNewMasterToPeers() (result bool) {
+ peer := config.Server.EsServerIp + ":6333"
+ addUrl := "http://" + config.Server.EsServerIp + ":" + config.Server.EsServerPort + "/" + config.BasicFS.IndexName + "/_update_by_query"
+ addJson := `{
+ "script": {
+ "lang": "painless",
+ "inline": "ctx._source.peers.add(params.newpeer)",
+ "params": {
+ "newpeer": "` + peer + `"
+ }
+ },
+ "query": {
+ "bool": {
+ "filter": [
+ {
+ "term": {
+ "application": "nodeOperation"
+ }
+ }
+ ]
+ }
+ }
+}`
+ buf, _ := util.EsReq("POST", addUrl, []byte(addJson))
+ updateRes, _ := util.SourceUpdated(buf)
+ if updateRes == -1 {
+ result = false
+ } else {
+ result = true
+ }
+ return result
+}
diff --git a/go.mod b/go.mod
index e27256d..e899b4a 100644
--- a/go.mod
+++ b/go.mod
@@ -1,3 +1,8 @@
module test
go 1.12
+
+require (
+ github.com/gin-gonic/gin v1.6.1
+ github.com/spf13/viper v1.6.2
+)
diff --git a/main.go b/main.go
index 7499b8e..1ca5668 100644
--- a/main.go
+++ b/main.go
@@ -1,184 +1,16 @@
package main
import (
- "bytes"
- "encoding/json"
- "errors"
- "fmt"
- "io/ioutil"
- "net/http"
- "os/exec"
- "strings"
- "time"
+ "flag"
+ "test/config"
)
+
+var env = flag.String("pro", "pro", "read storage info")
+
+func init() {
+ config.Init(*env)
+}
func main() {
- oldPeers := GetOldPeers()
- fmt.Println("oldPeers: ", oldPeers)
- //AddNewMasterToPeers()
- newPeers := GetNewPeers()
- fmt.Println("newPeers: ", newPeers)
- UpdatePeers(oldPeers, newPeers)
- time.Sleep(time.Second * 3)
- nowPeers := GetOldPeers()
- fmt.Println("nowPeers: ", nowPeers)
-}
-
-func GetOldPeers() string {
- str := "cat /opt/vasystem/seaweedfs_start.sh | grep peers="
- peers := strings.Split(RunScript(str), "\n")[0]
- return peers
-}
-
-func GetNewPeers() string {
- getUrl := "http://192.168.20.10:9200/basicfs/_search"
- getJson := `{
- "query": {
- "bool": {
- "filter": [
- {
- "term": {
- "application":"nodeOperation"
- }
- }
- ]
- }
- },
- "size": 1
-}`
-
- buf, _ := EsReq("POST", getUrl, []byte(getJson))
- source, _ := Sourcelist(buf)
- //fmt.Println(source)
- peers := source[0]["peers"].([]interface{})
- fmt.Println(peers)
- p := "peers=" + strings.Replace(strings.Trim(fmt.Sprint(peers), "[]"), " ", ",", -1)
- return p
-}
-
-func UpdatePeers(oldPeers string, newPeers string) {
- str := "sed -ie 's/" + oldPeers + "/" + newPeers + "/g' /opt/vasystem/seaweedfs_start.sh"
- fmt.Println(str)
- RunScript(str)
-}
-
-//鑴氭湰灏佽
-func RunScript(str string) string {
-
- cmd := exec.Command("sh", "-c", str)
- var out bytes.Buffer
- cmd.Stdout = &out
- err := cmd.Run()
- if err != nil {
- return "杩愯澶辫触"
- }
- return out.String()
-}
-
-//瑙f瀽http
-func EsReq(method string, url string, parama []byte) (buf []byte, err error) {
- timeout := time.Duration(10 * time.Second)
- client := http.Client{
- Timeout: timeout,
- }
- request, err := http.NewRequest(method, url, bytes.NewBuffer(parama))
- request.Header.Set("Content-type", "application/json")
-
- if err != nil {
- fmt.Println("build request fail !")
- return nil, err
- }
-
- resp, err := client.Do(request)
- if err != nil {
- fmt.Println("request error: ", err)
- return nil, err
- }
-
- defer resp.Body.Close()
- body, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- fmt.Println(err)
- return nil, err
- }
- return body, nil
-}
-
-func Sourcelist(buf []byte) (sources []map[string]interface{}, err error) {
- var info interface{}
- json.Unmarshal(buf, &info)
- out, ok := info.(map[string]interface{})
- if !ok {
- return nil, errors.New("http response interface can not change map[string]interface{}")
- }
-
- middle, ok := out["hits"].(map[string]interface{})
- if !ok {
- return nil, errors.New("first hits change error!")
- }
- for _, in := range middle["hits"].([]interface{}) {
- tmpbuf, ok := in.(map[string]interface{})
- if !ok {
- fmt.Println("change to source error!")
- continue
- }
- source, ok := tmpbuf["_source"].(map[string]interface{})
- if !ok {
- fmt.Println("change _source error!")
- continue
- }
- sources = append(sources, source)
- }
- return sources, nil
-}
-
-func AddNewMasterToPeers() (result bool) {
-
- peer := "192.168.5.22:6333"
- addUrl := "http://192.168.20.10:9200/basicfs/_update_by_query"
- addJson := `{
- "script": {
- "lang": "painless",
- "inline": "ctx._source.peers.add(params.newpeer)",
- "params": {
- "newpeer": "` + peer + `"
- }
- },
- "query": {
- "bool": {
- "filter": [
- {
- "term": {
- "application": "nodeOperation"
- }
- }
- ]
- }
- }
-}`
- buf, _ := EsReq("POST", addUrl, []byte(addJson))
- updateRes, _ := SourceUpdated(buf)
- if updateRes == -1 {
- result = false
- } else {
- result = true
- }
- return result
-}
-
-func SourceUpdated(buf []byte) (total int, err error) {
- var info interface{}
- json.Unmarshal(buf, &info)
- out, ok := info.(map[string]interface{})
- if !ok {
- return -1, errors.New("http response interface can not change map[string]interface{}")
- }
-
- middle, ok := out["updated"].(float64)
- if !ok {
- return -1, errors.New("first total change error!")
- }
- total = int(middle)
- return total, nil
}
diff --git a/router/router.go b/router/router.go
new file mode 100644
index 0000000..b434b92
--- /dev/null
+++ b/router/router.go
@@ -0,0 +1,19 @@
+package router
+
+import (
+ "github.com/gin-gonic/gin"
+ "test/controllers"
+)
+
+func NewRouter() *gin.Context {
+ r := gin.Default()
+ swfsController := new(controllers.SeaweedfsController)
+
+ urlPrefix := "/node/api-v"
+ swfsApi := r.Group(urlPrefix + "swfs")
+ {
+ swfsApi.POST("/addNode", swfsController.AddSWFSNodeController)
+ swfsApi.POST("/updateSWFSService", swfsController.UpdateSWFSServiceController)
+ swfsApi.GET("/", swfsController.RestartMaster)
+ }
+}
diff --git a/service/service.go b/service/service.go
new file mode 100644
index 0000000..6d43c33
--- /dev/null
+++ b/service/service.go
@@ -0,0 +1 @@
+package service
diff --git a/util/util.go b/util/util.go
new file mode 100644
index 0000000..bf2337a
--- /dev/null
+++ b/util/util.go
@@ -0,0 +1,100 @@
+package util
+
+import (
+ "bytes"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "os/exec"
+ "time"
+)
+
+//鑴氭湰灏佽
+func RunScript(str string) string {
+
+ cmd := exec.Command("sh", "-c", str)
+ var out bytes.Buffer
+ cmd.Stdout = &out
+ err := cmd.Run()
+ if err != nil {
+ return "杩愯澶辫触"
+ }
+ return out.String()
+}
+
+//瑙f瀽http
+func EsReq(method string, url string, parama []byte) (buf []byte, err error) {
+ timeout := time.Duration(10 * time.Second)
+ client := http.Client{
+ Timeout: timeout,
+ }
+ request, err := http.NewRequest(method, url, bytes.NewBuffer(parama))
+ request.Header.Set("Content-type", "application/json")
+
+ if err != nil {
+ fmt.Println("build request fail !")
+ return nil, err
+ }
+
+ resp, err := client.Do(request)
+ if err != nil {
+ fmt.Println("request error: ", err)
+ return nil, err
+ }
+
+ defer resp.Body.Close()
+ body, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ fmt.Println(err)
+ return nil, err
+ }
+ return body, nil
+}
+
+//瑙f瀽json
+func Sourcelist(buf []byte) (sources []map[string]interface{}, err error) {
+ var info interface{}
+ json.Unmarshal(buf, &info)
+ out, ok := info.(map[string]interface{})
+ if !ok {
+ return nil, errors.New("http response interface can not change map[string]interface{}")
+ }
+
+ middle, ok := out["hits"].(map[string]interface{})
+ if !ok {
+ return nil, errors.New("first hits change error!")
+ }
+ for _, in := range middle["hits"].([]interface{}) {
+ tmpbuf, ok := in.(map[string]interface{})
+ if !ok {
+ fmt.Println("change to source error!")
+ continue
+ }
+ source, ok := tmpbuf["_source"].(map[string]interface{})
+ if !ok {
+ fmt.Println("change _source error!")
+ continue
+ }
+ sources = append(sources, source)
+ }
+ return sources, nil
+}
+
+//瑙f瀽鏇存柊
+func SourceUpdated(buf []byte) (total int, err error) {
+ var info interface{}
+ json.Unmarshal(buf, &info)
+ out, ok := info.(map[string]interface{})
+ if !ok {
+ return -1, errors.New("http response interface can not change map[string]interface{}")
+ }
+
+ middle, ok := out["updated"].(float64)
+ if !ok {
+ return -1, errors.New("first total change error!")
+ }
+ total = int(middle)
+ return total, nil
+}
--
Gitblit v1.8.0