sunty
2020-04-16 53eceb2d556c4562e21a6ad8ee4114038b92d6fc
main.go
@@ -1,184 +1,18 @@
package main
import (
   "bytes"
   "encoding/json"
   "errors"
   "fmt"
   "io/ioutil"
   "net/http"
   "os/exec"
   "strings"
   "time"
   "flag"
   "swfs/config"
   "swfs/router"
)
var env = flag.String("pro", "pro", "read storage info")
func init() {
   config.Init(*env)
}
func main() {
   oldPeers := GetOldPeers()
   fmt.Println("oldPeers: ", oldPeers)
   //AddNewMasterToPeers()
   newPeers := GetNewPeers()
   fmt.Println("newPeers: ", newPeers)
   UpdatePeers(oldPeers, newPeers)
   time.Sleep(time.Second * 3)
   nowPeers := GetOldPeers()
   fmt.Println("nowPeers: ", nowPeers)
}
func GetOldPeers() string {
   str := "cat /opt/vasystem/seaweedfs_start.sh | grep peers="
   peers := strings.Split(RunScript(str), "\n")[0]
   return peers
}
func GetNewPeers() string {
   getUrl := "http://192.168.20.10:9200/basicfs/_search"
   getJson := `{
    "query": {
        "bool": {
            "filter": [
                {
                    "term": {
                  "application":"nodeOperation"
               }
                }
            ]
        }
    },
    "size": 1
}`
   buf, _ := EsReq("POST", getUrl, []byte(getJson))
   source, _ := Sourcelist(buf)
   //fmt.Println(source)
   peers := source[0]["peers"].([]interface{})
   fmt.Println(peers)
   p := "peers=" + strings.Replace(strings.Trim(fmt.Sprint(peers), "[]"), " ", ",", -1)
   return p
}
func UpdatePeers(oldPeers string, newPeers string) {
   str := "sed -ie 's/" + oldPeers + "/" + newPeers + "/g' /opt/vasystem/seaweedfs_start.sh"
   fmt.Println(str)
   RunScript(str)
}
//脚本封装
func RunScript(str string) string {
   cmd := exec.Command("sh", "-c", str)
   var out bytes.Buffer
   cmd.Stdout = &out
   err := cmd.Run()
   if err != nil {
      return "运行失败"
   }
   return out.String()
}
//解析http
func EsReq(method string, url string, parama []byte) (buf []byte, err error) {
   timeout := time.Duration(10 * time.Second)
   client := http.Client{
      Timeout: timeout,
   }
   request, err := http.NewRequest(method, url, bytes.NewBuffer(parama))
   request.Header.Set("Content-type", "application/json")
   if err != nil {
      fmt.Println("build request fail !")
      return nil, err
   }
   resp, err := client.Do(request)
   if err != nil {
      fmt.Println("request error: ", err)
      return nil, err
   }
   defer resp.Body.Close()
   body, err := ioutil.ReadAll(resp.Body)
   if err != nil {
      fmt.Println(err)
      return nil, err
   }
   return body, nil
}
func Sourcelist(buf []byte) (sources []map[string]interface{}, err error) {
   var info interface{}
   json.Unmarshal(buf, &info)
   out, ok := info.(map[string]interface{})
   if !ok {
      return nil, errors.New("http response interface can not change map[string]interface{}")
   }
   middle, ok := out["hits"].(map[string]interface{})
   if !ok {
      return nil, errors.New("first hits change error!")
   }
   for _, in := range middle["hits"].([]interface{}) {
      tmpbuf, ok := in.(map[string]interface{})
      if !ok {
         fmt.Println("change to source error!")
         continue
      }
      source, ok := tmpbuf["_source"].(map[string]interface{})
      if !ok {
         fmt.Println("change _source error!")
         continue
      }
      sources = append(sources, source)
   }
   return sources, nil
}
func AddNewMasterToPeers() (result bool) {
   peer := "192.168.5.22:6333"
   addUrl := "http://192.168.20.10:9200/basicfs/_update_by_query"
   addJson := `{
    "script": {
        "lang": "painless",
        "inline": "ctx._source.peers.add(params.newpeer)",
        "params": {
            "newpeer": "` + peer + `"
        }
    },
    "query": {
        "bool": {
            "filter": [
                {
                    "term": {
                        "application": "nodeOperation"
                    }
                }
            ]
        }
    }
}`
   buf, _ := EsReq("POST", addUrl, []byte(addJson))
   updateRes, _ := SourceUpdated(buf)
   if updateRes == -1 {
      result = false
   } else {
      result = true
   }
   return result
}
func SourceUpdated(buf []byte) (total int, err error) {
   var info interface{}
   json.Unmarshal(buf, &info)
   out, ok := info.(map[string]interface{})
   if !ok {
      return -1, errors.New("http response interface can not change map[string]interface{}")
   }
   middle, ok := out["updated"].(float64)
   if !ok {
      return -1, errors.New("first total change error!")
   }
   total = int(middle)
   return total, nil
   r := router.NewRouter()
   r.Run("0.0.0.0:7020")
}