zhangzengfei
2023-09-05 63645d248c765244488cd34dbc1bb6528ca6b7c7
system-service/service/seaweedfsService.go
@@ -1,219 +1,219 @@
package service
import (
   "encoding/json"
   "errors"
   "fmt"
   "strings"
   "time"
   "vamicro/config"
   "vamicro/system-service/sys"
   "vamicro/system-service/util"
   "basic.com/valib/logger.git"
)
func InitSWFSNode(storePath []string, scriptPath string) (bool, error) {
   for _, p := range storePath {
      path := strings.Split(p, "/")
      if len(path) >= 3 {
         if strings.Contains(path[len(path)-2], "seaweedfs") == false && strings.Contains(path[len(path)-3], "seaweedfs") == false {
            logger.Errorf("GetConfig InitSWFSNode 路径不正确 storePath=%v", storePath)
            return false, errors.New(p + "路径不正确")
         }
         ra := strings.Split(p, "seaweedfs")
         util.CMDSC("find " + ra[0] + "seaweedfs" + " -type f -name '*' -exec rm -rf {} \\;")
      } else {
         logger.Errorf("GetConfig InitSWFSNode storePath=%v config 中VolumePath配置错误,请检查", storePath)
         return false, errors.New("config 中VolumePath配置错误,请检查")
      }
   }
   util.SetConfig(scriptPath, "", []string{}, "000")
   return true, nil
}
func CreateWeedfsServer(scriptPath string) (bool, error) {
   logger.Debug("CreateWeedfsServer:")
   ip, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter)
   peers := make([]string, 0)
   peers = append(peers, ip+":6333")
   _, err := util.VerifyServer(ip)
   if err == nil {
      sp := util.StopServer(scriptPath)
      if sp == false {
         return false, errors.New("stop scriptPath  路径错误!")
      }
   }
   logger.Debug("GetConfig CreateWeedfsServer ip=%v, peers=%v", ip, peers)
   sc := util.SetConfig(scriptPath, ip, peers, "000")
   if sc == false {
      return false, errors.New("binPath 路径错误!")
   }
   st := util.StartServer(scriptPath)
   if st == false {
      return false, errors.New("binPath 路径错误!")
   }
   time.Sleep(time.Second * 5)
   _, errF := util.VerifyServer(ip)
   if errF != nil {
      time.Sleep(time.Second * 5)
      _, errS := util.VerifyServer(ip)
      if errS != nil {
         return false, errors.New("启动超时")
      }
   }
   return true, nil
}
type WeedInfo struct {
   ScriptPath         string   `json:"scriptPath"`
   Peers              []string `json:"peers"`
   DefaultReplication string   `json:"defaultReplication"`
}
//storePath是weedfs的数据路径,默认是在/data/disk2/seaweedfs
func AddWeedfsServer(scriptPath string, ip string, storePath []string) (bool, error) {
   var errRet error
   start := time.Now()
   defer func(errRet error) {
      logger.Debugf("AddCluster AddWeedfsServer finish time=%v, err=%v", time.Since(start), errRet)
   }(errRet)
   localIp, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter)
   _, errRet = InitSWFSNode(storePath, scriptPath)
   if errRet != nil {
      return false, errors.New("seaweedfs 初始化失败")
   }
   url := "http://" + ip + ":8888/data/api-v/swfs/getSWFSPeers"
   body := `{"scriptPath":"` + scriptPath + `"}`
   var buf []byte
   buf, errRet = HttpRCT("POST", url, []byte(body), 10*time.Second)
   if errRet != nil {
      return false, errors.New("获取peers列表失败")
   }
   var info interface{}
   json.Unmarshal(buf, &info)
   peers := info.(map[string]interface{})["data"].([]interface{})
   if len(peers) < 1 {
      return false, errors.New("指定ip不存在集群!")
   }
   for _, pc := range peers {
      if strings.Contains(pc.(string), localIp) == true {
         logger.Errorf("AddCluster AddWeedfsServer localIp=%v, pc=%v, peers=%v 该节点已经在目标集群", localIp, pc.(string), peers)
         return false, errors.New("该节点已经在目标集群")
      }
   }
   peers = append(peers, localIp+":6333")
   pArray := make([]string, 0)
   for _, p := range peers {
      sData := strings.Split(p.(string), ":")
      pArray = append(pArray, fmt.Sprintf("%v:%v", sData[0], sData[1]))
   }
   for _, pick := range peers {
      mIp := strings.Split(pick.(string), ":")[0]
      url := "http://" + mIp + ":8888/data/api-v/swfs/syncSWFSNode"
      dataBody := WeedInfo{
         ScriptPath:         scriptPath,
         Peers:              pArray,
         DefaultReplication: "001",
      }
      srcData, _ := json.Marshal(dataBody)
      // body := `{"scriptPath":"` + scriptPath + `",
      //            "peers":["` + strings.Replace(strings.Trim(fmt.Sprint(peers), "[]"), " ", "\",\"", -1) + `"],
      //          "defaultReplication":"001"}`
      logger.Debugf("AddCluster AddWeedfsServer url=%v, srcData=%v", url, string(srcData))
      if _, errRet = HttpRCT("POST", url, srcData, 30*time.Second); errRet != nil {
         return false, errors.New(mIp + "节点修改失败")
      }
   }
   logger.Debug("AddCluster AddWeedfsServer success ip=%v, errRet=%v", ip, errRet)
   return true, errRet
}
func ExitWeedfsServer(scriptPath string, isDel bool, storePath []string) (bool, error) {
   var errRet error
   start := time.Now()
   defer func(errRet error) {
      logger.Debugf("Leave ExitWeedfsServer finish time=%v, err=%v", time.Since(start), errRet)
   }(errRet)
   localIp, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter)
   var configInfo util.SWFSInfo
   configInfo, errRet = util.GetConfig(scriptPath)
   logger.Debugf("Leave ExitWeedfsServer GetConfig=%v, localIp=%v, err=%v", configInfo, localIp, errRet)
   if errRet != nil {
      return false, errRet
   }
   if len(configInfo.Peers) == 0 {
      logger.Debug("Leave ExitWeedfsServer 当前节点未使用无须退出")
      return true, nil
   }
   newPeers := make([]string, 0)
   for _, p := range configInfo.Peers {
      if strings.Split(p, ":")[0] == localIp {
         continue
      }
      newPeers = append(newPeers, p)
   }
   defaultReplication := "001"
   if len(newPeers) <= 1 {
      defaultReplication = "000"
   }
   logger.Debug("ExitWeedfsServer newPeers:", newPeers)
   for _, pick := range newPeers {
      logger.Debug("Leave ExitWeedfsServer pick:", pick)
      mIp := strings.Split(pick, ":")[0]
      url := "http://" + mIp + ":8888/data/api-v/swfs/syncSWFSNode"
      dataBody := WeedInfo{
         ScriptPath:         scriptPath,
         Peers:              newPeers,
         DefaultReplication: defaultReplication,
      }
      srcData, _ := json.Marshal(dataBody)
      // body := `{"scriptPath":"` + scriptPath + `",
      //            "peers":["` + strings.Replace(strings.Trim(fmt.Sprint(newPeers), "[]"), " ", "\",\"", -1) + `"],
      //            "defaultReplication":"` + defaultReplication + `"}`
      _, errRet = HttpRCT("POST", url, srcData, 30*time.Second)
      logger.Debugf("Leave ExitWeedfsServer HttpRCT url=%v, srcData=%v, errRet=%v", url, string(srcData), errRet)
      if errRet != nil {
         return false, errors.New(mIp + "节点修改失败")
      }
   }
   sc := util.SetConfig(scriptPath, "", []string{}, defaultReplication)
   if sc == false {
      return false, errors.New("scriptPath  路径错误!")
   }
   if _, errRet = util.VerifyServer(localIp); errRet == nil {
      sp := util.StopServer(scriptPath)
      logger.Debugf("Leave ExitWeedfsServer StopServer sp=%v", sp)
      if sp == false {
         return false, errors.New("stop scriptPath  路径错误!")
      }
   }
   if isDel == true {
      for _, p := range storePath {
         path := strings.Split(p, "/")
         if strings.Contains(path[len(path)-1], "seaweedfs") == false && strings.Contains(path[len(path)-2], "seaweedfs") == false {
            logger.Errorf("Leave ExitWeedfsServer storePath=%v 路径不正确 ", storePath)
            return false, errors.New(p + "路径不正确")
         }
         ra := strings.Split(p, "seaweedfs")
         util.CMDSC("find " + ra[0] + "seaweedfs" + " -type f -name '*' -exec rm -rf {} \\;")
      }
   }
   logger.Debugf("Leave ExitWeedfsServer success errRet=%v", errRet)
   return true, errRet
}
package service
import (
   "encoding/json"
   "errors"
   "fmt"
   "strings"
   "time"
   "vamicro/config"
   "vamicro/system-service/sys"
   "vamicro/system-service/util"
   "basic.com/valib/logger.git"
)
func InitSWFSNode(storePath []string, scriptPath string) (bool, error) {
   for _, p := range storePath {
      path := strings.Split(p, "/")
      if len(path) >= 3 {
         if strings.Contains(path[len(path)-2], "seaweedfs") == false && strings.Contains(path[len(path)-3], "seaweedfs") == false {
            logger.Errorf("GetConfig InitSWFSNode 路径不正确 storePath=%v", storePath)
            return false, errors.New(p + "路径不正确")
         }
         ra := strings.Split(p, "seaweedfs")
         util.CMDSC("find " + ra[0] + "seaweedfs" + " -type f -name '*' -exec rm -rf {} \\;")
      } else {
         logger.Errorf("GetConfig InitSWFSNode storePath=%v config 中VolumePath配置错误,请检查", storePath)
         return false, errors.New("config 中VolumePath配置错误,请检查")
      }
   }
   util.SetConfig(scriptPath, "", []string{}, "000")
   return true, nil
}
func CreateWeedfsServer(scriptPath string) (bool, error) {
   logger.Debug("CreateWeedfsServer:")
   ip, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter)
   peers := make([]string, 0)
   peers = append(peers, ip+":6333")
   _, err := util.VerifyServer(ip)
   if err == nil {
      sp := util.StopServer(scriptPath)
      if sp == false {
         return false, errors.New("stop scriptPath  路径错误!")
      }
   }
   logger.Debug("GetConfig CreateWeedfsServer ip=%v, peers=%v", ip, peers)
   sc := util.SetConfig(scriptPath, ip, peers, "000")
   if sc == false {
      return false, errors.New("binPath 路径错误!")
   }
   st := util.StartServer(scriptPath)
   if st == false {
      return false, errors.New("binPath 路径错误!")
   }
   time.Sleep(time.Second * 5)
   _, errF := util.VerifyServer(ip)
   if errF != nil {
      time.Sleep(time.Second * 5)
      _, errS := util.VerifyServer(ip)
      if errS != nil {
         return false, errors.New("启动超时")
      }
   }
   return true, nil
}
type WeedInfo struct {
   ScriptPath         string   `json:"scriptPath"`
   Peers              []string `json:"peers"`
   DefaultReplication string   `json:"defaultReplication"`
}
//storePath是weedfs的数据路径,默认是在/data/disk2/seaweedfs
func AddWeedfsServer(scriptPath string, ip string, storePath []string) (bool, error) {
   var errRet error
   start := time.Now()
   defer func(errRet error) {
      logger.Debugf("AddCluster AddWeedfsServer finish time=%v, err=%v", time.Since(start), errRet)
   }(errRet)
   localIp, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter)
   _, errRet = InitSWFSNode(storePath, scriptPath)
   if errRet != nil {
      return false, errors.New("seaweedfs 初始化失败")
   }
   url := "http://" + ip + ":8888/data/api-v/swfs/getSWFSPeers"
   body := `{"scriptPath":"` + scriptPath + `"}`
   var buf []byte
   buf, errRet = HttpRCT("POST", url, []byte(body), 10*time.Second)
   if errRet != nil {
      return false, errors.New("获取peers列表失败")
   }
   var info interface{}
   json.Unmarshal(buf, &info)
   peers := info.(map[string]interface{})["data"].([]interface{})
   if len(peers) < 1 {
      return false, errors.New("指定ip不存在集群!")
   }
   for _, pc := range peers {
      if strings.Contains(pc.(string), localIp) == true {
         logger.Errorf("AddCluster AddWeedfsServer localIp=%v, pc=%v, peers=%v 该节点已经在目标集群", localIp, pc.(string), peers)
         return false, errors.New("该节点已经在目标集群")
      }
   }
   peers = append(peers, localIp+":6333")
   pArray := make([]string, 0)
   for _, p := range peers {
      sData := strings.Split(p.(string), ":")
      pArray = append(pArray, fmt.Sprintf("%v:%v", sData[0], sData[1]))
   }
   for _, pick := range peers {
      mIp := strings.Split(pick.(string), ":")[0]
      url := "http://" + mIp + ":8888/data/api-v/swfs/syncSWFSNode"
      dataBody := WeedInfo{
         ScriptPath:         scriptPath,
         Peers:              pArray,
         DefaultReplication: "001",
      }
      srcData, _ := json.Marshal(dataBody)
      // body := `{"scriptPath":"` + scriptPath + `",
      //            "peers":["` + strings.Replace(strings.Trim(fmt.Sprint(peers), "[]"), " ", "\",\"", -1) + `"],
      //          "defaultReplication":"001"}`
      logger.Debugf("AddCluster AddWeedfsServer url=%v, srcData=%v", url, string(srcData))
      if _, errRet = HttpRCT("POST", url, srcData, 30*time.Second); errRet != nil {
         return false, errors.New(mIp + "节点修改失败")
      }
   }
   logger.Debug("AddCluster AddWeedfsServer success ip=%v, errRet=%v", ip, errRet)
   return true, errRet
}
func ExitWeedfsServer(scriptPath string, isDel bool, storePath []string) (bool, error) {
   var errRet error
   start := time.Now()
   defer func(errRet error) {
      logger.Debugf("Leave ExitWeedfsServer finish time=%v, err=%v", time.Since(start), errRet)
   }(errRet)
   localIp, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter)
   var configInfo util.SWFSInfo
   configInfo, errRet = util.GetConfig(scriptPath)
   logger.Debugf("Leave ExitWeedfsServer GetConfig=%v, localIp=%v, err=%v", configInfo, localIp, errRet)
   if errRet != nil {
      return false, errRet
   }
   if len(configInfo.Peers) == 0 {
      logger.Debug("Leave ExitWeedfsServer 当前节点未使用无须退出")
      return true, nil
   }
   newPeers := make([]string, 0)
   for _, p := range configInfo.Peers {
      if strings.Split(p, ":")[0] == localIp {
         continue
      }
      newPeers = append(newPeers, p)
   }
   defaultReplication := "001"
   if len(newPeers) <= 1 {
      defaultReplication = "000"
   }
   logger.Debug("ExitWeedfsServer newPeers:", newPeers)
   for _, pick := range newPeers {
      logger.Debug("Leave ExitWeedfsServer pick:", pick)
      mIp := strings.Split(pick, ":")[0]
      url := "http://" + mIp + ":8888/data/api-v/swfs/syncSWFSNode"
      dataBody := WeedInfo{
         ScriptPath:         scriptPath,
         Peers:              newPeers,
         DefaultReplication: defaultReplication,
      }
      srcData, _ := json.Marshal(dataBody)
      // body := `{"scriptPath":"` + scriptPath + `",
      //            "peers":["` + strings.Replace(strings.Trim(fmt.Sprint(newPeers), "[]"), " ", "\",\"", -1) + `"],
      //            "defaultReplication":"` + defaultReplication + `"}`
      _, errRet = HttpRCT("POST", url, srcData, 30*time.Second)
      logger.Debugf("Leave ExitWeedfsServer HttpRCT url=%v, srcData=%v, errRet=%v", url, string(srcData), errRet)
      if errRet != nil {
         return false, errors.New(mIp + "节点修改失败")
      }
   }
   sc := util.SetConfig(scriptPath, "", []string{}, defaultReplication)
   if sc == false {
      return false, errors.New("scriptPath  路径错误!")
   }
   if _, errRet = util.VerifyServer(localIp); errRet == nil {
      sp := util.StopServer(scriptPath)
      logger.Debugf("Leave ExitWeedfsServer StopServer sp=%v", sp)
      if sp == false {
         return false, errors.New("stop scriptPath  路径错误!")
      }
   }
   if isDel == true {
      for _, p := range storePath {
         path := strings.Split(p, "/")
         if strings.Contains(path[len(path)-1], "seaweedfs") == false && strings.Contains(path[len(path)-2], "seaweedfs") == false {
            logger.Errorf("Leave ExitWeedfsServer storePath=%v 路径不正确 ", storePath)
            return false, errors.New(p + "路径不正确")
         }
         ra := strings.Split(p, "seaweedfs")
         util.CMDSC("find " + ra[0] + "seaweedfs" + " -type f -name '*' -exec rm -rf {} \\;")
      }
   }
   logger.Debugf("Leave ExitWeedfsServer success errRet=%v", errRet)
   return true, errRet
}