package esutil
|
|
import (
|
"bytes"
|
"encoding/json"
|
"errors"
|
"fmt"
|
"io/ioutil"
|
"net/http"
|
"os/exec"
|
"strings"
|
"time"
|
|
"github.com/spf13/viper"
|
)
|
|
//初始化配置文件
|
func InitYml(configPath string) (bool, error) {
|
v := viper.New()
|
v.SetConfigType("yml")
|
v.SetConfigName("elasticsearch")
|
v.AddConfigPath(configPath)
|
err := v.ReadInConfig()
|
if err != nil {
|
return false, err
|
}
|
if err := v.WriteConfig(); err != nil {
|
return false, err
|
}
|
return true, nil
|
}
|
|
//初始化索引
|
func InitIndex(indexPath string) bool {
|
resultMsg := CMDSC("sh " + indexPath + "/indexInit.sh")
|
if resultMsg == "运行失败" {
|
return false
|
}
|
return true
|
}
|
|
//启动服务
|
func StartServer(binPath string, ip string, port string) bool {
|
resultMsg := CMDSC("sh " + binPath + "/es_start.sh")
|
if resultMsg == "运行失败" {
|
return false
|
}
|
return true
|
}
|
|
//关闭服务
|
func StopServer(binPath string, ip string, port string) bool {
|
b := VerifyShortNodeServer(ip, port)
|
if b == true {
|
resultMsg := CMDSC("sh " + binPath + "/es_stop.sh")
|
if resultMsg == "运行失败" {
|
return true
|
}
|
time.Sleep(time.Second * 3)
|
}
|
bs := VerifyShortNodeServer(ip, port)
|
return bs
|
}
|
|
//更换节点角色
|
func UpdateNodeRole(configPath string, role string) (bool, error) {
|
v := viper.New()
|
v.SetConfigType("yml")
|
v.SetConfigName("elasticsearch")
|
v.AddConfigPath(configPath)
|
err := v.ReadInConfig()
|
if err != nil {
|
return false, err
|
}
|
if role == "master" {
|
v.Set("node.master", true)
|
} else if role == "slave" {
|
v.Set("node.master", false)
|
}
|
if err := v.WriteConfig(); err != nil {
|
return false, err
|
}
|
return true, nil
|
}
|
|
//更换节点集群名称
|
func UpdateClusterName(configPath string, name string) (bool, error) {
|
v := viper.New()
|
v.SetConfigType("yml")
|
v.SetConfigName("elasticsearch")
|
v.AddConfigPath(configPath)
|
err := v.ReadInConfig()
|
if err != nil {
|
return false, err
|
}
|
v.Set("cluster.name", name)
|
if err := v.WriteConfig(); err != nil {
|
return false, err
|
}
|
return true, nil
|
}
|
|
//验证该节点是否被使用过
|
func VerifyCreated(configPath string) (bool, error) {
|
v := viper.New()
|
v.SetConfigType("yml")
|
v.SetConfigName("elasticsearch")
|
v.AddConfigPath(configPath)
|
err := v.ReadInConfig()
|
if err != nil {
|
return true, err
|
}
|
hosts := v.GetStringSlice("discovery.zen.ping.unicast.hosts")
|
if len(hosts) > 1 {
|
return true, errors.New("该节点已经被使用")
|
}
|
return false, nil
|
}
|
|
//验证节点服务是否正常启动
|
func VerifyNodeServer(ip string, port string, waitTime int) bool {
|
b := false
|
url := "http://" + ip + ":" + port
|
for i := 1; i < 3; i++ {
|
_, err := HttpRC("GET", url, nil)
|
if err != nil {
|
b = false
|
if i < 3 {
|
time.Sleep(time.Second * time.Duration(waitTime))
|
continue
|
}
|
} else {
|
b = true
|
break
|
}
|
}
|
return b
|
}
|
|
//验证节点服务是否正常启动(短验证)
|
func VerifyShortNodeServer(ip string, port string) bool {
|
url := "http://" + ip + ":" + port
|
_, err := HttpRC("GET", url, nil)
|
if err != nil {
|
return false
|
}
|
return true
|
}
|
|
//验证节点角色
|
func VerifyNodeRole(configPath string) (string, error) {
|
role := "slave"
|
v := viper.New()
|
v.SetConfigType("yml")
|
v.SetConfigName("elasticsearch")
|
v.AddConfigPath(configPath)
|
err := v.ReadInConfig()
|
if err != nil {
|
return role, err
|
}
|
flag := v.GetBool("node.master")
|
if flag == true {
|
role = "master"
|
}
|
return role, nil
|
}
|
|
//查询组播列表
|
func GetDiscoveryZenPingUnicastHosts(configPath string) ([]string, error) {
|
v := viper.New()
|
v.SetConfigType("yml")
|
v.SetConfigName("elasticsearch")
|
v.AddConfigPath(configPath)
|
err := v.ReadInConfig()
|
if err != nil {
|
return nil, err
|
}
|
hosts := v.GetStringSlice("discovery.zen.ping.unicast.hosts")
|
return hosts, nil
|
}
|
|
//设置组播列表
|
func SetDiscoveryZenPingUnicastHosts(configPath string, hosts []string) (bool, error) {
|
v := viper.New()
|
v.SetConfigType("yml")
|
v.SetConfigName("elasticsearch")
|
v.AddConfigPath(configPath)
|
err := v.ReadInConfig()
|
if err != nil {
|
return false, err
|
}
|
v.Set("discovery.zen.ping.unicast.hosts", hosts)
|
if err := v.WriteConfig(); err != nil {
|
return false, err
|
}
|
return true, nil
|
}
|
|
//更新组播列表
|
func UpdateDiscoveryZenPingUnicastHosts(configPath string, oldIp string, newIp string) (bool, error) {
|
v := viper.New()
|
v.SetConfigType("yml")
|
v.SetConfigName("elasticsearch")
|
v.AddConfigPath(configPath)
|
err := v.ReadInConfig()
|
if err != nil {
|
return false, err
|
}
|
flag := false
|
hosts := v.GetStringSlice("discovery.zen.ping.unicast.hosts")
|
for i, pick := range hosts {
|
if pick == oldIp {
|
hosts[i] = newIp
|
flag = true
|
}
|
}
|
if flag == false {
|
fmt.Println("要修改的ip不在列表中")
|
}
|
v.Set("discovery.zen.ping.unicast.hosts", hosts)
|
if err := v.WriteConfig(); err != nil {
|
return false, err
|
}
|
return true, nil
|
}
|
|
//添加组播成员
|
func AddDiscoveryZenPingUnicastHosts(configPath string, ip string) (bool, error) {
|
v := viper.New()
|
v.SetConfigType("yml")
|
v.SetConfigName("elasticsearch")
|
v.AddConfigPath(configPath)
|
errR := v.ReadInConfig()
|
if errR != nil {
|
return false, errR
|
}
|
hosts := v.GetStringSlice("discovery.zen.ping.unicast.hosts")
|
hosts = append(hosts, ip)
|
v.Set("discovery.zen.ping.unicast.hosts", hosts)
|
if errW := v.WriteConfig(); errW != nil {
|
return false, errW
|
}
|
return true, nil
|
}
|
|
type NodeInfo struct {
|
NodeIp string `json:"nodeIp"`
|
NodeRole string `json:"nodeRole"`
|
NodeMaster bool `json:"nodeMaster"`
|
NodeName string `json:"nodeName"`
|
}
|
|
//查询集群信息
|
func GetClusterInfo(ip string, port string) ([]NodeInfo, error) {
|
url := "http://" + ip + ":" + port + "/_cat/nodes?v"
|
buf, err := HttpRC("GET", url, nil)
|
if err != nil {
|
return nil, err
|
}
|
var inf = []NodeInfo{}
|
res := strings.Split(string(buf), "\n")[1:]
|
for _, r := range res {
|
if r != "" {
|
inx := strings.Fields(r)
|
ip := inx[0]
|
role := "slave"
|
if find := strings.Contains(inx[7], "m"); find {
|
role = "master"
|
}
|
master := false
|
if find := strings.Contains(inx[8], "*"); find {
|
master = true
|
}
|
name := inx[9]
|
inf = append(inf, NodeInfo{
|
NodeName: name,
|
NodeRole: role,
|
NodeIp: ip,
|
NodeMaster: master,
|
})
|
}
|
}
|
return inf, nil
|
}
|
|
//排除即将退出集群的节点
|
func ExcludeNode(ip string, port string) (bool, error) {
|
flag := false
|
url := "http://" + ip + ":" + port + "/_cluster/settings"
|
body := `{
|
"transient":{
|
"cluster.routing.allocation.exclude._ip":"` + ip + `"
|
}
|
}`
|
buf, err := HttpRC("PUT", url, []byte(body))
|
if err != nil {
|
return false, err
|
}
|
var inf map[string]interface{}
|
json.Unmarshal(buf, &inf)
|
if inf["acknowledged"] != nil {
|
flag = inf["acknowledged"].(bool)
|
}
|
return flag, nil
|
}
|
|
func CMDSC(scriptStr string) string {
|
cmd := exec.Command("sh", "-c", scriptStr)
|
var out bytes.Buffer
|
cmd.Stdout = &out
|
err := cmd.Run()
|
if err != nil {
|
return "运行失败"
|
}
|
return out.String()
|
}
|
|
func HttpRC(method string, url string, parama []byte) (buf []byte, err error) {
|
timeout := time.Duration(10 * time.Second)
|
client := http.Client{
|
Timeout: timeout,
|
}
|
request, err := http.NewRequest(method, url, bytes.NewBuffer(parama))
|
request.Header.Set("Content-type", "application/json")
|
if err != nil {
|
fmt.Println("build request fail !")
|
return nil, err
|
}
|
|
resp, err := client.Do(request)
|
if err != nil {
|
fmt.Println("request error: ", err)
|
return nil, err
|
}
|
|
defer resp.Body.Close()
|
body, err := ioutil.ReadAll(resp.Body)
|
if err != nil {
|
fmt.Println(err)
|
return nil, err
|
}
|
return body, nil
|
}
|