| | |
| | | package service |
| | | |
| | | import ( |
| | | "encoding/json" |
| | | "errors" |
| | | "fmt" |
| | | "strings" |
| | | "time" |
| | | "vamicro/config" |
| | | "vamicro/system-service/sys" |
| | | "vamicro/system-service/util" |
| | | |
| | | "basic.com/valib/logger.git" |
| | | ) |
| | | |
| | | func InitSWFSNode(storePath []string, scriptPath string) (bool, error) { |
| | | for _, p := range storePath { |
| | | path := strings.Split(p, "/") |
| | | if len(path) >= 3 { |
| | | if strings.Contains(path[len(path)-2], "seaweedfs") == false && strings.Contains(path[len(path)-3], "seaweedfs") == false { |
| | | logger.Errorf("GetConfig InitSWFSNode 路径不正确 storePath=%v", storePath) |
| | | return false, errors.New(p + "路径不正确") |
| | | } |
| | | ra := strings.Split(p, "seaweedfs") |
| | | util.CMDSC("find " + ra[0] + "seaweedfs" + " -type f -name '*' -exec rm -rf {} \\;") |
| | | } else { |
| | | logger.Errorf("GetConfig InitSWFSNode storePath=%v config 中VolumePath配置错误,请检查", storePath) |
| | | return false, errors.New("config 中VolumePath配置错误,请检查") |
| | | } |
| | | |
| | | } |
| | | util.SetConfig(scriptPath, "", []string{}, "000") |
| | | return true, nil |
| | | } |
| | | |
| | | func CreateWeedfsServer(scriptPath string) (bool, error) { |
| | | logger.Debug("CreateWeedfsServer:") |
| | | ip, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter) |
| | | peers := make([]string, 0) |
| | | peers = append(peers, ip+":6333") |
| | | _, err := util.VerifyServer(ip) |
| | | if err == nil { |
| | | sp := util.StopServer(scriptPath) |
| | | if sp == false { |
| | | return false, errors.New("stop scriptPath 路径错误!") |
| | | } |
| | | } |
| | | logger.Debug("GetConfig CreateWeedfsServer ip=%v, peers=%v", ip, peers) |
| | | sc := util.SetConfig(scriptPath, ip, peers, "000") |
| | | if sc == false { |
| | | return false, errors.New("binPath 路径错误!") |
| | | } |
| | | st := util.StartServer(scriptPath) |
| | | if st == false { |
| | | return false, errors.New("binPath 路径错误!") |
| | | } |
| | | time.Sleep(time.Second * 5) |
| | | _, errF := util.VerifyServer(ip) |
| | | if errF != nil { |
| | | time.Sleep(time.Second * 5) |
| | | _, errS := util.VerifyServer(ip) |
| | | if errS != nil { |
| | | return false, errors.New("启动超时") |
| | | } |
| | | } |
| | | return true, nil |
| | | } |
| | | |
| | | type WeedInfo struct { |
| | | ScriptPath string `json:"scriptPath"` |
| | | Peers []string `json:"peers"` |
| | | DefaultReplication string `json:"defaultReplication"` |
| | | } |
| | | |
| | | //storePath是weedfs的数据路径,默认是在/data/disk2/seaweedfs |
| | | func AddWeedfsServer(scriptPath string, ip string, storePath []string) (bool, error) { |
| | | var errRet error |
| | | start := time.Now() |
| | | defer func(errRet error) { |
| | | logger.Debugf("AddCluster AddWeedfsServer finish time=%v, err=%v", time.Since(start), errRet) |
| | | }(errRet) |
| | | |
| | | localIp, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter) |
| | | |
| | | _, errRet = InitSWFSNode(storePath, scriptPath) |
| | | if errRet != nil { |
| | | return false, errors.New("seaweedfs 初始化失败") |
| | | } |
| | | |
| | | url := "http://" + ip + ":8888/data/api-v/swfs/getSWFSPeers" |
| | | body := `{"scriptPath":"` + scriptPath + `"}` |
| | | var buf []byte |
| | | buf, errRet = HttpRCT("POST", url, []byte(body), 10*time.Second) |
| | | if errRet != nil { |
| | | return false, errors.New("获取peers列表失败") |
| | | } |
| | | var info interface{} |
| | | json.Unmarshal(buf, &info) |
| | | peers := info.(map[string]interface{})["data"].([]interface{}) |
| | | if len(peers) < 1 { |
| | | return false, errors.New("指定ip不存在集群!") |
| | | } |
| | | for _, pc := range peers { |
| | | if strings.Contains(pc.(string), localIp) == true { |
| | | logger.Errorf("AddCluster AddWeedfsServer localIp=%v, pc=%v, peers=%v 该节点已经在目标集群", localIp, pc.(string), peers) |
| | | return false, errors.New("该节点已经在目标集群") |
| | | } |
| | | } |
| | | peers = append(peers, localIp+":6333") |
| | | pArray := make([]string, 0) |
| | | for _, p := range peers { |
| | | sData := strings.Split(p.(string), ":") |
| | | pArray = append(pArray, fmt.Sprintf("%v:%v", sData[0], sData[1])) |
| | | } |
| | | |
| | | for _, pick := range peers { |
| | | mIp := strings.Split(pick.(string), ":")[0] |
| | | url := "http://" + mIp + ":8888/data/api-v/swfs/syncSWFSNode" |
| | | |
| | | dataBody := WeedInfo{ |
| | | ScriptPath: scriptPath, |
| | | Peers: pArray, |
| | | DefaultReplication: "001", |
| | | } |
| | | srcData, _ := json.Marshal(dataBody) |
| | | |
| | | // body := `{"scriptPath":"` + scriptPath + `", |
| | | // "peers":["` + strings.Replace(strings.Trim(fmt.Sprint(peers), "[]"), " ", "\",\"", -1) + `"], |
| | | // "defaultReplication":"001"}` |
| | | logger.Debugf("AddCluster AddWeedfsServer url=%v, srcData=%v", url, string(srcData)) |
| | | if _, errRet = HttpRCT("POST", url, srcData, 30*time.Second); errRet != nil { |
| | | return false, errors.New(mIp + "节点修改失败") |
| | | } |
| | | } |
| | | |
| | | logger.Debug("AddCluster AddWeedfsServer success ip=%v, errRet=%v", ip, errRet) |
| | | return true, errRet |
| | | } |
| | | |
| | | func ExitWeedfsServer(scriptPath string, isDel bool, storePath []string) (bool, error) { |
| | | var errRet error |
| | | start := time.Now() |
| | | defer func(errRet error) { |
| | | logger.Debugf("Leave ExitWeedfsServer finish time=%v, err=%v", time.Since(start), errRet) |
| | | }(errRet) |
| | | |
| | | localIp, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter) |
| | | var configInfo util.SWFSInfo |
| | | configInfo, errRet = util.GetConfig(scriptPath) |
| | | logger.Debugf("Leave ExitWeedfsServer GetConfig=%v, localIp=%v, err=%v", configInfo, localIp, errRet) |
| | | if errRet != nil { |
| | | return false, errRet |
| | | } |
| | | if len(configInfo.Peers) == 0 { |
| | | logger.Debug("Leave ExitWeedfsServer 当前节点未使用无须退出") |
| | | return true, nil |
| | | } |
| | | newPeers := make([]string, 0) |
| | | for _, p := range configInfo.Peers { |
| | | if strings.Split(p, ":")[0] == localIp { |
| | | continue |
| | | } |
| | | newPeers = append(newPeers, p) |
| | | } |
| | | defaultReplication := "001" |
| | | if len(newPeers) <= 1 { |
| | | defaultReplication = "000" |
| | | } |
| | | |
| | | logger.Debug("ExitWeedfsServer newPeers:", newPeers) |
| | | for _, pick := range newPeers { |
| | | logger.Debug("Leave ExitWeedfsServer pick:", pick) |
| | | mIp := strings.Split(pick, ":")[0] |
| | | url := "http://" + mIp + ":8888/data/api-v/swfs/syncSWFSNode" |
| | | |
| | | dataBody := WeedInfo{ |
| | | ScriptPath: scriptPath, |
| | | Peers: newPeers, |
| | | DefaultReplication: defaultReplication, |
| | | } |
| | | srcData, _ := json.Marshal(dataBody) |
| | | |
| | | // body := `{"scriptPath":"` + scriptPath + `", |
| | | // "peers":["` + strings.Replace(strings.Trim(fmt.Sprint(newPeers), "[]"), " ", "\",\"", -1) + `"], |
| | | // "defaultReplication":"` + defaultReplication + `"}` |
| | | _, errRet = HttpRCT("POST", url, srcData, 30*time.Second) |
| | | logger.Debugf("Leave ExitWeedfsServer HttpRCT url=%v, srcData=%v, errRet=%v", url, string(srcData), errRet) |
| | | if errRet != nil { |
| | | return false, errors.New(mIp + "节点修改失败") |
| | | } |
| | | } |
| | | |
| | | sc := util.SetConfig(scriptPath, "", []string{}, defaultReplication) |
| | | if sc == false { |
| | | return false, errors.New("scriptPath 路径错误!") |
| | | } |
| | | |
| | | if _, errRet = util.VerifyServer(localIp); errRet == nil { |
| | | sp := util.StopServer(scriptPath) |
| | | logger.Debugf("Leave ExitWeedfsServer StopServer sp=%v", sp) |
| | | if sp == false { |
| | | return false, errors.New("stop scriptPath 路径错误!") |
| | | } |
| | | } |
| | | if isDel == true { |
| | | for _, p := range storePath { |
| | | path := strings.Split(p, "/") |
| | | if strings.Contains(path[len(path)-1], "seaweedfs") == false && strings.Contains(path[len(path)-2], "seaweedfs") == false { |
| | | logger.Errorf("Leave ExitWeedfsServer storePath=%v 路径不正确 ", storePath) |
| | | return false, errors.New(p + "路径不正确") |
| | | } |
| | | ra := strings.Split(p, "seaweedfs") |
| | | |
| | | util.CMDSC("find " + ra[0] + "seaweedfs" + " -type f -name '*' -exec rm -rf {} \\;") |
| | | } |
| | | } |
| | | logger.Debugf("Leave ExitWeedfsServer success errRet=%v", errRet) |
| | | return true, errRet |
| | | } |
| | | package service
|
| | |
|
| | | import (
|
| | | "encoding/json"
|
| | | "errors"
|
| | | "fmt"
|
| | | "strings"
|
| | | "time"
|
| | | "vamicro/config"
|
| | | "vamicro/system-service/sys"
|
| | | "vamicro/system-service/util"
|
| | |
|
| | | "basic.com/valib/logger.git"
|
| | | )
|
| | |
|
| | | func InitSWFSNode(storePath []string, scriptPath string) (bool, error) {
|
| | | for _, p := range storePath {
|
| | | path := strings.Split(p, "/")
|
| | | if len(path) >= 3 {
|
| | | if strings.Contains(path[len(path)-2], "seaweedfs") == false && strings.Contains(path[len(path)-3], "seaweedfs") == false {
|
| | | logger.Errorf("GetConfig InitSWFSNode 路径不正确 storePath=%v", storePath)
|
| | | return false, errors.New(p + "路径不正确")
|
| | | }
|
| | | ra := strings.Split(p, "seaweedfs")
|
| | | util.CMDSC("find " + ra[0] + "seaweedfs" + " -type f -name '*' -exec rm -rf {} \\;")
|
| | | } else {
|
| | | logger.Errorf("GetConfig InitSWFSNode storePath=%v config 中VolumePath配置错误,请检查", storePath)
|
| | | return false, errors.New("config 中VolumePath配置错误,请检查")
|
| | | }
|
| | |
|
| | | }
|
| | | util.SetConfig(scriptPath, "", []string{}, "000")
|
| | | return true, nil
|
| | | }
|
| | |
|
| | | func CreateWeedfsServer(scriptPath string) (bool, error) {
|
| | | logger.Debug("CreateWeedfsServer:")
|
| | | ip, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter)
|
| | | peers := make([]string, 0)
|
| | | peers = append(peers, ip+":6333")
|
| | | _, err := util.VerifyServer(ip)
|
| | | if err == nil {
|
| | | sp := util.StopServer(scriptPath)
|
| | | if sp == false {
|
| | | return false, errors.New("stop scriptPath 路径错误!")
|
| | | }
|
| | | }
|
| | | logger.Debug("GetConfig CreateWeedfsServer ip=%v, peers=%v", ip, peers)
|
| | | sc := util.SetConfig(scriptPath, ip, peers, "000")
|
| | | if sc == false {
|
| | | return false, errors.New("binPath 路径错误!")
|
| | | }
|
| | | st := util.StartServer(scriptPath)
|
| | | if st == false {
|
| | | return false, errors.New("binPath 路径错误!")
|
| | | }
|
| | | time.Sleep(time.Second * 5)
|
| | | _, errF := util.VerifyServer(ip)
|
| | | if errF != nil {
|
| | | time.Sleep(time.Second * 5)
|
| | | _, errS := util.VerifyServer(ip)
|
| | | if errS != nil {
|
| | | return false, errors.New("启动超时")
|
| | | }
|
| | | }
|
| | | return true, nil
|
| | | }
|
| | |
|
| | | type WeedInfo struct {
|
| | | ScriptPath string `json:"scriptPath"`
|
| | | Peers []string `json:"peers"`
|
| | | DefaultReplication string `json:"defaultReplication"`
|
| | | }
|
| | |
|
| | | //storePath是weedfs的数据路径,默认是在/data/disk2/seaweedfs
|
| | | func AddWeedfsServer(scriptPath string, ip string, storePath []string) (bool, error) {
|
| | | var errRet error
|
| | | start := time.Now()
|
| | | defer func(errRet error) {
|
| | | logger.Debugf("AddCluster AddWeedfsServer finish time=%v, err=%v", time.Since(start), errRet)
|
| | | }(errRet)
|
| | |
|
| | | localIp, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter)
|
| | |
|
| | | _, errRet = InitSWFSNode(storePath, scriptPath)
|
| | | if errRet != nil {
|
| | | return false, errors.New("seaweedfs 初始化失败")
|
| | | }
|
| | |
|
| | | url := "http://" + ip + ":8888/data/api-v/swfs/getSWFSPeers"
|
| | | body := `{"scriptPath":"` + scriptPath + `"}`
|
| | | var buf []byte
|
| | | buf, errRet = HttpRCT("POST", url, []byte(body), 10*time.Second)
|
| | | if errRet != nil {
|
| | | return false, errors.New("获取peers列表失败")
|
| | | }
|
| | | var info interface{}
|
| | | json.Unmarshal(buf, &info)
|
| | | peers := info.(map[string]interface{})["data"].([]interface{})
|
| | | if len(peers) < 1 {
|
| | | return false, errors.New("指定ip不存在集群!")
|
| | | }
|
| | | for _, pc := range peers {
|
| | | if strings.Contains(pc.(string), localIp) == true {
|
| | | logger.Errorf("AddCluster AddWeedfsServer localIp=%v, pc=%v, peers=%v 该节点已经在目标集群", localIp, pc.(string), peers)
|
| | | return false, errors.New("该节点已经在目标集群")
|
| | | }
|
| | | }
|
| | | peers = append(peers, localIp+":6333")
|
| | | pArray := make([]string, 0)
|
| | | for _, p := range peers {
|
| | | sData := strings.Split(p.(string), ":")
|
| | | pArray = append(pArray, fmt.Sprintf("%v:%v", sData[0], sData[1]))
|
| | | }
|
| | |
|
| | | for _, pick := range peers {
|
| | | mIp := strings.Split(pick.(string), ":")[0]
|
| | | url := "http://" + mIp + ":8888/data/api-v/swfs/syncSWFSNode"
|
| | |
|
| | | dataBody := WeedInfo{
|
| | | ScriptPath: scriptPath,
|
| | | Peers: pArray,
|
| | | DefaultReplication: "001",
|
| | | }
|
| | | srcData, _ := json.Marshal(dataBody)
|
| | |
|
| | | // body := `{"scriptPath":"` + scriptPath + `",
|
| | | // "peers":["` + strings.Replace(strings.Trim(fmt.Sprint(peers), "[]"), " ", "\",\"", -1) + `"],
|
| | | // "defaultReplication":"001"}`
|
| | | logger.Debugf("AddCluster AddWeedfsServer url=%v, srcData=%v", url, string(srcData))
|
| | | if _, errRet = HttpRCT("POST", url, srcData, 30*time.Second); errRet != nil {
|
| | | return false, errors.New(mIp + "节点修改失败")
|
| | | }
|
| | | }
|
| | |
|
| | | logger.Debug("AddCluster AddWeedfsServer success ip=%v, errRet=%v", ip, errRet)
|
| | | return true, errRet
|
| | | }
|
| | |
|
| | | func ExitWeedfsServer(scriptPath string, isDel bool, storePath []string) (bool, error) {
|
| | | var errRet error
|
| | | start := time.Now()
|
| | | defer func(errRet error) {
|
| | | logger.Debugf("Leave ExitWeedfsServer finish time=%v, err=%v", time.Since(start), errRet)
|
| | | }(errRet)
|
| | |
|
| | | localIp, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter)
|
| | | var configInfo util.SWFSInfo
|
| | | configInfo, errRet = util.GetConfig(scriptPath)
|
| | | logger.Debugf("Leave ExitWeedfsServer GetConfig=%v, localIp=%v, err=%v", configInfo, localIp, errRet)
|
| | | if errRet != nil {
|
| | | return false, errRet
|
| | | }
|
| | | if len(configInfo.Peers) == 0 {
|
| | | logger.Debug("Leave ExitWeedfsServer 当前节点未使用无须退出")
|
| | | return true, nil
|
| | | }
|
| | | newPeers := make([]string, 0)
|
| | | for _, p := range configInfo.Peers {
|
| | | if strings.Split(p, ":")[0] == localIp {
|
| | | continue
|
| | | }
|
| | | newPeers = append(newPeers, p)
|
| | | }
|
| | | defaultReplication := "001"
|
| | | if len(newPeers) <= 1 {
|
| | | defaultReplication = "000"
|
| | | }
|
| | |
|
| | | logger.Debug("ExitWeedfsServer newPeers:", newPeers)
|
| | | for _, pick := range newPeers {
|
| | | logger.Debug("Leave ExitWeedfsServer pick:", pick)
|
| | | mIp := strings.Split(pick, ":")[0]
|
| | | url := "http://" + mIp + ":8888/data/api-v/swfs/syncSWFSNode"
|
| | |
|
| | | dataBody := WeedInfo{
|
| | | ScriptPath: scriptPath,
|
| | | Peers: newPeers,
|
| | | DefaultReplication: defaultReplication,
|
| | | }
|
| | | srcData, _ := json.Marshal(dataBody)
|
| | |
|
| | | // body := `{"scriptPath":"` + scriptPath + `",
|
| | | // "peers":["` + strings.Replace(strings.Trim(fmt.Sprint(newPeers), "[]"), " ", "\",\"", -1) + `"],
|
| | | // "defaultReplication":"` + defaultReplication + `"}`
|
| | | _, errRet = HttpRCT("POST", url, srcData, 30*time.Second)
|
| | | logger.Debugf("Leave ExitWeedfsServer HttpRCT url=%v, srcData=%v, errRet=%v", url, string(srcData), errRet)
|
| | | if errRet != nil {
|
| | | return false, errors.New(mIp + "节点修改失败")
|
| | | }
|
| | | }
|
| | |
|
| | | sc := util.SetConfig(scriptPath, "", []string{}, defaultReplication)
|
| | | if sc == false {
|
| | | return false, errors.New("scriptPath 路径错误!")
|
| | | }
|
| | |
|
| | | if _, errRet = util.VerifyServer(localIp); errRet == nil {
|
| | | sp := util.StopServer(scriptPath)
|
| | | logger.Debugf("Leave ExitWeedfsServer StopServer sp=%v", sp)
|
| | | if sp == false {
|
| | | return false, errors.New("stop scriptPath 路径错误!")
|
| | | }
|
| | | }
|
| | | if isDel == true {
|
| | | for _, p := range storePath {
|
| | | path := strings.Split(p, "/")
|
| | | if strings.Contains(path[len(path)-1], "seaweedfs") == false && strings.Contains(path[len(path)-2], "seaweedfs") == false {
|
| | | logger.Errorf("Leave ExitWeedfsServer storePath=%v 路径不正确 ", storePath)
|
| | | return false, errors.New(p + "路径不正确")
|
| | | }
|
| | | ra := strings.Split(p, "seaweedfs")
|
| | |
|
| | | util.CMDSC("find " + ra[0] + "seaweedfs" + " -type f -name '*' -exec rm -rf {} \\;")
|
| | | }
|
| | | }
|
| | | logger.Debugf("Leave ExitWeedfsServer success errRet=%v", errRet)
|
| | | return true, errRet
|
| | | }
|