sunty
2020-10-16 cecd74d72d3872da3e8a69658d493842a71315da
add clusterApi
1个文件已添加
258 ■■■■■ 已修改文件
ClusterApi.go 258 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ClusterApi.go
New file
@@ -0,0 +1,258 @@
package esutil
import (
    "bytes"
    "encoding/json"
    "errors"
    "fmt"
    "github.com/spf13/viper"
    "io/ioutil"
    "net/http"
    "os/exec"
    "strings"
    "time"
)
func InitYml(configPath string) (bool, error) {
    v := viper.New()
    v.SetConfigType("yml")
    v.SetConfigName("elasticsearch")
    v.AddConfigPath(configPath)
    err := v.ReadInConfig()
    if err != nil {
        return false, err
    }
    if err := v.WriteConfig(); err != nil {
        return false, err
    }
    return true, nil
}
func StartServer(binPath string, ip string) bool {
    resultMsg := CMDSC("sh " + binPath + "/elasticsearch -d")
    if resultMsg == "运行失败" {
        return false
    }
    bool := VerifyNodeServer(ip, "9200")
    return bool
}
func StopServer(binPath string) {
}
func UpdateNodeRole(configPath string, role string) (bool, error) {
    v := viper.New()
    v.SetConfigType("yml")
    v.SetConfigName("elasticsearch")
    v.AddConfigPath(configPath)
    err := v.ReadInConfig()
    if err != nil {
        return false, err
    }
    if role == "master" {
        v.Set("node.master", true)
    } else if role == "slave" {
        v.Set("node.master", false)
    }
    if err := v.WriteConfig(); err != nil {
        return false, err
    }
    return true,nil
}
func VerifyCreated(configPath string, ip string) (bool, error) {
    v := viper.New()
    v.SetConfigType("yml")
    v.SetConfigName("elasticsearch")
    v.AddConfigPath(configPath)
    err := v.ReadInConfig()
    if err != nil {
        return false, err
    }
    hosts := v.GetStringSlice("discovery.zen.ping.unicast.hosts")
    for _, pick := range hosts {
        if pick == ip {
            return true, errors.New("该节点已经被执行“创建集群”")
        }
    }
    if len(hosts) > 1 {
        return true, errors.New("该节点已经被执行“加入集群”")
    }
    return false, nil
}
func VerifyNodeServer(ip string, port string) bool {
    b := false
    url := "http://" + ip + ":" + port
    for i := 1; i < 3; i++ {
        _, err := HttpRC("GET", url, nil)
        if err != nil {
            b = false
            if i < 3 {
                time.Sleep(time.Second * 10)
                continue
            }
        } else {
            b = true
            break
        }
    }
    return b
}
func VerifyNodeRole(configPath string, ) (string, error) {
    role := "slave"
    v := viper.New()
    v.SetConfigType("yml")
    v.SetConfigName("elasticsearch")
    v.AddConfigPath(configPath)
    err := v.ReadInConfig()
    if err != nil {
        return role, err
    }
    flag := v.GetBool("node.master")
    if flag == true {
        role = "master"
    }
    return role, nil
}
func UpdateDiscoveryZenPingUnicastHosts(configPath string, oldIp string, newIp string) (bool, error) {
    v := viper.New()
    v.SetConfigType("yml")
    v.SetConfigName("elasticsearch")
    v.AddConfigPath(configPath)
    err := v.ReadInConfig()
    if err != nil {
        return false, err
    }
    flag := false
    hosts := v.GetStringSlice("discovery.zen.ping.unicast.hosts")
    for i, pick := range hosts {
        if pick == oldIp {
            hosts[i] = newIp
            flag = true
        }
    }
    if flag == false {
        fmt.Println("要修改的ip不在列表中")
    }
    v.Set("discovery.zen.ping.unicast.hosts", hosts)
    if err := v.WriteConfig(); err != nil {
        return false, err
    }
    return true, nil
}
func AddDiscoveryZenPingUnicastHosts(configPath string, ip string) []string {
    v := viper.New()
    v.SetConfigType("yml")
    v.SetConfigName("elasticsearch")
    v.AddConfigPath(configPath)
    err := v.ReadInConfig()
    if err != nil {
    }
    hosts := v.GetStringSlice("discovery.zen.ping.unicast.hosts")
    hosts = append(hosts, ip)
    return hosts
}
type NodeInfo struct {
    NodeIp     string `json:"nodeIp"`
    NodeRole   string `json:"nodeRole"`
    NodeMaster bool   `json:"nodeMaster"`
    NodeName   string `json:"nodeName"`
}
func GetClusterInfo(ip string, port string) ([]NodeInfo, error) {
    url := "http://" + ip + ":" + port + "/_cat/nodes?v"
    buf, err := HttpRC("GET", url, nil)
    if err != nil {
        return nil, err
    }
    var inf = []NodeInfo{}
    res := strings.Split(string(buf), "\n")[1:]
    for _, r := range res {
        if r != "" {
            inx := strings.Fields(r)
            ip := inx[0]
            role := "slave"
            if find := strings.Contains(inx[7], "m"); find {
                role = "master"
            }
            master := false
            if find := strings.Contains(inx[8], "*"); find {
                master = true
            }
            name := inx[9]
            inf = append(inf, NodeInfo{
                NodeName:   name,
                NodeRole:   role,
                NodeIp:     ip,
                NodeMaster: master,
            })
        }
    }
    return inf, nil
}
func ExcludeNode(ip string, port string) (bool, error) {
    flag := false
    url := "http://" + ip + ":" + port + "/_cluster/settings"
    body := `{
                "transient":{
                    "cluster.routing.allocation.exclude._ip":"` + ip + `"
                }
            }`
    buf, err := HttpRC("PUT", url, []byte(body))
    if err != nil {
        return false, err
    }
    var inf map[string]interface{}
    json.Unmarshal(buf, &inf)
    if inf["acknowledged"] != nil {
        flag = inf["acknowledged"].(bool)
    }
    return flag, nil
}
func CMDSC(scriptStr string) string {
    cmd := exec.Command("sh", "-c", scriptStr)
    var out bytes.Buffer
    cmd.Stdout = &out
    err := cmd.Run()
    if err != nil {
        return "运行失败"
    }
    return out.String()
}
func HttpRC(method string, url string, parama []byte) (buf []byte, err error) {
    timeout := time.Duration(10 * time.Second)
    client := http.Client{
        Timeout: timeout,
    }
    request, err := http.NewRequest(method, url, bytes.NewBuffer(parama))
    request.Header.Set("Content-type", "application/json")
    if err != nil {
        fmt.Println("build request fail !")
        return nil, err
    }
    resp, err := client.Do(request)
    if err != nil {
        fmt.Println("request error: ", err)
        return nil, err
    }
    defer resp.Body.Close()
    body, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        fmt.Println(err)
        return nil, err
    }
    return body, nil
}