package service import ( "encoding/json" "errors" "fmt" "strings" "time" "vamicro/config" "vamicro/system-service/sys" "vamicro/system-service/util" "basic.com/valib/logger.git" ) func InitSWFSNode(storePath []string, scriptPath string) (bool, error) { for _, p := range storePath { path := strings.Split(p, "/") if len(path) >= 3 { if strings.Contains(path[len(path)-2], "seaweedfs") == false && strings.Contains(path[len(path)-3], "seaweedfs") == false { logger.Errorf("GetConfig InitSWFSNode 路径不正确 storePath=%v", storePath) return false, errors.New(p + "路径不正确") } ra := strings.Split(p, "seaweedfs") util.CMDSC("find " + ra[0] + "seaweedfs" + " -type f -name '*' -exec rm -rf {} \\;") } else { logger.Errorf("GetConfig InitSWFSNode storePath=%v config 中VolumePath配置错误,请检查", storePath) return false, errors.New("config 中VolumePath配置错误,请检查") } } util.SetConfig(scriptPath, "", []string{}, "000") return true, nil } func CreateWeedfsServer(scriptPath string) (bool, error) { logger.Debug("CreateWeedfsServer:") ip, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter) peers := make([]string, 0) peers = append(peers, ip+":6333") _, err := util.VerifyServer(ip) if err == nil { sp := util.StopServer(scriptPath) if sp == false { return false, errors.New("stop scriptPath 路径错误!") } } logger.Debug("GetConfig CreateWeedfsServer ip=%v, peers=%v", ip, peers) sc := util.SetConfig(scriptPath, ip, peers, "000") if sc == false { return false, errors.New("binPath 路径错误!") } st := util.StartServer(scriptPath) if st == false { return false, errors.New("binPath 路径错误!") } time.Sleep(time.Second * 5) _, errF := util.VerifyServer(ip) if errF != nil { time.Sleep(time.Second * 5) _, errS := util.VerifyServer(ip) if errS != nil { return false, errors.New("启动超时") } } return true, nil } type WeedInfo struct { ScriptPath string `json:"scriptPath"` Peers []string `json:"peers"` DefaultReplication string `json:"defaultReplication"` } //storePath是weedfs的数据路径,默认是在/data/disk2/seaweedfs func AddWeedfsServer(scriptPath string, ip string, storePath []string) (bool, error) { var errRet error start := time.Now() defer func(errRet error) { logger.Debugf("AddCluster AddWeedfsServer finish time=%v, err=%v", time.Since(start), errRet) }(errRet) localIp, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter) _, errRet = InitSWFSNode(storePath, scriptPath) if errRet != nil { return false, errors.New("seaweedfs 初始化失败") } url := "http://" + ip + ":8888/data/api-v/swfs/getSWFSPeers" body := `{"scriptPath":"` + scriptPath + `"}` var buf []byte buf, errRet = HttpRCT("POST", url, []byte(body), 10*time.Second) if errRet != nil { return false, errors.New("获取peers列表失败") } var info interface{} json.Unmarshal(buf, &info) peers := info.(map[string]interface{})["data"].([]interface{}) if len(peers) < 1 { return false, errors.New("指定ip不存在集群!") } for _, pc := range peers { if strings.Contains(pc.(string), localIp) == true { logger.Errorf("AddCluster AddWeedfsServer localIp=%v, pc=%v, peers=%v 该节点已经在目标集群", localIp, pc.(string), peers) return false, errors.New("该节点已经在目标集群") } } peers = append(peers, localIp+":6333") pArray := make([]string, 0) for _, p := range peers { sData := strings.Split(p.(string), ":") pArray = append(pArray, fmt.Sprintf("%v:%v", sData[0], sData[1])) } for _, pick := range peers { mIp := strings.Split(pick.(string), ":")[0] url := "http://" + mIp + ":8888/data/api-v/swfs/syncSWFSNode" dataBody := WeedInfo{ ScriptPath: scriptPath, Peers: pArray, DefaultReplication: "001", } srcData, _ := json.Marshal(dataBody) // body := `{"scriptPath":"` + scriptPath + `", // "peers":["` + strings.Replace(strings.Trim(fmt.Sprint(peers), "[]"), " ", "\",\"", -1) + `"], // "defaultReplication":"001"}` logger.Debugf("AddCluster AddWeedfsServer url=%v, srcData=%v", url, string(srcData)) if _, errRet = HttpRCT("POST", url, srcData, 30*time.Second); errRet != nil { return false, errors.New(mIp + "节点修改失败") } } logger.Debug("AddCluster AddWeedfsServer success ip=%v, errRet=%v", ip, errRet) return true, errRet } func ExitWeedfsServer(scriptPath string, isDel bool, storePath []string) (bool, error) { var errRet error start := time.Now() defer func(errRet error) { logger.Debugf("Leave ExitWeedfsServer finish time=%v, err=%v", time.Since(start), errRet) }(errRet) localIp, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter) var configInfo util.SWFSInfo configInfo, errRet = util.GetConfig(scriptPath) logger.Debugf("Leave ExitWeedfsServer GetConfig=%v, localIp=%v, err=%v", configInfo, localIp, errRet) if errRet != nil { return false, errRet } if len(configInfo.Peers) == 0 { logger.Debug("Leave ExitWeedfsServer 当前节点未使用无须退出") return true, nil } newPeers := make([]string, 0) for _, p := range configInfo.Peers { if strings.Split(p, ":")[0] == localIp { continue } newPeers = append(newPeers, p) } defaultReplication := "001" if len(newPeers) <= 1 { defaultReplication = "000" } logger.Debug("ExitWeedfsServer newPeers:", newPeers) for _, pick := range newPeers { logger.Debug("Leave ExitWeedfsServer pick:", pick) mIp := strings.Split(pick, ":")[0] url := "http://" + mIp + ":8888/data/api-v/swfs/syncSWFSNode" dataBody := WeedInfo{ ScriptPath: scriptPath, Peers: newPeers, DefaultReplication: defaultReplication, } srcData, _ := json.Marshal(dataBody) // body := `{"scriptPath":"` + scriptPath + `", // "peers":["` + strings.Replace(strings.Trim(fmt.Sprint(newPeers), "[]"), " ", "\",\"", -1) + `"], // "defaultReplication":"` + defaultReplication + `"}` _, errRet = HttpRCT("POST", url, srcData, 30*time.Second) logger.Debugf("Leave ExitWeedfsServer HttpRCT url=%v, srcData=%v, errRet=%v", url, string(srcData), errRet) if errRet != nil { return false, errors.New(mIp + "节点修改失败") } } sc := util.SetConfig(scriptPath, "", []string{}, defaultReplication) if sc == false { return false, errors.New("scriptPath 路径错误!") } if _, errRet = util.VerifyServer(localIp); errRet == nil { sp := util.StopServer(scriptPath) logger.Debugf("Leave ExitWeedfsServer StopServer sp=%v", sp) if sp == false { return false, errors.New("stop scriptPath 路径错误!") } } if isDel == true { for _, p := range storePath { path := strings.Split(p, "/") if strings.Contains(path[len(path)-1], "seaweedfs") == false && strings.Contains(path[len(path)-2], "seaweedfs") == false { logger.Errorf("Leave ExitWeedfsServer storePath=%v 路径不正确 ", storePath) return false, errors.New(p + "路径不正确") } ra := strings.Split(p, "seaweedfs") util.CMDSC("find " + ra[0] + "seaweedfs" + " -type f -name '*' -exec rm -rf {} \\;") } } logger.Debugf("Leave ExitWeedfsServer success errRet=%v", errRet) return true, errRet }