zhangzengfei
2023-09-05 63645d248c765244488cd34dbc1bb6528ca6b7c7
system-service/service/esClusterService.go
@@ -1,274 +1,274 @@
package service
import (
   "bytes"
   "errors"
   "fmt"
   "io/ioutil"
   "net/http"
   "strings"
   "time"
   "vamicro/config"
   "vamicro/system-service/models"
   "vamicro/system-service/sys"
   "vamicro/system-service/util"
   "basic.com/pubsub/esutil.git"
   "basic.com/valib/logger.git"
)
func InitEsNode(dataPath []string, configPath string) (bool, error) {
   for _, dp := range dataPath {
      path := strings.Split(dp, "/")
      if strings.Contains(path[len(path)-1], "elasticsearch") == false && strings.Contains(path[len(path)-2], "elasticsearch") == false {
         return false, errors.New(dp + "路径不正确")
      }
      util.CMDSC("rm -rf " + dp + "/data " + dp + "/logs")
   }
   _, err := esutil.SetDiscoveryZenPingUnicastHosts(configPath, []string{})
   if err != nil {
      return false, err
   }
   return true, nil
}
//创建集群(初创节点,初始节点,一个集群该功能只可运行一次)
//configPath: es的配置文件路径,目前默认为/opt/elasticsearch/config
//binPath:  es的脚本路径,目前默认为 /opt/vasystem/script
//indexPath: 目前默认为 /opt/vasystem/indexInit
func CreateOriginalCluster(configPath string, binPath string, indexPath string) (bool, error) {
   verIp, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter)
   verPort := "9200"
   sp := esutil.StopServer(binPath, "0.0.0.0", "9200")
   if sp == true {
      logger.Debug("binPath路径错误")
      return false, errors.New("binPath路径错误")
   }
   _, errC := esutil.VerifyCreated(configPath)
   if errC != nil {
      logger.Debug("errC:", errC)
      return false, errC
   }
   _, errI := esutil.InitYml(configPath)
   if errI != nil {
      logger.Debug("errI:", errI)
      return false, errI
   }
   _, errR := esutil.UpdateNodeRole(configPath, "master")
   if errR != nil {
      logger.Debug("errR:", errR)
      return false, errR
   }
   clusterName := GetClusterNameByVirtualIp(verIp)
   logger.Debugf("CreateCluster clusterName=%v, verIp=%v", clusterName, verIp)
   _, errCN := esutil.UpdateClusterName(configPath, clusterName)
   if errCN != nil {
      logger.Debug("errCN:", errCN)
      return false, errCN
   }
   _, errS := esutil.SetDiscoveryZenPingUnicastHosts(configPath, []string{verIp})
   if errS != nil {
      logger.Debug("errS:", errS)
      return false, errS
   }
   st := esutil.StartServer(binPath, verIp, verPort)
   if st == false {
      logger.Debug("binPath路径错误")
      return false, errors.New("binPath路径错误")
   }
   result := esutil.VerifyNodeServer(verIp, verPort, 20)
   if result == false {
      logger.Debug("服务启动超时")
      return false, errors.New("服务启动超时")
   }
   rIndexInit := esutil.InitIndex(indexPath)
   if rIndexInit == false {
      logger.Debug("索引初始化失败")
      return false, errors.New("索引初始化失败")
   }
   return true, nil
}
func GetClusterNameByVirtualIp(ip string) string {
   var clusterE = models.Cluster{
      VirtualIp: ip,
   }
   clusterInfo, err := clusterE.FindByVirtualIp()
   if err != nil {
      logger.Debugf("GetClusterName FindByVirtualIp err=%v", err)
      return clusterInfo.ClusterName
   }
   return clusterInfo.ClusterName
}
//加入集群(该节点将根据计算得出自己需要扮演的角色) dataPath为es的数据路径,默认在/data/disk1/elasticsearch
func AddCluster(configPath string, binPath string, ip string, port string, dataPath []string) (bool, error) {
   var errRet error
   start := time.Now()
   defer func(errRet error) {
      logger.Debugf("AddCluster finish time=%v, err=%v", time.Since(start), errRet)
   }(errRet)
   _, errRet = InitEsNode(dataPath, configPath)
   if errRet != nil {
      return false, errors.New("elasticsearch 初始化失败")
   }
   verIp, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter)
   verPort := "9200"
   localIp := verIp
   localPort := verPort
   sp := esutil.StopServer(binPath, localIp, localPort)
   if sp == true {
      logger.Debug("AddCluster binPath路径错误")
      return false, errors.New("binPath路径错误")
   }
   LocalRole := "slave"
   var nodesInfo []esutil.NodeInfo
   nodesInfo, errRet = esutil.GetClusterInfo(ip, port)
   if errRet != nil {
      logger.Debug("errG:", errRet)
      return false, errRet
   }
   TotalAllNodes := len(nodesInfo)
   if TotalAllNodes < 1 {
      logger.Debug("目标集群不存在")
      return false, errors.New("目标集群不存在")
   }
   TotalMasterNodes := 0
   hosts := make([]string, 0)
   allHosts := make([]string, 0)
   for _, mp := range nodesInfo {
      if mp.NodeRole == "master" {
         TotalMasterNodes = TotalMasterNodes + 1
         hosts = append(hosts, mp.NodeIp)
      }
      allHosts = append(allHosts, mp.NodeIp)
   }
   if TotalAllNodes/5 > TotalMasterNodes {
      LocalRole = "master"
      hosts = append(hosts, localIp)
      for _, mpk := range allHosts {
         url := "http://" + mpk + ":8888/data/api-v/es/node/synchronizeHosts"
         body := `{
         "binPath":"` + binPath + `",
          "configPath":"` + configPath + `",
          "hosts":["` + strings.Replace(strings.Trim(fmt.Sprint(hosts), "[]"), " ", "\",\"", -1) + `"]
         }`
         _, errRet := HttpRCT("POST", url, []byte(body), 10*time.Second)
         logger.Debugf("AddCluster url=%v, mpk=%v, errRet=%v", url, mpk, errRet)
         if errRet != nil {
            return false, errors.New(mpk + "节点同步失败")
         }
      }
   }
   _, errRet = esutil.UpdateNodeRole(configPath, LocalRole)
   if errRet != nil {
      logger.Debug("AddCluster errNR:", errRet)
      return false, errRet
   }
   clusterName := GetClusterNameByVirtualIp(ip)
   logger.Debugf("AddCluster clusterName=%v, ip=%v", clusterName, ip)
   _, errRet = esutil.UpdateClusterName(configPath, clusterName)
   if errRet != nil {
      logger.Debug("AddCluster errCN:", errRet)
      return false, errRet
   }
   _, errRet = esutil.SetDiscoveryZenPingUnicastHosts(configPath, hosts)
   if errRet != nil {
      logger.Debug("AddCluster errS:", errRet)
      return false, errRet
   }
   st := esutil.StartServer(binPath, localIp, localPort)
   if st == false {
      logger.Debug("AddCluster 服务启动超时")
      return false, errors.New("服务启动超时")
   }
   return true, errRet
}
//退出集群(从当前集群退出)
func ExitCluster(configPath string, binPath string) (bool, error) {
   var errRet error
   start := time.Now()
   defer func(errRet error) {
      logger.Debugf("Leave ExitCluster finish time=%v, err=%v", time.Since(start), errRet)
   }(errRet)
   verIp, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter)
   verPort := "9200"
   localIp := verIp
   localPort := verPort
   var hosts []string
   if hosts, errRet = esutil.GetDiscoveryZenPingUnicastHosts(configPath); errRet != nil {
      logger.Debug("Leave errH:", errRet)
      return false, errRet
   }
   for _, p := range hosts {
      if p == localIp {
         logger.Debug("Leave 主节点不允许退出集群")
         return false, errors.New("主节点不允许退出集群")
      }
   }
   var bEN bool
   if bEN, errRet = esutil.ExcludeNode(localIp, localPort); errRet != nil {
      logger.Debug("Leave errEN:", errRet)
      return false, errRet
   }
   sp := esutil.StopServer(binPath, localIp, localPort)
   if sp == true {
      logger.Debug("Leave binPath路径错误")
      return false, errors.New("binPath路径错误")
   }
   if _, errRet = esutil.SetDiscoveryZenPingUnicastHosts(configPath, nil); errRet != nil {
      logger.Debug("Leave errS:", errRet)
      return false, errRet
   }
   return bEN, nil
}
func HttpRCT(method string, url string, parama []byte, timeout time.Duration) (buf []byte, err error) {
   client := http.Client{
      Timeout: timeout,
   }
   logger.Debugf("HttpRCT method=%v, url=%v, parama=%v", method, url, string(parama))
   request, err := http.NewRequest(method, url, bytes.NewBuffer(parama))
   request.Header.Set("Content-type", "application/json")
   request.Header.Add("Authorization", "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjQ3ND"+
      "UwMjU5MjMsInVzZXIiOiJ7XCJpZFwiOlwiZTZjY2QzNmQtNGYxNi00NmZjLTg4ZDUtMDczNjU4NjZkMjA1XCIsXCJwZXJtaXNzaW"+
      "9uc1wiOltcInByb2R1Y3RNYW5nZTpwdWJsaXNoXCIsXCJjb2RlTWFuZ2U6dmlld1wiLFwiZGV2aWNlTWFuYWdlOmFkZFwiLFwiYW"+
      "RtaW5NYW5hZ2VcIixcIm9yZGVyTWFuZ2VcIixcImRldmljZU1hbmFnZTp2aWV3XCIsXCJwcm9kdWN0TWFuZ2U6YWRkXCIsXCJhZG"+
      "1pbk1hbmFnZTp2aWV3XCIsXCJjb2RlTWFuZ2U6YWRkXCIsXCJwcm9kdWN0TWFuZ2U6b2ZmU2FsZVwiLFwib3JkZXJNYW5nZTpjYW"+
      "5jZWxcIixcInByb2R1Y3RDZW50ZXI6ZG93bmxvYWRcIixcInByb2R1Y3RDZW50ZXI6YnV5XCIsXCJwcm9kdWN0TWFuZ2U6dmlld1"+
      "wiLFwiYXBpXCIsXCJob21lXCIsXCJvcmRlck1hbmdlOnBheVwiLFwiYWRtaW5NYW5hZ2U6YWRkXCIsXCJvcmRlck1hbmdlOmRvd2"+
      "5sb2FkXCIsXCJwcm9kdWN0Q2VudGVyXCIsXCJkZXZpY2VNYW5hZ2U6dW5iaW5kXCIsXCJvcmRlck1hbmdlOnZpZXdcIixcImFkbW"+
      "luTWFuYWdlOmVkaXRcIixcImRldmljZU1hbmFnZVwiLFwidmlwTWFuYWdlOmFkZFwiLFwidmlwTWFuYWdlOnZpZXdcIixcInByb2"+
      "R1Y3RDZW50ZXI6dmlld1wiLFwidmlwTWFuYWdlOmVkaXRcIixcInZpcE1hbmFnZVwiLFwicHJvZHVjdE1hbmdlOmVkaXRcIixcIm"+
      "NvZGVNYW5nZVwiLFwicHJvZHVjdE1hbmdlXCJdLFwidXNlcm5hbWVcIjpcImJhc2ljXCJ9In0.vwjAFkWuEyadRLvIOGK8LFE3Mj"+
      "pY3SQ7j6AlTXnQDG8")
   if err != nil {
      logger.Debug("build request err:", err)
      return nil, err
   }
   resp, err := client.Do(request)
   if err != nil {
      logger.Debug("request error: ", err)
      return nil, err
   }
   defer resp.Body.Close()
   body, err := ioutil.ReadAll(resp.Body)
   if err != nil {
      logger.Debug("read body err:", err)
      return nil, err
   }
   return body, nil
}
package service
import (
   "bytes"
   "errors"
   "fmt"
   "io/ioutil"
   "net/http"
   "strings"
   "time"
   "vamicro/config"
   "vamicro/system-service/models"
   "vamicro/system-service/sys"
   "vamicro/system-service/util"
   "basic.com/pubsub/esutil.git"
   "basic.com/valib/logger.git"
)
func InitEsNode(dataPath []string, configPath string) (bool, error) {
   for _, dp := range dataPath {
      path := strings.Split(dp, "/")
      if strings.Contains(path[len(path)-1], "elasticsearch") == false && strings.Contains(path[len(path)-2], "elasticsearch") == false {
         return false, errors.New(dp + "路径不正确")
      }
      util.CMDSC("rm -rf " + dp + "/data " + dp + "/logs")
   }
   _, err := esutil.SetDiscoveryZenPingUnicastHosts(configPath, []string{})
   if err != nil {
      return false, err
   }
   return true, nil
}
//创建集群(初创节点,初始节点,一个集群该功能只可运行一次)
//configPath: es的配置文件路径,目前默认为/opt/elasticsearch/config
//binPath:  es的脚本路径,目前默认为 /opt/vasystem/script
//indexPath: 目前默认为 /opt/vasystem/indexInit
func CreateOriginalCluster(configPath string, binPath string, indexPath string) (bool, error) {
   verIp, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter)
   verPort := "9200"
   sp := esutil.StopServer(binPath, "0.0.0.0", "9200")
   if sp == true {
      logger.Debug("binPath路径错误")
      return false, errors.New("binPath路径错误")
   }
   _, errC := esutil.VerifyCreated(configPath)
   if errC != nil {
      logger.Debug("errC:", errC)
      return false, errC
   }
   _, errI := esutil.InitYml(configPath)
   if errI != nil {
      logger.Debug("errI:", errI)
      return false, errI
   }
   _, errR := esutil.UpdateNodeRole(configPath, "master")
   if errR != nil {
      logger.Debug("errR:", errR)
      return false, errR
   }
   clusterName := GetClusterNameByVirtualIp(verIp)
   logger.Debugf("CreateCluster clusterName=%v, verIp=%v", clusterName, verIp)
   _, errCN := esutil.UpdateClusterName(configPath, clusterName)
   if errCN != nil {
      logger.Debug("errCN:", errCN)
      return false, errCN
   }
   _, errS := esutil.SetDiscoveryZenPingUnicastHosts(configPath, []string{verIp})
   if errS != nil {
      logger.Debug("errS:", errS)
      return false, errS
   }
   st := esutil.StartServer(binPath, verIp, verPort)
   if st == false {
      logger.Debug("binPath路径错误")
      return false, errors.New("binPath路径错误")
   }
   result := esutil.VerifyNodeServer(verIp, verPort, 20)
   if result == false {
      logger.Debug("服务启动超时")
      return false, errors.New("服务启动超时")
   }
   rIndexInit := esutil.InitIndex(indexPath)
   if rIndexInit == false {
      logger.Debug("索引初始化失败")
      return false, errors.New("索引初始化失败")
   }
   return true, nil
}
func GetClusterNameByVirtualIp(ip string) string {
   var clusterE = models.Cluster{
      VirtualIp: ip,
   }
   clusterInfo, err := clusterE.FindByVirtualIp()
   if err != nil {
      logger.Debugf("GetClusterName FindByVirtualIp err=%v", err)
      return clusterInfo.ClusterName
   }
   return clusterInfo.ClusterName
}
//加入集群(该节点将根据计算得出自己需要扮演的角色) dataPath为es的数据路径,默认在/data/disk1/elasticsearch
func AddCluster(configPath string, binPath string, ip string, port string, dataPath []string) (bool, error) {
   var errRet error
   start := time.Now()
   defer func(errRet error) {
      logger.Debugf("AddCluster finish time=%v, err=%v", time.Since(start), errRet)
   }(errRet)
   _, errRet = InitEsNode(dataPath, configPath)
   if errRet != nil {
      return false, errors.New("elasticsearch 初始化失败")
   }
   verIp, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter)
   verPort := "9200"
   localIp := verIp
   localPort := verPort
   sp := esutil.StopServer(binPath, localIp, localPort)
   if sp == true {
      logger.Debug("AddCluster binPath路径错误")
      return false, errors.New("binPath路径错误")
   }
   LocalRole := "slave"
   var nodesInfo []esutil.NodeInfo
   nodesInfo, errRet = esutil.GetClusterInfo(ip, port)
   if errRet != nil {
      logger.Debug("errG:", errRet)
      return false, errRet
   }
   TotalAllNodes := len(nodesInfo)
   if TotalAllNodes < 1 {
      logger.Debug("目标集群不存在")
      return false, errors.New("目标集群不存在")
   }
   TotalMasterNodes := 0
   hosts := make([]string, 0)
   allHosts := make([]string, 0)
   for _, mp := range nodesInfo {
      if mp.NodeRole == "master" {
         TotalMasterNodes = TotalMasterNodes + 1
         hosts = append(hosts, mp.NodeIp)
      }
      allHosts = append(allHosts, mp.NodeIp)
   }
   if TotalAllNodes/5 > TotalMasterNodes {
      LocalRole = "master"
      hosts = append(hosts, localIp)
      for _, mpk := range allHosts {
         url := "http://" + mpk + ":8888/data/api-v/es/node/synchronizeHosts"
         body := `{
         "binPath":"` + binPath + `",
          "configPath":"` + configPath + `",
          "hosts":["` + strings.Replace(strings.Trim(fmt.Sprint(hosts), "[]"), " ", "\",\"", -1) + `"]
         }`
         _, errRet := HttpRCT("POST", url, []byte(body), 10*time.Second)
         logger.Debugf("AddCluster url=%v, mpk=%v, errRet=%v", url, mpk, errRet)
         if errRet != nil {
            return false, errors.New(mpk + "节点同步失败")
         }
      }
   }
   _, errRet = esutil.UpdateNodeRole(configPath, LocalRole)
   if errRet != nil {
      logger.Debug("AddCluster errNR:", errRet)
      return false, errRet
   }
   clusterName := GetClusterNameByVirtualIp(ip)
   logger.Debugf("AddCluster clusterName=%v, ip=%v", clusterName, ip)
   _, errRet = esutil.UpdateClusterName(configPath, clusterName)
   if errRet != nil {
      logger.Debug("AddCluster errCN:", errRet)
      return false, errRet
   }
   _, errRet = esutil.SetDiscoveryZenPingUnicastHosts(configPath, hosts)
   if errRet != nil {
      logger.Debug("AddCluster errS:", errRet)
      return false, errRet
   }
   st := esutil.StartServer(binPath, localIp, localPort)
   if st == false {
      logger.Debug("AddCluster 服务启动超时")
      return false, errors.New("服务启动超时")
   }
   return true, errRet
}
//退出集群(从当前集群退出)
func ExitCluster(configPath string, binPath string) (bool, error) {
   var errRet error
   start := time.Now()
   defer func(errRet error) {
      logger.Debugf("Leave ExitCluster finish time=%v, err=%v", time.Since(start), errRet)
   }(errRet)
   verIp, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter)
   verPort := "9200"
   localIp := verIp
   localPort := verPort
   var hosts []string
   if hosts, errRet = esutil.GetDiscoveryZenPingUnicastHosts(configPath); errRet != nil {
      logger.Debug("Leave errH:", errRet)
      return false, errRet
   }
   for _, p := range hosts {
      if p == localIp {
         logger.Debug("Leave 主节点不允许退出集群")
         return false, errors.New("主节点不允许退出集群")
      }
   }
   var bEN bool
   if bEN, errRet = esutil.ExcludeNode(localIp, localPort); errRet != nil {
      logger.Debug("Leave errEN:", errRet)
      return false, errRet
   }
   sp := esutil.StopServer(binPath, localIp, localPort)
   if sp == true {
      logger.Debug("Leave binPath路径错误")
      return false, errors.New("binPath路径错误")
   }
   if _, errRet = esutil.SetDiscoveryZenPingUnicastHosts(configPath, nil); errRet != nil {
      logger.Debug("Leave errS:", errRet)
      return false, errRet
   }
   return bEN, nil
}
func HttpRCT(method string, url string, parama []byte, timeout time.Duration) (buf []byte, err error) {
   client := http.Client{
      Timeout: timeout,
   }
   logger.Debugf("HttpRCT method=%v, url=%v, parama=%v", method, url, string(parama))
   request, err := http.NewRequest(method, url, bytes.NewBuffer(parama))
   request.Header.Set("Content-type", "application/json")
   request.Header.Add("Authorization", "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjQ3ND"+
      "UwMjU5MjMsInVzZXIiOiJ7XCJpZFwiOlwiZTZjY2QzNmQtNGYxNi00NmZjLTg4ZDUtMDczNjU4NjZkMjA1XCIsXCJwZXJtaXNzaW"+
      "9uc1wiOltcInByb2R1Y3RNYW5nZTpwdWJsaXNoXCIsXCJjb2RlTWFuZ2U6dmlld1wiLFwiZGV2aWNlTWFuYWdlOmFkZFwiLFwiYW"+
      "RtaW5NYW5hZ2VcIixcIm9yZGVyTWFuZ2VcIixcImRldmljZU1hbmFnZTp2aWV3XCIsXCJwcm9kdWN0TWFuZ2U6YWRkXCIsXCJhZG"+
      "1pbk1hbmFnZTp2aWV3XCIsXCJjb2RlTWFuZ2U6YWRkXCIsXCJwcm9kdWN0TWFuZ2U6b2ZmU2FsZVwiLFwib3JkZXJNYW5nZTpjYW"+
      "5jZWxcIixcInByb2R1Y3RDZW50ZXI6ZG93bmxvYWRcIixcInByb2R1Y3RDZW50ZXI6YnV5XCIsXCJwcm9kdWN0TWFuZ2U6dmlld1"+
      "wiLFwiYXBpXCIsXCJob21lXCIsXCJvcmRlck1hbmdlOnBheVwiLFwiYWRtaW5NYW5hZ2U6YWRkXCIsXCJvcmRlck1hbmdlOmRvd2"+
      "5sb2FkXCIsXCJwcm9kdWN0Q2VudGVyXCIsXCJkZXZpY2VNYW5hZ2U6dW5iaW5kXCIsXCJvcmRlck1hbmdlOnZpZXdcIixcImFkbW"+
      "luTWFuYWdlOmVkaXRcIixcImRldmljZU1hbmFnZVwiLFwidmlwTWFuYWdlOmFkZFwiLFwidmlwTWFuYWdlOnZpZXdcIixcInByb2"+
      "R1Y3RDZW50ZXI6dmlld1wiLFwidmlwTWFuYWdlOmVkaXRcIixcInZpcE1hbmFnZVwiLFwicHJvZHVjdE1hbmdlOmVkaXRcIixcIm"+
      "NvZGVNYW5nZVwiLFwicHJvZHVjdE1hbmdlXCJdLFwidXNlcm5hbWVcIjpcImJhc2ljXCJ9In0.vwjAFkWuEyadRLvIOGK8LFE3Mj"+
      "pY3SQ7j6AlTXnQDG8")
   if err != nil {
      logger.Debug("build request err:", err)
      return nil, err
   }
   resp, err := client.Do(request)
   if err != nil {
      logger.Debug("request error: ", err)
      return nil, err
   }
   defer resp.Body.Close()
   body, err := ioutil.ReadAll(resp.Body)
   if err != nil {
      logger.Debug("read body err:", err)
      return nil, err
   }
   return body, nil
}