package service import ( "bytes" "errors" "fmt" "io/ioutil" "net/http" "strings" "time" "vamicro/config" "vamicro/system-service/models" "vamicro/system-service/sys" "vamicro/system-service/util" "basic.com/pubsub/esutil.git" "basic.com/valib/logger.git" ) func InitEsNode(dataPath []string, configPath string) (bool, error) { for _, dp := range dataPath { path := strings.Split(dp, "/") if strings.Contains(path[len(path)-1], "elasticsearch") == false && strings.Contains(path[len(path)-2], "elasticsearch") == false { return false, errors.New(dp + "路径不正确") } util.CMDSC("rm -rf " + dp + "/data " + dp + "/logs") } _, err := esutil.SetDiscoveryZenPingUnicastHosts(configPath, []string{}) if err != nil { return false, err } return true, nil } //创建集群(初创节点,初始节点,一个集群该功能只可运行一次) //configPath: es的配置文件路径,目前默认为/opt/elasticsearch/config //binPath: es的脚本路径,目前默认为 /opt/vasystem/script //indexPath: 目前默认为 /opt/vasystem/indexInit func CreateOriginalCluster(configPath string, binPath string, indexPath string) (bool, error) { verIp, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter) verPort := "9200" sp := esutil.StopServer(binPath, "0.0.0.0", "9200") if sp == true { logger.Debug("binPath路径错误") return false, errors.New("binPath路径错误") } _, errC := esutil.VerifyCreated(configPath) if errC != nil { logger.Debug("errC:", errC) return false, errC } _, errI := esutil.InitYml(configPath) if errI != nil { logger.Debug("errI:", errI) return false, errI } _, errR := esutil.UpdateNodeRole(configPath, "master") if errR != nil { logger.Debug("errR:", errR) return false, errR } clusterName := GetClusterNameByVirtualIp(verIp) logger.Debugf("CreateCluster clusterName=%v, verIp=%v", clusterName, verIp) _, errCN := esutil.UpdateClusterName(configPath, clusterName) if errCN != nil { logger.Debug("errCN:", errCN) return false, errCN } _, errS := esutil.SetDiscoveryZenPingUnicastHosts(configPath, []string{verIp}) if errS != nil { logger.Debug("errS:", errS) return false, errS } st := esutil.StartServer(binPath, verIp, verPort) if st == false { logger.Debug("binPath路径错误") return false, errors.New("binPath路径错误") } result := esutil.VerifyNodeServer(verIp, verPort, 20) if result == false { logger.Debug("服务启动超时") return false, errors.New("服务启动超时") } rIndexInit := esutil.InitIndex(indexPath) if rIndexInit == false { logger.Debug("索引初始化失败") return false, errors.New("索引初始化失败") } return true, nil } func GetClusterNameByVirtualIp(ip string) string { var clusterE = models.Cluster{ VirtualIp: ip, } clusterInfo, err := clusterE.FindByVirtualIp() if err != nil { logger.Debugf("GetClusterName FindByVirtualIp err=%v", err) return clusterInfo.ClusterName } return clusterInfo.ClusterName } //加入集群(该节点将根据计算得出自己需要扮演的角色) dataPath为es的数据路径,默认在/data/disk1/elasticsearch func AddCluster(configPath string, binPath string, ip string, port string, dataPath []string) (bool, error) { var errRet error start := time.Now() defer func(errRet error) { logger.Debugf("AddCluster finish time=%v, err=%v", time.Since(start), errRet) }(errRet) _, errRet = InitEsNode(dataPath, configPath) if errRet != nil { return false, errors.New("elasticsearch 初始化失败") } verIp, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter) verPort := "9200" localIp := verIp localPort := verPort sp := esutil.StopServer(binPath, localIp, localPort) if sp == true { logger.Debug("AddCluster binPath路径错误") return false, errors.New("binPath路径错误") } LocalRole := "slave" var nodesInfo []esutil.NodeInfo nodesInfo, errRet = esutil.GetClusterInfo(ip, port) if errRet != nil { logger.Debug("errG:", errRet) return false, errRet } TotalAllNodes := len(nodesInfo) if TotalAllNodes < 1 { logger.Debug("目标集群不存在") return false, errors.New("目标集群不存在") } TotalMasterNodes := 0 hosts := make([]string, 0) allHosts := make([]string, 0) for _, mp := range nodesInfo { if mp.NodeRole == "master" { TotalMasterNodes = TotalMasterNodes + 1 hosts = append(hosts, mp.NodeIp) } allHosts = append(allHosts, mp.NodeIp) } if TotalAllNodes/5 > TotalMasterNodes { LocalRole = "master" hosts = append(hosts, localIp) for _, mpk := range allHosts { url := "http://" + mpk + ":8888/data/api-v/es/node/synchronizeHosts" body := `{ "binPath":"` + binPath + `", "configPath":"` + configPath + `", "hosts":["` + strings.Replace(strings.Trim(fmt.Sprint(hosts), "[]"), " ", "\",\"", -1) + `"] }` _, errRet := HttpRCT("POST", url, []byte(body), 10*time.Second) logger.Debugf("AddCluster url=%v, mpk=%v, errRet=%v", url, mpk, errRet) if errRet != nil { return false, errors.New(mpk + "节点同步失败") } } } _, errRet = esutil.UpdateNodeRole(configPath, LocalRole) if errRet != nil { logger.Debug("AddCluster errNR:", errRet) return false, errRet } clusterName := GetClusterNameByVirtualIp(ip) logger.Debugf("AddCluster clusterName=%v, ip=%v", clusterName, ip) _, errRet = esutil.UpdateClusterName(configPath, clusterName) if errRet != nil { logger.Debug("AddCluster errCN:", errRet) return false, errRet } _, errRet = esutil.SetDiscoveryZenPingUnicastHosts(configPath, hosts) if errRet != nil { logger.Debug("AddCluster errS:", errRet) return false, errRet } st := esutil.StartServer(binPath, localIp, localPort) if st == false { logger.Debug("AddCluster 服务启动超时") return false, errors.New("服务启动超时") } return true, errRet } //退出集群(从当前集群退出) func ExitCluster(configPath string, binPath string) (bool, error) { var errRet error start := time.Now() defer func(errRet error) { logger.Debugf("Leave ExitCluster finish time=%v, err=%v", time.Since(start), errRet) }(errRet) verIp, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter) verPort := "9200" localIp := verIp localPort := verPort var hosts []string if hosts, errRet = esutil.GetDiscoveryZenPingUnicastHosts(configPath); errRet != nil { logger.Debug("Leave errH:", errRet) return false, errRet } for _, p := range hosts { if p == localIp { logger.Debug("Leave 主节点不允许退出集群") return false, errors.New("主节点不允许退出集群") } } var bEN bool if bEN, errRet = esutil.ExcludeNode(localIp, localPort); errRet != nil { logger.Debug("Leave errEN:", errRet) return false, errRet } sp := esutil.StopServer(binPath, localIp, localPort) if sp == true { logger.Debug("Leave binPath路径错误") return false, errors.New("binPath路径错误") } if _, errRet = esutil.SetDiscoveryZenPingUnicastHosts(configPath, nil); errRet != nil { logger.Debug("Leave errS:", errRet) return false, errRet } return bEN, nil } func HttpRCT(method string, url string, parama []byte, timeout time.Duration) (buf []byte, err error) { client := http.Client{ Timeout: timeout, } logger.Debugf("HttpRCT method=%v, url=%v, parama=%v", method, url, string(parama)) request, err := http.NewRequest(method, url, bytes.NewBuffer(parama)) request.Header.Set("Content-type", "application/json") request.Header.Add("Authorization", "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjQ3ND"+ "UwMjU5MjMsInVzZXIiOiJ7XCJpZFwiOlwiZTZjY2QzNmQtNGYxNi00NmZjLTg4ZDUtMDczNjU4NjZkMjA1XCIsXCJwZXJtaXNzaW"+ "9uc1wiOltcInByb2R1Y3RNYW5nZTpwdWJsaXNoXCIsXCJjb2RlTWFuZ2U6dmlld1wiLFwiZGV2aWNlTWFuYWdlOmFkZFwiLFwiYW"+ "RtaW5NYW5hZ2VcIixcIm9yZGVyTWFuZ2VcIixcImRldmljZU1hbmFnZTp2aWV3XCIsXCJwcm9kdWN0TWFuZ2U6YWRkXCIsXCJhZG"+ "1pbk1hbmFnZTp2aWV3XCIsXCJjb2RlTWFuZ2U6YWRkXCIsXCJwcm9kdWN0TWFuZ2U6b2ZmU2FsZVwiLFwib3JkZXJNYW5nZTpjYW"+ "5jZWxcIixcInByb2R1Y3RDZW50ZXI6ZG93bmxvYWRcIixcInByb2R1Y3RDZW50ZXI6YnV5XCIsXCJwcm9kdWN0TWFuZ2U6dmlld1"+ "wiLFwiYXBpXCIsXCJob21lXCIsXCJvcmRlck1hbmdlOnBheVwiLFwiYWRtaW5NYW5hZ2U6YWRkXCIsXCJvcmRlck1hbmdlOmRvd2"+ "5sb2FkXCIsXCJwcm9kdWN0Q2VudGVyXCIsXCJkZXZpY2VNYW5hZ2U6dW5iaW5kXCIsXCJvcmRlck1hbmdlOnZpZXdcIixcImFkbW"+ "luTWFuYWdlOmVkaXRcIixcImRldmljZU1hbmFnZVwiLFwidmlwTWFuYWdlOmFkZFwiLFwidmlwTWFuYWdlOnZpZXdcIixcInByb2"+ "R1Y3RDZW50ZXI6dmlld1wiLFwidmlwTWFuYWdlOmVkaXRcIixcInZpcE1hbmFnZVwiLFwicHJvZHVjdE1hbmdlOmVkaXRcIixcIm"+ "NvZGVNYW5nZVwiLFwicHJvZHVjdE1hbmdlXCJdLFwidXNlcm5hbWVcIjpcImJhc2ljXCJ9In0.vwjAFkWuEyadRLvIOGK8LFE3Mj"+ "pY3SQ7j6AlTXnQDG8") if err != nil { logger.Debug("build request err:", err) return nil, err } resp, err := client.Do(request) if err != nil { logger.Debug("request error: ", err) return nil, err } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) if err != nil { logger.Debug("read body err:", err) return nil, err } return body, nil }