package controllers import ( "basic.com/pubsub/esutil.git" "basic.com/valib/bhomeclient.git" "bytes" "encoding/json" "fmt" "io/ioutil" "net/http" "strings" "time" "vamicro/config" "vamicro/system-service/sys" "vamicro/system-service/util" ) type TestControllers struct{} type NodeInfo struct { BinPath string `json:"binPath"` ConfigPath string `json:"configPath"` IndexPath string `json:"indexPath"` ScriptPath string `json:"scriptPath"` Ip string `json:"ip"` Port string `json:"port"` Peers []string `json:"peers"` } //创建集群(初创节点,初始节点,一个集群该功能只可运行一次) func (tc *TestControllers) CreateOriginalClusterT(h *bhomeclient.WrapperHandler, c *bhomeclient.Request) *bhomeclient.Reply { verIp, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter) verPort := "9200" var ni NodeInfo c.BindJSON(&ni) configPath := ni.ConfigPath binPath := ni.BinPath fmt.Println("configPath: ", configPath) fmt.Println("binPath: ", binPath) fmt.Println("verIp: ", verIp) fmt.Println("verPort: ", verPort) sp := esutil.StopServer(binPath, "0.0.0.0", verPort) fmt.Println("sp: ", sp) if sp == true { return &bhomeclient.Reply{Msg: "binPath路径错误"} } _, errC := esutil.VerifyCreated(configPath) if errC != nil { fmt.Println("errC: ", errC) return &bhomeclient.Reply{Msg: fmt.Sprint("%s", errC)} } _, errI := esutil.InitYml(configPath) if errI != nil { fmt.Println("errI: ", errI) return &bhomeclient.Reply{Msg: fmt.Sprint("%s", errI)} } _, errR := esutil.UpdateNodeRole(configPath, "master") if errR != nil { fmt.Println("errR: ", errR) return &bhomeclient.Reply{Msg: fmt.Sprint("%s", errR)} } _, errS := esutil.SetDiscoveryZenPingUnicastHosts(configPath, []string{verIp}) if errS != nil { fmt.Println("errS: ", errS) return &bhomeclient.Reply{Msg: fmt.Sprint("%s", errS)} } st := esutil.StartServer(binPath, verIp, verPort) if st == false { fmt.Println("st: ", st) return &bhomeclient.Reply{Msg: "binPath路径错误"} } fmt.Println("resultfff: ") result := esutil.VerifyNodeServer("0.0.0.0", "9200", 20) fmt.Println("resultsss: ", result) if result == false { fmt.Println("result: ", result) return &bhomeclient.Reply{Msg: "服务启动超时"} } fmt.Println("resulteee: ", result) rIndexInit := esutil.InitIndex(ni.IndexPath) if rIndexInit == false { fmt.Println("索引初始化失败") return &bhomeclient.Reply{Msg: "索引初始化失败"} } return &bhomeclient.Reply{Success: true, Msg: "搞定!!!!!!!"} } //加入集群(该节点将根据计算得出自己需要扮演的角色) func (tc *TestControllers) AddClusterT(h *bhomeclient.WrapperHandler, c *bhomeclient.Request) *bhomeclient.Reply { var ni NodeInfo c.BindJSON(&ni) binPath := ni.BinPath configPath := ni.ConfigPath ip := ni.Ip port := ni.Port verIp, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter) verPort := "9200" localIp := verIp localPort := verPort sp := esutil.StopServer(binPath, "0.0.0.0", "9200") if sp == true { fmt.Println("sp: ", sp) return &bhomeclient.Reply{Msg: "binPath路径错误"} } LocalRole := "slave" _, errC := esutil.VerifyCreated(configPath) if errC != nil { fmt.Println("errC: ", errC) return &bhomeclient.Reply{Msg: fmt.Sprint("%s", errC)} } NodesInfo, errG := esutil.GetClusterInfo(ip, port) if errG != nil { fmt.Println("errG: ", errG) return &bhomeclient.Reply{Msg: fmt.Sprint("%s", errG)} } TotalAllNodes := len(NodesInfo) if TotalAllNodes < 1 { fmt.Println("目标集群不存在") return &bhomeclient.Reply{Msg: "目标集群不存在"} } TotalMasterNodes := 0 hosts := make([]string, 0) allHosts := make([]string, 0) for _, mp := range NodesInfo { if mp.NodeRole == "master" { TotalMasterNodes = TotalMasterNodes + 1 hosts = append(hosts, mp.NodeIp) } allHosts = append(allHosts, mp.NodeIp) } if TotalAllNodes/5 > TotalMasterNodes { LocalRole = "master" hosts = append(hosts, localIp) } _, errNR := esutil.UpdateNodeRole(configPath, LocalRole) if errNR != nil { fmt.Println("errNR: ", errNR) return &bhomeclient.Reply{Msg: fmt.Sprint("%s", errNR)} } for _, mpk := range allHosts { url := "http://" + mpk + ":8888/data/api-v/es/node/synchronizeHosts" body := `{ "binPath":"` + binPath + `", "configPath":"` + configPath + `", "hosts":["` + strings.Join(hosts, ",") + `"] }` _, err := HttpRCT("POST", url, []byte(body)) if err != nil { fmt.Println("err: ", err) return &bhomeclient.Reply{Msg: mpk + "节点同步失败"} } } _, errS := esutil.SetDiscoveryZenPingUnicastHosts(configPath, hosts) if errS != nil { fmt.Println("errS: ", errS) return &bhomeclient.Reply{Msg: fmt.Sprint("%s", errS)} } st := esutil.StartServer(binPath, localIp, localPort) if st == false { fmt.Println("st: ", st) return &bhomeclient.Reply{Msg: "服务启动超时"} } return &bhomeclient.Reply{Success: true, Msg: "搞定!!!!!!!"} } //退出集群(从当前集群退出) func (tc *TestControllers) ExitClusterT(h *bhomeclient.WrapperHandler, c *bhomeclient.Request) *bhomeclient.Reply { var ni NodeInfo c.BindJSON(&ni) configPath := ni.ConfigPath binPath := ni.BinPath verIp, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter) verPort := "9200" localIp := verIp localPort := verPort hosts, errH := esutil.GetDiscoveryZenPingUnicastHosts(configPath) if errH != nil { fmt.Println("errH: ", errH) return &bhomeclient.Reply{Msg: fmt.Sprint("%s", errH)} } for _, p := range hosts { if p == localIp { fmt.Println("主节点不允许退出集群") return &bhomeclient.Reply{Msg: "主节点不允许退出集群"} } } _, errEN := esutil.ExcludeNode(localIp, localPort) if errEN != nil { fmt.Println("errEN: ", errEN) return &bhomeclient.Reply{Msg: fmt.Sprint("%s", errEN)} } sp := esutil.StopServer(binPath, "0.0.0.0", "9200") if sp == true { fmt.Println("binPath路径错误") return &bhomeclient.Reply{Msg: "binPath路径错误"} } _, errS := esutil.SetDiscoveryZenPingUnicastHosts(configPath, nil) if errS != nil { return &bhomeclient.Reply{Msg: fmt.Sprint("%s", errS)} } return &bhomeclient.Reply{Success: true, Msg: "搞定!!!!!!!"} } func HttpRCT(method string, url string, parama []byte) (buf []byte, err error) { timeout := time.Duration(30 * time.Second) client := http.Client{ Timeout: timeout, } request, err := http.NewRequest(method, url, bytes.NewBuffer(parama)) request.Header.Set("Content-type", "application/json") request.Header.Add("Authorization", "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjQ3ND"+ "UwMjU5MjMsInVzZXIiOiJ7XCJpZFwiOlwiZTZjY2QzNmQtNGYxNi00NmZjLTg4ZDUtMDczNjU4NjZkMjA1XCIsXCJwZXJtaXNzaW"+ "9uc1wiOltcInByb2R1Y3RNYW5nZTpwdWJsaXNoXCIsXCJjb2RlTWFuZ2U6dmlld1wiLFwiZGV2aWNlTWFuYWdlOmFkZFwiLFwiYW"+ "RtaW5NYW5hZ2VcIixcIm9yZGVyTWFuZ2VcIixcImRldmljZU1hbmFnZTp2aWV3XCIsXCJwcm9kdWN0TWFuZ2U6YWRkXCIsXCJhZG"+ "1pbk1hbmFnZTp2aWV3XCIsXCJjb2RlTWFuZ2U6YWRkXCIsXCJwcm9kdWN0TWFuZ2U6b2ZmU2FsZVwiLFwib3JkZXJNYW5nZTpjYW"+ "5jZWxcIixcInByb2R1Y3RDZW50ZXI6ZG93bmxvYWRcIixcInByb2R1Y3RDZW50ZXI6YnV5XCIsXCJwcm9kdWN0TWFuZ2U6dmlld1"+ "wiLFwiYXBpXCIsXCJob21lXCIsXCJvcmRlck1hbmdlOnBheVwiLFwiYWRtaW5NYW5hZ2U6YWRkXCIsXCJvcmRlck1hbmdlOmRvd2"+ "5sb2FkXCIsXCJwcm9kdWN0Q2VudGVyXCIsXCJkZXZpY2VNYW5hZ2U6dW5iaW5kXCIsXCJvcmRlck1hbmdlOnZpZXdcIixcImFkbW"+ "luTWFuYWdlOmVkaXRcIixcImRldmljZU1hbmFnZVwiLFwidmlwTWFuYWdlOmFkZFwiLFwidmlwTWFuYWdlOnZpZXdcIixcInByb2"+ "R1Y3RDZW50ZXI6dmlld1wiLFwidmlwTWFuYWdlOmVkaXRcIixcInZpcE1hbmFnZVwiLFwicHJvZHVjdE1hbmdlOmVkaXRcIixcIm"+ "NvZGVNYW5nZVwiLFwicHJvZHVjdE1hbmdlXCJdLFwidXNlcm5hbWVcIjpcImJhc2ljXCJ9In0.vwjAFkWuEyadRLvIOGK8LFE3Mj"+ "pY3SQ7j6AlTXnQDG8") if err != nil { fmt.Println("build request fail !") return nil, err } resp, err := client.Do(request) if err != nil { fmt.Println("request error: ", err) return nil, err } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) if err != nil { fmt.Println(err) return nil, err } return body, nil } func (tc *TestControllers) CreateServer(h *bhomeclient.WrapperHandler, c *bhomeclient.Request) *bhomeclient.Reply { var ni NodeInfo c.BindJSON(&ni) ip, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter) scriptPath := ni.ScriptPath peers := make([]string, 0) peers = append(peers, ip+":6333") _, err := util.VerifyServer(ip) if err == nil { sp := util.StopServer(scriptPath) if sp == false { return &bhomeclient.Reply{Msg: "stop scriptPath 路径错误!"} } } sc := util.SetConfig(scriptPath, ip, peers, "000") if sc == false { return &bhomeclient.Reply{Msg: "set scriptPath 路径错误!"} } st := util.StartServer(scriptPath) if st == false { return &bhomeclient.Reply{Msg: "start scriptPath 路径错误!"} } time.Sleep(time.Second * 5) _, errF := util.VerifyServer(ip) if errF != nil { time.Sleep(time.Second * 5) _, errS := util.VerifyServer(ip) if errS != nil { return &bhomeclient.Reply{Msg: "启动超时"} } } return &bhomeclient.Reply{Success: true, Msg: "创建成功!"} } func (tc *TestControllers) AddServer(h *bhomeclient.WrapperHandler, c *bhomeclient.Request) *bhomeclient.Reply { var ni NodeInfo c.BindJSON(&ni) ip := ni.Ip localIp, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter) url := "http://" + ip + ":8888/data/api-v/swfs/getSWFSPeers" body := `{"scriptPath":"` + ni.ScriptPath + `"}` buf, err := HttpRCT("POST", url, []byte(body)) if err != nil { return &bhomeclient.Reply{Msg: "获取peers列表失败"} } var info interface{} json.Unmarshal(buf, &info) peers := info.(map[string]interface{})["data"].([]interface{}) if len(peers) < 1 { return &bhomeclient.Reply{Msg: "指定ip不存在集群!"} } for _, pc := range peers { if strings.Contains(pc.(string), localIp) == true { return &bhomeclient.Reply{Msg: "该节点已经在目标集群"} } } peers = append(peers, localIp+":6333") for _, pick := range peers { mIp := strings.Split(pick.(string), ":")[0] url := "http://" + mIp + ":8888/data/api-v/swfs/syncSWFSNode" body := `{"scriptPath":"` + ni.ScriptPath + `", "peers":["` + strings.Replace(strings.Trim(fmt.Sprint(peers), "[]"), " ", "\",\"", -1) + `"]}` _, err := HttpRCT("POST", url, []byte(body)) if err != nil { return &bhomeclient.Reply{Msg: mIp + "节点修改失败"} } } return &bhomeclient.Reply{Success: true, Msg: "加入成功!!!"} } func (tc *TestControllers) ExitServer(h *bhomeclient.WrapperHandler, c *bhomeclient.Request) *bhomeclient.Reply { var ni NodeInfo c.BindJSON(&ni) scriptPath := ni.ScriptPath localIp, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter) configInfo, err := util.GetConfig(scriptPath) if err != nil { return &bhomeclient.Reply{Msg: "获取peers列表失败"} } newPeers := make([]string, 0) for _, p := range configInfo.Peers { if strings.Split(p, ":")[0] == localIp { continue } newPeers = append(newPeers, p) } for _, pick := range newPeers { mIp := strings.Split(pick, ":")[0] url := "http://" + mIp + ":8888/data/api-v/swfs/syncSWFSNode/" body := `{"scriptPath":"` + ni.ConfigPath + `", "peers":["` + strings.Replace(strings.Trim(fmt.Sprint(newPeers), "[]"), " ", "\",\"", -1) + `"]}` _, err := HttpRCT("POST", url, []byte(body)) if err != nil { return &bhomeclient.Reply{Msg: mIp + "节点修改失败"} } } sc := util.SetConfig(scriptPath, "", []string{}, "000") if sc == false { return &bhomeclient.Reply{Msg: "scriptPath 路径错误!"} } _, errVS := util.VerifyServer(localIp) if errVS == nil { sp := util.StopServer(scriptPath) if sp == false { return &bhomeclient.Reply{Msg: "stop scriptPath 路径错误!"} } } return &bhomeclient.Reply{Success: true, Msg: "退出成功!!!"} }