package esutil import ( "bytes" "encoding/json" "errors" "fmt" "github.com/spf13/viper" "io/ioutil" "net/http" "os/exec" "strings" "time" ) //初始化配置文件 func InitYml(configPath string) (bool, error) { v := viper.New() v.SetConfigType("yml") v.SetConfigName("elasticsearch") v.AddConfigPath(configPath) err := v.ReadInConfig() if err != nil { return false, err } if err := v.WriteConfig(); err != nil { return false, err } return true, nil } //初始化索引 func InitIndex(indexPath string) (bool) { resultMsg := CMDSC("sh " + indexPath + "/indexInit.sh") if resultMsg == "运行失败" { return false } return true } //启动服务 func StartServer(binPath string, ip string, port string) bool { resultMsg := CMDSC("sh " + binPath + "/es_start.sh") if resultMsg == "运行失败" { return false } return true } //关闭服务 func StopServer(binPath string, ip string, port string) bool { b := VerifyShortNodeServer(ip, port) if b == true { resultMsg := CMDSC("sh " + binPath + "/es_stop.sh") if resultMsg == "运行失败" { return true } time.Sleep(time.Second * 3) } bs := VerifyShortNodeServer(ip, port) return bs } //更换节点角色 func UpdateNodeRole(configPath string, role string) (bool, error) { v := viper.New() v.SetConfigType("yml") v.SetConfigName("elasticsearch") v.AddConfigPath(configPath) err := v.ReadInConfig() if err != nil { return false, err } if role == "master" { v.Set("node.master", true) } else if role == "slave" { v.Set("node.master", false) } if err := v.WriteConfig(); err != nil { return false, err } return true, nil } //验证该节点是否被使用过 func VerifyCreated(configPath string) (bool, error) { v := viper.New() v.SetConfigType("yml") v.SetConfigName("elasticsearch") v.AddConfigPath(configPath) err := v.ReadInConfig() if err != nil { return true, err } hosts := v.GetStringSlice("discovery.zen.ping.unicast.hosts") if len(hosts) > 1 { return true, errors.New("该节点已经被使用") } return false, nil } //验证节点服务是否正常启动 func VerifyNodeServer(ip string, port string, waitTime int) bool { b := false url := "http://" + ip + ":" + port for i := 1; i < 3; i++ { _, err := HttpRC("GET", url, nil) if err != nil { b = false if i < 3 { time.Sleep(time.Second * time.Duration(waitTime)) continue } } else { b = true break } } return b } //验证节点服务是否正常启动(短验证) func VerifyShortNodeServer(ip string, port string) bool { url := "http://" + ip + ":" + port _, err := HttpRC("GET", url, nil) if err != nil { return false } return true } //验证节点角色 func VerifyNodeRole(configPath string, ) (string, error) { role := "slave" v := viper.New() v.SetConfigType("yml") v.SetConfigName("elasticsearch") v.AddConfigPath(configPath) err := v.ReadInConfig() if err != nil { return role, err } flag := v.GetBool("node.master") if flag == true { role = "master" } return role, nil } //查询组播列表 func GetDiscoveryZenPingUnicastHosts(configPath string) ([]string, error) { v := viper.New() v.SetConfigType("yml") v.SetConfigName("elasticsearch") v.AddConfigPath(configPath) err := v.ReadInConfig() if err != nil { return nil, err } hosts := v.GetStringSlice("discovery.zen.ping.unicast.hosts") return hosts, nil } //设置组播列表 func SetDiscoveryZenPingUnicastHosts(configPath string, hosts []string) (bool, error) { v := viper.New() v.SetConfigType("yml") v.SetConfigName("elasticsearch") v.AddConfigPath(configPath) err := v.ReadInConfig() if err != nil { return false, err } v.Set("discovery.zen.ping.unicast.hosts", hosts) if err := v.WriteConfig(); err != nil { return false, err } return true, nil } //更新组播列表 func UpdateDiscoveryZenPingUnicastHosts(configPath string, oldIp string, newIp string) (bool, error) { v := viper.New() v.SetConfigType("yml") v.SetConfigName("elasticsearch") v.AddConfigPath(configPath) err := v.ReadInConfig() if err != nil { return false, err } flag := false hosts := v.GetStringSlice("discovery.zen.ping.unicast.hosts") for i, pick := range hosts { if pick == oldIp { hosts[i] = newIp flag = true } } if flag == false { fmt.Println("要修改的ip不在列表中") } v.Set("discovery.zen.ping.unicast.hosts", hosts) if err := v.WriteConfig(); err != nil { return false, err } return true, nil } //添加组播成员 func AddDiscoveryZenPingUnicastHosts(configPath string, ip string) (bool, error) { v := viper.New() v.SetConfigType("yml") v.SetConfigName("elasticsearch") v.AddConfigPath(configPath) errR := v.ReadInConfig() if errR != nil { return false, errR } hosts := v.GetStringSlice("discovery.zen.ping.unicast.hosts") hosts = append(hosts, ip) v.Set("discovery.zen.ping.unicast.hosts", hosts) if errW := v.WriteConfig(); errW != nil { return false, errW } return true, nil } type NodeInfo struct { NodeIp string `json:"nodeIp"` NodeRole string `json:"nodeRole"` NodeMaster bool `json:"nodeMaster"` NodeName string `json:"nodeName"` } //查询集群信息 func GetClusterInfo(ip string, port string) ([]NodeInfo, error) { url := "http://" + ip + ":" + port + "/_cat/nodes?v" buf, err := HttpRC("GET", url, nil) if err != nil { return nil, err } var inf = []NodeInfo{} res := strings.Split(string(buf), "\n")[1:] for _, r := range res { if r != "" { inx := strings.Fields(r) ip := inx[0] role := "slave" if find := strings.Contains(inx[7], "m"); find { role = "master" } master := false if find := strings.Contains(inx[8], "*"); find { master = true } name := inx[9] inf = append(inf, NodeInfo{ NodeName: name, NodeRole: role, NodeIp: ip, NodeMaster: master, }) } } return inf, nil } //排除即将退出集群的节点 func ExcludeNode(ip string, port string) (bool, error) { flag := false url := "http://" + ip + ":" + port + "/_cluster/settings" body := `{ "transient":{ "cluster.routing.allocation.exclude._ip":"` + ip + `" } }` buf, err := HttpRC("PUT", url, []byte(body)) if err != nil { return false, err } var inf map[string]interface{} json.Unmarshal(buf, &inf) if inf["acknowledged"] != nil { flag = inf["acknowledged"].(bool) } return flag, nil } func CMDSC(scriptStr string) string { cmd := exec.Command("sh", "-c", scriptStr) var out bytes.Buffer cmd.Stdout = &out err := cmd.Run() if err != nil { return "运行失败" } return out.String() } func HttpRC(method string, url string, parama []byte) (buf []byte, err error) { timeout := time.Duration(10 * time.Second) client := http.Client{ Timeout: timeout, } request, err := http.NewRequest(method, url, bytes.NewBuffer(parama)) request.Header.Set("Content-type", "application/json") if err != nil { fmt.Println("build request fail !") return nil, err } resp, err := client.Do(request) if err != nil { fmt.Println("request error: ", err) return nil, err } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) if err != nil { fmt.Println(err) return nil, err } return body, nil }